implemented streaming request/response streaming

added metadata to cache values

added cache servlet for comparison
This commit is contained in:
2025-02-19 22:36:31 +08:00
parent 0463038aaa
commit f048a60540
66 changed files with 2474 additions and 1078 deletions

View File

@@ -1,30 +0,0 @@
package net.woggioni.rbcs.server
import io.netty.channel.ChannelHandlerContext
import org.slf4j.Logger
import java.net.InetSocketAddress
inline fun Logger.trace(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isTraceEnabled }, { trace(it) } , messageBuilder)
}
inline fun Logger.debug(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isDebugEnabled }, { debug(it) } , messageBuilder)
}
inline fun Logger.info(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isInfoEnabled }, { info(it) } , messageBuilder)
}
inline fun Logger.warn(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isWarnEnabled }, { warn(it) } , messageBuilder)
}
inline fun Logger.error(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isErrorEnabled }, { error(it) } , messageBuilder)
}
inline fun log(log : Logger, ctx : ChannelHandlerContext,
filter : Logger.() -> Boolean,
loggerMethod : Logger.(String) -> Unit, messageBuilder : () -> String) {
if(log.filter()) {
val clientAddress = (ctx.channel().remoteAddress() as InetSocketAddress).address.hostAddress
log.loggerMethod(clientAddress + " - " + messageBuilder())
}
}

View File

@@ -37,7 +37,7 @@ import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import net.woggioni.rbcs.common.RBCS.toUrl
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.info
import net.woggioni.rbcs.server.auth.AbstractNettyHttpAuthenticator
@@ -47,7 +47,10 @@ import net.woggioni.rbcs.server.auth.RoleAuthorizer
import net.woggioni.rbcs.server.configuration.Parser
import net.woggioni.rbcs.server.configuration.Serializer
import net.woggioni.rbcs.server.exception.ExceptionHandler
import net.woggioni.rbcs.server.handler.MaxRequestSizeHandler
import net.woggioni.rbcs.server.handler.ServerHandler
import net.woggioni.rbcs.server.handler.TraceHandler
import net.woggioni.rbcs.server.throttling.BucketManager
import net.woggioni.rbcs.server.throttling.ThrottlingHandler
import java.io.OutputStream
import java.net.InetSocketAddress
@@ -56,19 +59,23 @@ import java.nio.file.Path
import java.security.KeyStore
import java.security.PrivateKey
import java.security.cert.X509Certificate
import java.time.Duration
import java.time.Instant
import java.util.Arrays
import java.util.Base64
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.regex.Matcher
import java.util.regex.Pattern
import javax.naming.ldap.LdapName
import javax.net.ssl.SSLPeerUnverifiedException
class RemoteBuildCacheServer(private val cfg: Configuration) {
private val log = contextLogger()
companion object {
private val log = createLogger<RemoteBuildCacheServer>()
val userAttribute: AttributeKey<Configuration.User> = AttributeKey.valueOf("user")
val groupAttribute: AttributeKey<Set<Configuration.Group>> = AttributeKey.valueOf("group")
@@ -142,7 +149,9 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
private class NettyHttpBasicAuthenticator(
private val users: Map<String, Configuration.User>, authorizer: Authorizer
) : AbstractNettyHttpAuthenticator(authorizer) {
private val log = contextLogger()
companion object {
private val log = createLogger<NettyHttpBasicAuthenticator>()
}
override fun authenticate(ctx: ChannelHandlerContext, req: HttpRequest): AuthenticationResult? {
val authorizationHeader = req.headers()[HttpHeaderNames.AUTHORIZATION] ?: let {
@@ -242,13 +251,13 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
}
return keystore
}
private val log = createLogger<ServerInitializer>()
}
private val log = contextLogger()
private val cacheHandlerFactory = cfg.cache.materialize()
private val cache = cfg.cache.materialize()
private val exceptionHandler = ExceptionHandler()
private val bucketManager = BucketManager.from(cfg)
private val authenticator = when (val auth = cfg.authentication) {
is Configuration.BasicAuthentication -> NettyHttpBasicAuthenticator(cfg.users, RoleAuthorizer())
@@ -359,59 +368,94 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
pipeline.addLast(SSL_HANDLER_NAME, it)
}
pipeline.addLast(HttpServerCodec())
pipeline.addLast(MaxRequestSizeHandler.NAME, MaxRequestSizeHandler(cfg.connection.maxRequestSize))
pipeline.addLast(HttpChunkContentCompressor(1024))
pipeline.addLast(ChunkedWriteHandler())
authenticator?.let {
pipeline.addLast(it)
}
pipeline.addLast(ThrottlingHandler(cfg))
pipeline.addLast(ThrottlingHandler(bucketManager, cfg.connection))
val serverHandler = let {
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
ServerHandler(cache, prefix)
ServerHandler(prefix)
}
pipeline.addLast(eventExecutorGroup, serverHandler)
pipeline.addLast(exceptionHandler)
pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler)
pipeline.addLast(cacheHandlerFactory.newHandler())
pipeline.addLast(TraceHandler)
pipeline.addLast(ExceptionHandler)
}
override fun close() {
cache.close()
cacheHandlerFactory.close()
}
}
class ServerHandle(
httpChannelFuture: ChannelFuture,
closeFuture: ChannelFuture,
private val bossGroup: EventExecutorGroup,
private val executorGroups: Iterable<EventExecutorGroup>,
private val serverInitializer: AutoCloseable
) : AutoCloseable {
private val httpChannel: Channel = httpChannelFuture.channel()
private val closeFuture: ChannelFuture = httpChannel.closeFuture()
private val log = contextLogger()
private val serverInitializer: AutoCloseable,
) : Future<Void> by from(closeFuture, executorGroups, serverInitializer) {
fun shutdown(): Future<Void> {
return httpChannel.close()
}
companion object {
private val log = createLogger<ServerHandle>()
override fun close() {
try {
closeFuture.sync()
} catch (ex: Throwable) {
log.error(ex.message, ex)
}
try {
serverInitializer.close()
} catch (ex: Throwable) {
log.error(ex.message, ex)
}
executorGroups.forEach {
try {
it.shutdownGracefully().sync()
} catch (ex: Throwable) {
log.error(ex.message, ex)
private fun from(
closeFuture: ChannelFuture,
executorGroups: Iterable<EventExecutorGroup>,
serverInitializer: AutoCloseable
): CompletableFuture<Void> {
val result = CompletableFuture<Void>()
closeFuture.addListener {
val errors = mutableListOf<Throwable>()
val deadline = Instant.now().plusSeconds(20)
for (executorGroup in executorGroups) {
val future = executorGroup.terminationFuture()
try {
val now = Instant.now()
if (now > deadline) {
future.get(0, TimeUnit.SECONDS)
} else {
future.get(Duration.between(now, deadline).toMillis(), TimeUnit.MILLISECONDS)
}
}
catch (te: TimeoutException) {
errors.addLast(te)
log.warn("Timeout while waiting for shutdown of $executorGroup", te)
} catch (ex: Throwable) {
log.warn(ex.message, ex)
errors.addLast(ex)
}
}
try {
serverInitializer.close()
} catch (ex: Throwable) {
log.error(ex.message, ex)
errors.addLast(ex)
}
if(errors.isEmpty()) {
result.complete(null)
} else {
result.completeExceptionally(errors.first())
}
}
return result.thenAccept {
log.info {
"RemoteBuildCacheServer has been gracefully shut down"
}
}
}
log.info {
"RemoteBuildCacheServer has been gracefully shut down"
}
fun sendShutdownSignal() {
bossGroup.shutdownGracefully()
executorGroups.map {
it.shutdownGracefully()
}
}
}
@@ -442,10 +486,16 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
// Bind and start to accept incoming connections.
val bindAddress = InetSocketAddress(cfg.host, cfg.port)
val httpChannel = bootstrap.bind(bindAddress).sync()
val httpChannel = bootstrap.bind(bindAddress).sync().channel()
log.info {
"RemoteBuildCacheServer is listening on ${cfg.host}:${cfg.port}"
}
return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup), serverInitializer)
return ServerHandle(
httpChannel.closeFuture(),
bossGroup,
setOf(workerGroup, eventExecutorGroup),
serverInitializer
)
}
}

View File

@@ -1,42 +1,33 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBufAllocator
import net.woggioni.jwo.JWO
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.api.RequestHandle
import net.woggioni.rbcs.api.ResponseHandle
import net.woggioni.rbcs.api.event.RequestStreamingEvent
import net.woggioni.rbcs.api.event.ResponseStreamingEvent
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digestString
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.extractChunk
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.nio.file.StandardOpenOption
import java.nio.file.attribute.BasicFileAttributes
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterInputStream
class FileSystemCache(
val root: Path,
val maxAge: Duration,
val digestAlgorithm: String?,
val compressionEnabled: Boolean,
val compressionLevel: Int,
val chunkSize: Int
) : Cache {
val maxAge: Duration
) : AutoCloseable {
class EntryValue(val metadata: CacheValueMetadata, val channel : FileChannel, val offset : Long, val size : Long) : Serializable
private companion object {
@JvmStatic
private val log = contextLogger()
private val log = createLogger<FileSystemCache>()
}
init {
@@ -48,111 +39,77 @@ class FileSystemCache(
private var nextGc = Instant.now()
override fun get(key: String, responseHandle: ResponseHandle, alloc: ByteBufAllocator) {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
root.resolve(digest).takeIf(Files::exists)
?.let { file ->
file.takeIf(Files::exists)?.let { file ->
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
if (compressionEnabled) {
val compositeBuffer = alloc.compositeBuffer()
ByteBufOutputStream(compositeBuffer).use { outputStream ->
InflaterInputStream(Files.newInputStream(file)).use { inputStream ->
val ioBuffer = alloc.buffer(chunkSize)
try {
while (true) {
val read = ioBuffer.writeBytes(inputStream, chunkSize)
val last = read < 0
if (read > 0) {
ioBuffer.readBytes(outputStream, read)
}
if (last) {
compositeBuffer.retain()
outputStream.close()
}
if (compositeBuffer.readableBytes() >= chunkSize || last) {
val chunk = extractChunk(compositeBuffer, alloc)
val evt = if (last) {
ResponseStreamingEvent.LastChunkReceived(chunk)
} else {
ResponseStreamingEvent.ChunkReceived(chunk)
}
responseHandle.handleEvent(evt)
}
if (last) break
}
} finally {
ioBuffer.release()
}
}
}
} else {
responseHandle.handleEvent(
ResponseStreamingEvent.FileReceived(
FileChannel.open(file, StandardOpenOption.READ)
)
)
fun get(key: String): EntryValue? =
root.resolve(key).takeIf(Files::exists)
?.let { file ->
val size = Files.size(file)
val channel = FileChannel.open(file, StandardOpenOption.READ)
val source = Channels.newInputStream(channel)
val tmp = ByteArray(Integer.BYTES)
val buffer = ByteBuffer.wrap(tmp)
source.read(tmp)
buffer.rewind()
val offset = (Integer.BYTES + buffer.getInt()).toLong()
var count = 0
val wrapper = object : InputStream() {
override fun read(): Int {
return source.read().also {
if (it > 0) count += it
}
}
} ?: responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND)
override fun read(b: ByteArray, off: Int, len: Int): Int {
return source.read(b, off, len).also {
if (it > 0) count += it
}
}
override fun close() {
}
}
val metadata = ObjectInputStream(wrapper).use { ois ->
ois.readObject() as CacheValueMetadata
}
EntryValue(metadata, channel, offset, size)
}
class FileSink(metadata: CacheValueMetadata, private val path: Path, private val tmpFile: Path) {
val channel: FileChannel
init {
val baos = ByteArrayOutputStream()
ObjectOutputStream(baos).use {
it.writeObject(metadata)
}
Files.newOutputStream(tmpFile).use {
val bytes = baos.toByteArray()
val buffer = ByteBuffer.allocate(Integer.BYTES)
buffer.putInt(bytes.size)
buffer.rewind()
it.write(buffer.array())
it.write(bytes)
}
channel = FileChannel.open(tmpFile, StandardOpenOption.APPEND)
}
fun commit() {
channel.close()
Files.move(tmpFile, path, StandardCopyOption.ATOMIC_MOVE)
}
fun rollback() {
channel.close()
Files.delete(path)
}
}
override fun put(
fun put(
key: String,
responseHandle: ResponseHandle,
alloc: ByteBufAllocator
): CompletableFuture<RequestHandle> {
try {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
val file = root.resolve(digest)
val tmpFile = Files.createTempFile(root, null, ".tmp")
val stream = Files.newOutputStream(tmpFile).let {
if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
DeflaterOutputStream(it, deflater)
} else {
it
}
}
return CompletableFuture.completedFuture(object : RequestHandle {
override fun handleEvent(evt: RequestStreamingEvent) {
try {
when (evt) {
is RequestStreamingEvent.LastChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
stream.close()
Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE)
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
}
is RequestStreamingEvent.ChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
}
is RequestStreamingEvent.ExceptionCaught -> {
Files.delete(tmpFile)
stream.close()
}
}
} catch (ex: Throwable) {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex))
}
}
})
}
} catch (ex: Throwable) {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex))
return CompletableFuture.failedFuture(ex)
}
metadata: CacheValueMetadata,
): FileSink {
val file = root.resolve(key)
val tmpFile = Files.createTempFile(root, null, ".tmp")
return FileSink(metadata, file, tmpFile)
}
private val garbageCollector = Thread.ofVirtual().name("file-system-cache-gc").start {

View File

@@ -1,8 +1,9 @@
package net.woggioni.rbcs.server.cache
import net.woggioni.jwo.Application
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS
import net.woggioni.jwo.Application
import java.nio.file.Path
import java.time.Duration
@@ -14,14 +15,16 @@ data class FileSystemCacheConfiguration(
val compressionLevel: Int,
val chunkSize: Int,
) : Configuration.Cache {
override fun materialize() = FileSystemCache(
root ?: Application.builder("rbcs").build().computeCacheDirectory(),
maxAge,
digestAlgorithm,
compressionEnabled,
compressionLevel,
chunkSize,
)
override fun materialize() = object : CacheHandlerFactory {
private val cache = FileSystemCache(root ?: Application.builder("rbcs").build().computeCacheDirectory(), maxAge)
override fun close() {
cache.close()
}
override fun newHandler() = FileSystemCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel, chunkSize)
}
override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI

View File

@@ -0,0 +1,124 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.DefaultFileRegion
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioFile
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.common.RBCS.processCacheKey
import java.nio.channels.Channels
import java.util.Base64
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterInputStream
class FileSystemCacheHandler(
private val cache: FileSystemCache,
private val digestAlgorithm: String?,
private val compressionEnabled: Boolean,
private val compressionLevel: Int,
private val chunkSize: Int
) : SimpleChannelInboundHandler<CacheMessage>() {
private inner class InProgressPutRequest(
val key : String,
private val fileSink : FileSystemCache.FileSink
) {
private val stream = Channels.newOutputStream(fileSink.channel).let {
if (compressionEnabled) {
DeflaterOutputStream(it, Deflater(compressionLevel))
} else {
it
}
}
fun write(buf: ByteBuf) {
buf.readBytes(stream, buf.readableBytes())
}
fun commit() {
stream.close()
fileSink.commit()
}
fun rollback() {
fileSink.rollback()
}
}
private var inProgressPutRequest: InProgressPutRequest? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
when (msg) {
is CacheGetRequest -> handleGetRequest(ctx, msg)
is CachePutRequest -> handlePutRequest(ctx, msg)
is LastCacheContent -> handleLastCacheContent(ctx, msg)
is CacheContent -> handleCacheContent(ctx, msg)
else -> ctx.fireChannelRead(msg)
}
}
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm)))
cache.get(key)?.also { entryValue ->
ctx.writeAndFlush(CacheValueFoundResponse(msg.key, entryValue.metadata))
entryValue.channel.let { channel ->
if(compressionEnabled) {
InflaterInputStream(Channels.newInputStream(channel)).use { stream ->
outerLoop@
while (true) {
val buf = ctx.alloc().heapBuffer(chunkSize)
while(buf.readableBytes() < chunkSize) {
val read = buf.writeBytes(stream, chunkSize)
if(read < 0) {
ctx.writeAndFlush(LastCacheContent(buf))
break@outerLoop
}
}
ctx.writeAndFlush(CacheContent(buf))
}
}
} else {
ctx.writeAndFlush(ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
}
} ?: ctx.writeAndFlush(CacheValueNotFoundResponse())
}
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm)))
val sink = cache.put(key, msg.metadata)
inProgressPutRequest = InProgressPutRequest(msg.key, sink)
}
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
inProgressPutRequest!!.write(msg.content())
}
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
inProgressPutRequest?.let { request ->
inProgressPutRequest = null
request.write(msg.content())
request.commit()
ctx.writeAndFlush(CachePutResponse(request.key))
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
inProgressPutRequest?.rollback()
super.exceptionCaught(ctx, cause)
}
}

View File

@@ -33,7 +33,7 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
val digestAlgorithm = el.renderAttribute("digest") ?: "MD5"
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x4000
?: 0x10000
return FileSystemCacheConfiguration(
path,
@@ -50,7 +50,9 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
Xml.of(doc, result) {
val prefix = doc.lookupPrefix(RBCS.RBCS_NAMESPACE_URI)
attr("xs:type", "${prefix}:fileSystemCacheType", RBCS.XML_SCHEMA_NAMESPACE_URI)
attr("path", root.toString())
root?.let {
attr("path", it.toString())
}
attr("max-age", maxAge.toString())
digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm)

View File

@@ -1,47 +1,41 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.api.RequestHandle
import net.woggioni.rbcs.api.ResponseHandle
import net.woggioni.rbcs.api.event.RequestStreamingEvent
import net.woggioni.rbcs.api.event.ResponseStreamingEvent
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digestString
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.extractChunk
import java.security.MessageDigest
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterOutputStream
private class CacheKey(private val value: ByteArray) {
override fun equals(other: Any?) = if (other is CacheKey) {
value.contentEquals(other.value)
} else false
override fun hashCode() = value.contentHashCode()
}
class CacheEntry(
val metadata: CacheValueMetadata,
val content: ByteBuf
)
class InMemoryCache(
private val maxAge: Duration,
private val maxSize: Long,
private val digestAlgorithm: String?,
private val compressionEnabled: Boolean,
private val compressionLevel: Int,
private val chunkSize : Int
) : Cache {
private val maxSize: Long
) : AutoCloseable {
companion object {
@JvmStatic
private val log = contextLogger()
private val log = createLogger<InMemoryCache>()
}
private val size = AtomicLong()
private val map = ConcurrentHashMap<String, ByteBuf>()
private val map = ConcurrentHashMap<CacheKey, CacheEntry>()
private class RemovalQueueElement(val key: String, val value: ByteBuf, val expiry: Instant) :
private class RemovalQueueElement(val key: CacheKey, val value: CacheEntry, val expiry: Instant) :
Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
}
@@ -54,14 +48,14 @@ class InMemoryCache(
private val garbageCollector = Thread.ofVirtual().name("in-memory-cache-gc").start {
while (running) {
val el = removalQueue.poll(1, TimeUnit.SECONDS) ?: continue
val buf = el.value
val value = el.value
val now = Instant.now()
if (now > el.expiry) {
val removed = map.remove(el.key, buf)
val removed = map.remove(el.key, value)
if (removed) {
updateSizeAfterRemoval(buf)
updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
buf.release()
value.content.release()
}
} else {
removalQueue.put(el)
@@ -73,12 +67,12 @@ class InMemoryCache(
private fun removeEldest(): Long {
while (true) {
val el = removalQueue.take()
val buf = el.value
val removed = map.remove(el.key, buf)
val value = el.value
val removed = map.remove(el.key, value)
if (removed) {
val newSize = updateSizeAfterRemoval(buf)
val newSize = updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
buf.release()
value.content.release()
return newSize
}
}
@@ -95,114 +89,27 @@ class InMemoryCache(
garbageCollector.join()
}
override fun get(key: String, responseHandle: ResponseHandle, alloc: ByteBufAllocator) {
try {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key
).let { digest ->
map[digest]
?.let { value ->
val copy = value.retainedDuplicate()
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
val output = alloc.compositeBuffer()
if (compressionEnabled) {
try {
val stream = ByteBufOutputStream(output).let {
val inflater = Inflater()
InflaterOutputStream(it, inflater)
}
stream.use { os ->
var readable = copy.readableBytes()
while (true) {
copy.readBytes(os, chunkSize.coerceAtMost(readable))
readable = copy.readableBytes()
val last = readable == 0
if (last) stream.flush()
if (output.readableBytes() >= chunkSize || last) {
val chunk = extractChunk(output, alloc)
val evt = if (last) {
ResponseStreamingEvent.LastChunkReceived(chunk)
} else {
ResponseStreamingEvent.ChunkReceived(chunk)
}
responseHandle.handleEvent(evt)
}
if (last) break
}
}
} finally {
copy.release()
}
} else {
responseHandle.handleEvent(
ResponseStreamingEvent.LastChunkReceived(copy)
)
}
} ?: responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND)
}
} catch (ex: Throwable) {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex))
fun get(key: ByteArray) = map[CacheKey(key)]?.run {
CacheEntry(metadata, content.retainedDuplicate())
}
fun put(
key: ByteArray,
value: CacheEntry,
) {
val cacheKey = CacheKey(key)
val oldSize = map.put(cacheKey, value)?.let { old ->
val result = old.content.readableBytes()
old.content.release()
result
} ?: 0
val delta = value.content.readableBytes() - oldSize
var newSize = size.updateAndGet { currentSize: Long ->
currentSize + delta
}
removalQueue.put(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
while (newSize > maxSize) {
newSize = removeEldest()
}
}
override fun put(
key: String,
responseHandle: ResponseHandle,
alloc: ByteBufAllocator
): CompletableFuture<RequestHandle> {
return CompletableFuture.completedFuture(object : RequestHandle {
val buf = alloc.heapBuffer()
val stream = ByteBufOutputStream(buf).let {
if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
DeflaterOutputStream(it, deflater)
} else {
it
}
}
override fun handleEvent(evt: RequestStreamingEvent) {
when (evt) {
is RequestStreamingEvent.ChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
if (evt is RequestStreamingEvent.LastChunkReceived) {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key
).let { digest ->
val oldSize = map.put(digest, buf.retain())?.let { old ->
val result = old.readableBytes()
old.release()
result
} ?: 0
val delta = buf.readableBytes() - oldSize
var newSize = size.updateAndGet { currentSize : Long ->
currentSize + delta
}
removalQueue.put(RemovalQueueElement(digest, buf, Instant.now().plus(maxAge)))
while(newSize > maxSize) {
newSize = removeEldest()
}
stream.close()
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
}
}
}
is RequestStreamingEvent.ExceptionCaught -> {
stream.close()
}
else -> {
}
}
}
})
}
}

View File

@@ -1,5 +1,6 @@
package net.woggioni.rbcs.server.cache
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS
import java.time.Duration
@@ -12,14 +13,15 @@ data class InMemoryCacheConfiguration(
val compressionLevel: Int,
val chunkSize : Int
) : Configuration.Cache {
override fun materialize() = InMemoryCache(
maxAge,
maxSize,
digestAlgorithm,
compressionEnabled,
compressionLevel,
chunkSize
)
override fun materialize() = object : CacheHandlerFactory {
private val cache = InMemoryCache(maxAge, maxSize)
override fun close() {
cache.close()
}
override fun newHandler() = InMemoryCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel)
}
override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI

View File

@@ -0,0 +1,135 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.processCacheKey
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream
class InMemoryCacheHandler(
private val cache: InMemoryCache,
private val digestAlgorithm: String?,
private val compressionEnabled: Boolean,
private val compressionLevel: Int
) : SimpleChannelInboundHandler<CacheMessage>() {
private interface InProgressPutRequest : AutoCloseable {
val request: CachePutRequest
val buf: ByteBuf
fun append(buf: ByteBuf)
}
private inner class InProgressPlainPutRequest(ctx: ChannelHandlerContext, override val request: CachePutRequest) :
InProgressPutRequest {
override val buf = ctx.alloc().compositeBuffer()
private val stream = ByteBufOutputStream(buf).let {
if (compressionEnabled) {
DeflaterOutputStream(it, Deflater(compressionLevel))
} else {
it
}
}
override fun append(buf: ByteBuf) {
this.buf.addComponent(true, buf.retain())
}
override fun close() {
buf.release()
}
}
private inner class InProgressCompressedPutRequest(
ctx: ChannelHandlerContext,
override val request: CachePutRequest
) : InProgressPutRequest {
override val buf = ctx.alloc().heapBuffer()
private val stream = ByteBufOutputStream(buf).let {
DeflaterOutputStream(it, Deflater(compressionLevel))
}
override fun append(buf: ByteBuf) {
buf.readBytes(stream, buf.readableBytes())
}
override fun close() {
stream.close()
}
}
private var inProgressPutRequest: InProgressPutRequest? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
when (msg) {
is CacheGetRequest -> handleGetRequest(ctx, msg)
is CachePutRequest -> handlePutRequest(ctx, msg)
is LastCacheContent -> handleLastCacheContent(ctx, msg)
is CacheContent -> handleCacheContent(ctx, msg)
else -> ctx.fireChannelRead(msg)
}
}
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
cache.get(processCacheKey(msg.key, digestAlgorithm))?.let { value ->
ctx.writeAndFlush(CacheValueFoundResponse(msg.key, value.metadata))
if (compressionEnabled) {
val buf = ctx.alloc().heapBuffer()
InflaterOutputStream(ByteBufOutputStream(buf)).use {
value.content.readBytes(it, value.content.readableBytes())
buf.retain()
}
ctx.writeAndFlush(LastCacheContent(buf))
} else {
ctx.writeAndFlush(LastCacheContent(value.content))
}
} ?: ctx.writeAndFlush(CacheValueNotFoundResponse())
}
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
inProgressPutRequest = if(compressionEnabled) {
InProgressCompressedPutRequest(ctx, msg)
} else {
InProgressPlainPutRequest(ctx, msg)
}
}
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
inProgressPutRequest?.append(msg.content())
}
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
handleCacheContent(ctx, msg)
inProgressPutRequest?.let { inProgressRequest ->
inProgressPutRequest = null
val buf = inProgressRequest.buf
buf.retain()
inProgressRequest.close()
val cacheKey = processCacheKey(inProgressRequest.request.key, digestAlgorithm)
cache.put(cacheKey, CacheEntry(inProgressRequest.request.metadata, buf))
ctx.writeAndFlush(CachePutResponse(inProgressRequest.request.key))
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
inProgressPutRequest?.let { req ->
req.buf.release()
inProgressPutRequest = null
}
super.exceptionCaught(ctx, cause)
}
}

View File

@@ -33,7 +33,7 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
val digestAlgorithm = el.renderAttribute("digest") ?: "MD5"
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x4000
?: 0x10000
return InMemoryCacheConfiguration(
maxAge,
maxSize,

View File

@@ -3,7 +3,7 @@ package net.woggioni.rbcs.server.exception
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.http.DefaultFullHttpResponse
@@ -17,12 +17,16 @@ import net.woggioni.rbcs.api.exception.CacheException
import net.woggioni.rbcs.api.exception.ContentTooLargeException
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.log
import org.slf4j.event.Level
import org.slf4j.spi.LoggingEventBuilder
import java.net.ConnectException
import java.net.SocketException
import javax.net.ssl.SSLException
import javax.net.ssl.SSLPeerUnverifiedException
@ChannelHandler.Sharable
class ExceptionHandler : ChannelDuplexHandler() {
@Sharable
object ExceptionHandler : ChannelDuplexHandler() {
private val log = contextLogger()
private val NOT_AUTHORIZED: FullHttpResponse = DefaultFullHttpResponse(
@@ -31,12 +35,6 @@ class ExceptionHandler : ChannelDuplexHandler() {
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
private val TOO_BIG: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER
).apply {
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
private val NOT_AVAILABLE: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, Unpooled.EMPTY_BUFFER
).apply {
@@ -49,6 +47,12 @@ class ExceptionHandler : ChannelDuplexHandler() {
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
private val TOO_BIG: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER
).apply {
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
when (cause) {
is DecoderException -> {
@@ -56,6 +60,11 @@ class ExceptionHandler : ChannelDuplexHandler() {
ctx.close()
}
is ConnectException -> {
log.error(cause.message, cause)
ctx.writeAndFlush(SERVER_ERROR.retainedDuplicate())
}
is SocketException -> {
log.debug(cause.message, cause)
ctx.close()
@@ -72,6 +81,9 @@ class ExceptionHandler : ChannelDuplexHandler() {
}
is ContentTooLargeException -> {
log.log(Level.DEBUG, ctx.channel()) { builder : LoggingEventBuilder ->
builder.setMessage("Request body is too large")
}
ctx.writeAndFlush(TOO_BIG.retainedDuplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}

View File

@@ -0,0 +1,28 @@
package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.LastHttpContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
@Sharable
object CacheContentHandler : SimpleChannelInboundHandler<HttpContent>() {
val NAME = this::class.java.name
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpContent) {
when(msg) {
is LastHttpContent -> {
ctx.fireChannelRead(LastCacheContent(msg.content().retain()))
ctx.pipeline().remove(this)
}
else -> ctx.fireChannelRead(CacheContent(msg.content().retain()))
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) {
super.exceptionCaught(ctx, cause)
}
}

View File

@@ -0,0 +1,40 @@
package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpRequest
import net.woggioni.rbcs.api.exception.ContentTooLargeException
class MaxRequestSizeHandler(private val maxRequestSize : Int) : ChannelInboundHandlerAdapter() {
companion object {
val NAME = MaxRequestSizeHandler::class.java.name
}
private var cumulativeSize = 0
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
when(msg) {
is HttpRequest -> {
cumulativeSize = 0
ctx.fireChannelRead(msg)
}
is HttpContent -> {
val exceeded = cumulativeSize > maxRequestSize
if(!exceeded) {
cumulativeSize += msg.content().readableBytes()
}
if(cumulativeSize > maxRequestSize) {
msg.release()
if(!exceeded) {
ctx.fireExceptionCaught(ContentTooLargeException("Request body is too large", null))
}
} else {
ctx.fireChannelRead(msg)
}
}
else -> ctx.fireChannelRead(msg)
}
}
}

View File

@@ -1,128 +1,148 @@
package net.woggioni.rbcs.server.handler
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.DefaultFileRegion
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.ChannelPromise
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.DefaultLastHttpContent
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpHeaders
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpObject
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.api.RequestHandle
import net.woggioni.rbcs.api.ResponseHandle
import net.woggioni.rbcs.api.event.RequestStreamingEvent
import net.woggioni.rbcs.api.event.ResponseStreamingEvent
import net.woggioni.rbcs.common.contextLogger
import io.netty.handler.codec.http.HttpVersion
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.server.debug
import net.woggioni.rbcs.server.warn
import net.woggioni.rbcs.common.warn
import java.nio.file.Path
import java.util.concurrent.CompletableFuture
import java.util.Locale
class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
SimpleChannelInboundHandler<HttpObject>() {
class ServerHandler(private val serverPrefix: Path) :
ChannelDuplexHandler() {
private val log = contextLogger()
companion object {
private val log = createLogger<ServerHandler>()
val NAME = this::class.java.name
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) {
when(msg) {
is HttpRequest -> handleRequest(ctx, msg)
is HttpContent -> handleContent(msg)
private var httpVersion = HttpVersion.HTTP_1_1
private var keepAlive = true
private fun resetRequestMetadata() {
httpVersion = HttpVersion.HTTP_1_1
keepAlive = true
}
private fun setRequestMetadata(req: HttpRequest) {
httpVersion = req.protocolVersion()
keepAlive = HttpUtil.isKeepAlive(req)
}
private fun setKeepAliveHeader(headers: HttpHeaders) {
if (!keepAlive) {
headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
} else {
headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
}
}
private var requestHandle : CompletableFuture<RequestHandle?> = CompletableFuture.completedFuture(null)
private fun handleContent(content : HttpContent) {
content.retain()
requestHandle.thenAccept { handle ->
handle?.let {
val evt = if(content is LastHttpContent) {
RequestStreamingEvent.LastChunkReceived(content.content())
} else {
RequestStreamingEvent.ChunkReceived(content.content())
}
it.handleEvent(evt)
content.release()
} ?: content.release()
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
when (msg) {
is HttpRequest -> handleRequest(ctx, msg)
else -> super.channelRead(ctx, msg)
}
}
private fun handleRequest(ctx : ChannelHandlerContext, msg : HttpRequest) {
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise?) {
if (msg is CacheMessage) {
try {
when (msg) {
is CachePutResponse -> {
val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.CREATED)
val keyBytes = msg.key.toByteArray(Charsets.UTF_8)
response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN)
set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
setKeepAliveHeader(response.headers())
ctx.write(response)
val buf = ctx.alloc().buffer(keyBytes.size).apply {
writeBytes(keyBytes)
}
ctx.writeAndFlush(DefaultLastHttpContent(buf))
}
is CacheValueNotFoundResponse -> {
val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
setKeepAliveHeader(response.headers())
ctx.writeAndFlush(response)
}
is CacheValueFoundResponse -> {
val response = DefaultHttpResponse(httpVersion, HttpResponseStatus.OK)
response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, msg.metadata.mimeType ?: HttpHeaderValues.APPLICATION_OCTET_STREAM)
msg.metadata.contentDisposition?.let { contentDisposition ->
set(HttpHeaderNames.CONTENT_DISPOSITION, contentDisposition)
}
}
setKeepAliveHeader(response.headers())
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
ctx.writeAndFlush(response)
}
is LastCacheContent -> {
ctx.writeAndFlush(DefaultLastHttpContent(msg.content()))
}
is CacheContent -> {
ctx.writeAndFlush(DefaultHttpContent(msg.content()))
}
else -> throw UnsupportedOperationException("This should never happen")
}.let { channelFuture ->
if (promise != null) {
channelFuture.addListener {
if (it.isSuccess) promise.setSuccess()
else promise.setFailure(it.cause())
}
}
}
} finally {
resetRequestMetadata()
}
} else super.write(ctx, msg, promise)
}
private fun handleRequest(ctx: ChannelHandlerContext, msg: HttpRequest) {
setRequestMetadata(msg)
val method = msg.method()
if (method === HttpMethod.GET) {
val path = Path.of(msg.uri())
val prefix = path.parent
val key = path.fileName?.toString() ?: let {
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
return
}
if (serverPrefix == prefix) {
val responseHandle = ResponseHandle { evt ->
when (evt) {
is ResponseStreamingEvent.ResponseReceived -> {
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
}
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
ctx.writeAndFlush(response)
}
is ResponseStreamingEvent.LastChunkReceived -> {
val channelFuture = ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk))
if (!keepAlive) {
channelFuture
.addListener(ChannelFutureListener.CLOSE)
}
}
is ResponseStreamingEvent.ChunkReceived -> {
ctx.writeAndFlush(DefaultHttpContent(evt.chunk))
}
is ResponseStreamingEvent.ExceptionCaught -> {
ctx.fireExceptionCaught(evt.exception)
}
is ResponseStreamingEvent.NotFound -> {
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
is ResponseStreamingEvent.FileReceived -> {
val content = DefaultFileRegion(evt.file, 0, evt.file.size())
if (keepAlive) {
ctx.write(content)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
}
}
}
cache.get(key, responseHandle, ctx.alloc())
ctx.pipeline().addAfter(NAME, CacheContentHandler.NAME, CacheContentHandler)
path.fileName?.toString()
?.let(::CacheGetRequest)
?.let(ctx::fireChannelRead)
?: ctx.channel().write(CacheValueNotFoundResponse())
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
@@ -140,33 +160,14 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
log.debug(ctx) {
"Added value for key '$key' to build cache"
}
val responseHandle = ResponseHandle { evt ->
when (evt) {
is ResponseStreamingEvent.ResponseReceived -> {
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
this.requestHandle = CompletableFuture.completedFuture(null)
}
is ResponseStreamingEvent.ChunkReceived -> {
evt.chunk.release()
}
is ResponseStreamingEvent.ExceptionCaught -> {
ctx.fireExceptionCaught(evt.exception)
}
else -> {}
ctx.pipeline().addAfter(NAME, CacheContentHandler.NAME, CacheContentHandler)
path.fileName?.toString()
?.let {
val mimeType = HttpUtil.getMimeType(msg)?.toString()
CachePutRequest(key, CacheValueMetadata(msg.headers().get(HttpHeaderNames.CONTENT_DISPOSITION), mimeType))
}
}
this.requestHandle = cache.put(key, responseHandle, ctx.alloc()).exceptionally { ex ->
ctx.fireExceptionCaught(ex)
null
}.also {
log.debug { "Replacing request handle with $it"}
}
?.let(ctx::fireChannelRead)
?: ctx.channel().write(CacheValueNotFoundResponse())
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
@@ -176,40 +177,7 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
ctx.writeAndFlush(response)
}
} else if (method == HttpMethod.TRACE) {
val replayedRequestHead = ctx.alloc().buffer()
replayedRequestHead.writeCharSequence(
"TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n",
Charsets.US_ASCII
)
msg.headers().forEach { (key, value) ->
replayedRequestHead.apply {
writeCharSequence(key, Charsets.US_ASCII)
writeCharSequence(": ", Charsets.US_ASCII)
writeCharSequence(value, Charsets.UTF_8)
writeCharSequence("\r\n", Charsets.US_ASCII)
}
}
replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII)
this.requestHandle = CompletableFuture.completedFuture(RequestHandle { evt ->
when(evt) {
is RequestStreamingEvent.LastChunkReceived -> {
ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk.retain()))
this.requestHandle = CompletableFuture.completedFuture(null)
}
is RequestStreamingEvent.ChunkReceived -> ctx.writeAndFlush(DefaultHttpContent(evt.chunk.retain()))
is RequestStreamingEvent.ExceptionCaught -> ctx.fireExceptionCaught(evt.exception)
else -> {
}
}
}).also {
log.debug { "Replacing request handle with $it"}
}
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, "message/http")
}
ctx.writeAndFlush(response)
super.channelRead(ctx, msg)
} else {
log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'"
@@ -220,10 +188,43 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
requestHandle.thenAccept { handle ->
handle?.handleEvent(RequestStreamingEvent.ExceptionCaught(cause))
data class ContentDisposition(val type: Type?, val fileName: String?) {
enum class Type {
attachment, `inline`;
companion object {
@JvmStatic
fun parse(maybeString: String?) = maybeString.let { s ->
try {
java.lang.Enum.valueOf(Type::class.java, s)
} catch (ex: IllegalArgumentException) {
null
}
}
}
}
companion object {
@JvmStatic
fun parse(contentDisposition: String) : ContentDisposition {
val parts = contentDisposition.split(";").dropLastWhile { it.isEmpty() }.toTypedArray()
val dispositionType = parts[0].trim { it <= ' ' }.let(Type::parse) // Get the type (e.g., attachment)
var filename: String? = null
for (i in 1..<parts.size) {
val part = parts[i].trim { it <= ' ' }
if (part.lowercase(Locale.getDefault()).startsWith("filename=")) {
filename = part.substring("filename=".length).trim { it <= ' ' }.replace("\"", "")
break
}
}
return ContentDisposition(dispositionType, filename)
}
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
super.exceptionCaught(ctx, cause)
}
}

View File

@@ -0,0 +1,54 @@
package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.LastHttpContent
import java.nio.file.Path
@Sharable
object TraceHandler : ChannelInboundHandlerAdapter() {
val NAME = this::class.java.name
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
when(msg) {
is HttpRequest -> {
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, "message/http")
set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
val replayedRequestHead = ctx.alloc().buffer()
replayedRequestHead.writeCharSequence(
"TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n",
Charsets.US_ASCII
)
msg.headers().forEach { (key, value) ->
replayedRequestHead.apply {
writeCharSequence(key, Charsets.US_ASCII)
writeCharSequence(": ", Charsets.US_ASCII)
writeCharSequence(value, Charsets.UTF_8)
writeCharSequence("\r\n", Charsets.US_ASCII)
}
}
replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII)
ctx.writeAndFlush(replayedRequestHead)
}
is LastHttpContent -> {
ctx.writeAndFlush(msg)
}
is HttpContent -> ctx.writeAndFlush(msg)
else -> super.channelRead(ctx, msg)
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) {
super.exceptionCaught(ctx, cause)
}
}

View File

@@ -13,23 +13,20 @@ import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.server.RemoteBuildCacheServer
import net.woggioni.jwo.Bucket
import net.woggioni.jwo.LongMath
import net.woggioni.rbcs.common.createLogger
import java.net.InetSocketAddress
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() {
class ThrottlingHandler(private val bucketManager : BucketManager,
private val connectionConfiguration : Configuration.Connection) : ChannelInboundHandlerAdapter() {
private companion object {
@JvmStatic
private val log = contextLogger()
private val log = createLogger<ThrottlingHandler>()
}
private val bucketManager = BucketManager.from(cfg)
private val connectionConfiguration = cfg.connection
private var queuedContent : MutableList<HttpContent>? = null
/**
@@ -98,6 +95,7 @@ class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() {
handleBuckets(buckets, ctx, msg, false)
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
} else {
this.queuedContent = null
sendThrottledResponse(ctx, waitDuration)
}
}

View File

@@ -39,7 +39,7 @@
<xs:attribute name="idle-timeout" type="xs:duration" use="optional" default="PT30S"/>
<xs:attribute name="read-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
<xs:attribute name="write-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
<xs:attribute name="max-request-size" type="rbcs:byteSize" use="optional" default="0x4000000"/>
<xs:attribute name="max-request-size" type="rbcs:byteSizeType" use="optional" default="0x4000000"/>
</xs:complexType>
<xs:complexType name="eventExecutorType">
@@ -52,11 +52,11 @@
<xs:complexContent>
<xs:extension base="rbcs:cacheType">
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="max-size" type="rbcs:byteSize" default="0x1000000"/>
<xs:attribute name="max-size" type="rbcs:byteSizeType" default="0x1000000"/>
<xs:attribute name="digest" type="xs:token" default="MD5"/>
<xs:attribute name="enable-compression" type="xs:boolean" default="true"/>
<xs:attribute name="compression-level" type="xs:byte" default="-1"/>
<xs:attribute name="chunk-size" type="rbcs:byteSize" default="0x4000"/>
<xs:attribute name="compression-level" type="rbcs:compressionLevelType" default="-1"/>
<xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000"/>
</xs:extension>
</xs:complexContent>
</xs:complexType>
@@ -64,12 +64,12 @@
<xs:complexType name="fileSystemCacheType">
<xs:complexContent>
<xs:extension base="rbcs:cacheType">
<xs:attribute name="path" type="xs:string" use="required"/>
<xs:attribute name="path" type="xs:string" use="optional"/>
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="digest" type="xs:token" default="MD5"/>
<xs:attribute name="enable-compression" type="xs:boolean" default="true"/>
<xs:attribute name="compression-level" type="xs:byte" default="-1"/>
<xs:attribute name="chunk-size" type="rbcs:byteSize" default="0x4000"/>
<xs:attribute name="compression-level" type="rbcs:compressionLevelType" default="-1"/>
<xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000"/>
</xs:extension>
</xs:complexContent>
</xs:complexType>
@@ -222,10 +222,17 @@
<xs:attribute name="port" type="xs:unsignedShort" use="required"/>
</xs:complexType>
<xs:simpleType name="byteSize">
<xs:simpleType name="byteSizeType">
<xs:restriction base="xs:token">
<xs:pattern value="(0x[a-f0-9]+|[0-9]+)"/>
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="compressionLevelType">
<xs:restriction base="xs:integer">
<xs:minInclusive value="-1"/>
<xs:maxInclusive value="9"/>
</xs:restriction>
</xs:simpleType>
</xs:schema>

View File

@@ -43,8 +43,9 @@ abstract class AbstractServerTest {
}
private fun stopServer() {
this.serverHandle?.use {
it.shutdown()
this.serverHandle?.let {
it.sendShutdownSignal()
it.get()
}
}
}

View File

@@ -154,7 +154,7 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
sequenceOf(writersGroup, readersGroup).map { it.name to it }.toMap(),
FileSystemCacheConfiguration(this.cacheDir,
maxAge = Duration.ofSeconds(3600 * 24),
compressionEnabled = true,
compressionEnabled = false,
compressionLevel = Deflater.DEFAULT_COMPRESSION,
digestAlgorithm = "MD5",
chunkSize = 0x1000

View File

@@ -13,7 +13,7 @@
read-timeout="PT5M"
write-timeout="PT5M"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" max-size="16777216" compression-mode="deflate" chunk-size="123">
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" chunk-size="123">
<server host="memcached" port="11211"/>
</cache>
<authorization>

View File

@@ -12,7 +12,7 @@
idle-timeout="PT30M"
max-request-size="101325"/>
<event-executor use-virtual-threads="false"/>
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" max-size="101325" digest="SHA-256" chunk-size="456">
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" digest="SHA-256" chunk-size="456" compression-mode="deflate" compression-level="7">
<server host="127.0.0.1" port="11211" max-connections="10" connection-timeout="PT20S"/>
</cache>
<authentication>