diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandler.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandler.java index 7a51dae..5040efa 100644 --- a/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandler.java +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandler.java @@ -16,7 +16,7 @@ public abstract class CacheHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if(!requestFinished && msg instanceof CacheMessage) { - if(msg instanceof CacheMessage.LastCacheContent || msg instanceof CacheMessage.CacheGetRequest) requestFinished = true; + if(msg instanceof CacheMessage.LastCacheContent) requestFinished = true; try { channelRead0(ctx, (CacheMessage) msg); } finally { diff --git a/rbcs-cli/build.gradle b/rbcs-cli/build.gradle index 10f260e..868cdfc 100644 --- a/rbcs-cli/build.gradle +++ b/rbcs-cli/build.gradle @@ -105,6 +105,7 @@ tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfi systemProperty('io.netty.leakDetectionLevel', 'DISABLED') modularity.inferModulePath = false enabled = true + systemProperty('gradle.tmp.dir', temporaryDir.toString()) } nativeImage { diff --git a/rbcs-cli/native-image/reflect-config.json b/rbcs-cli/native-image/reflect-config.json index b819705..f8e0ee4 100644 --- a/rbcs-cli/native-image/reflect-config.json +++ b/rbcs-cli/native-image/reflect-config.json @@ -637,6 +637,10 @@ "name":"sun.security.provider.DSA$SHA256withDSA", "methods":[{"name":"","parameterTypes":[] }] }, +{ + "name":"sun.security.provider.JavaKeyStore$JKS", + "methods":[{"name":"","parameterTypes":[] }] +}, { "name":"sun.security.provider.MD5", "methods":[{"name":"","parameterTypes":[] }] @@ -725,14 +729,6 @@ "name":"sun.security.x509.CertificatePoliciesExtension", "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] }, -{ - "name":"sun.security.x509.ExtendedKeyUsageExtension", - "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] -}, -{ - "name":"sun.security.x509.IssuerAlternativeNameExtension", - "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] -}, { "name":"sun.security.x509.KeyUsageExtension", "methods":[{"name":"","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] diff --git a/rbcs-cli/src/configureNativeImage/kotlin/net/woggioni/rbcs/cli/graal/GraalNativeImageConfiguration.kt b/rbcs-cli/src/configureNativeImage/kotlin/net/woggioni/rbcs/cli/graal/GraalNativeImageConfiguration.kt index 6fdb607..91d5541 100644 --- a/rbcs-cli/src/configureNativeImage/kotlin/net/woggioni/rbcs/cli/graal/GraalNativeImageConfiguration.kt +++ b/rbcs-cli/src/configureNativeImage/kotlin/net/woggioni/rbcs/cli/graal/GraalNativeImageConfiguration.kt @@ -179,6 +179,8 @@ object GraalNativeImageConfiguration { } catch (ee : ExecutionException) { } } - RemoteBuildCacheServerCli.main("--help") + System.setProperty("net.woggioni.rbcs.conf.dir", System.getProperty("gradle.tmp.dir")) + RemoteBuildCacheServerCli.createCommandLine().execute("--version") + RemoteBuildCacheServerCli.createCommandLine().execute("server", "-t", "PT10S") } } \ No newline at end of file diff --git a/rbcs-cli/src/main/kotlin/net/woggioni/rbcs/cli/RemoteBuildCacheServerCli.kt b/rbcs-cli/src/main/kotlin/net/woggioni/rbcs/cli/RemoteBuildCacheServerCli.kt index 7330b20..0b0c273 100644 --- a/rbcs-cli/src/main/kotlin/net/woggioni/rbcs/cli/RemoteBuildCacheServerCli.kt +++ b/rbcs-cli/src/main/kotlin/net/woggioni/rbcs/cli/RemoteBuildCacheServerCli.kt @@ -26,8 +26,8 @@ class RemoteBuildCacheServerCli : RbcsCommand() { private fun setPropertyIfNotPresent(key: String, value: String) { System.getProperty(key) ?: System.setProperty(key, value) } - @JvmStatic - fun main(vararg args: String) { + + fun createCommandLine() : CommandLine { setPropertyIfNotPresent("logback.configurationFile", "net/woggioni/rbcs/cli/logback.xml") setPropertyIfNotPresent("io.netty.leakDetectionLevel", "DISABLED") val currentClassLoader = RemoteBuildCacheServerCli::class.java.classLoader @@ -56,7 +56,12 @@ class RemoteBuildCacheServerCli : RbcsCommand() { addSubcommand(GetCommand()) addSubcommand(HealthCheckCommand()) }) - System.exit(commandLine.execute(*args)) + return commandLine + } + + @JvmStatic + fun main(vararg args: String) { + System.exit(createCommandLine().execute(*args)) } } diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt index 0dd4109..faf5c03 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt @@ -69,10 +69,14 @@ class MemcacheCacheHandler( } } + private interface InProgressRequest { + + } + private inner class InProgressGetRequest( - private val key: String, + val key: String, private val ctx: ChannelHandlerContext - ) { + ) : InProgressRequest { private val acc = ctx.alloc().compositeBuffer() private val chunk = ctx.alloc().compositeBuffer() private val outputStream = ByteBufOutputStream(chunk).let { @@ -149,7 +153,7 @@ class MemcacheCacheHandler( val digest : ByteBuf, val requestController: CompletableFuture, private val alloc: ByteBufAllocator - ) { + ) : InProgressRequest { private var totalSize = 0 private var tmpFile : FileChannel? = null private val accumulator = alloc.compositeBuffer() @@ -227,8 +231,7 @@ class MemcacheCacheHandler( } } - private var inProgressPutRequest: InProgressPutRequest? = null - private var inProgressGetRequest: InProgressGetRequest? = null + private var inProgressRequest: InProgressRequest? = null override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) { when (msg) { @@ -241,60 +244,7 @@ class MemcacheCacheHandler( } private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { - log.debug(ctx) { - "Fetching ${msg.key} from memcache" - } - val key = ctx.alloc().buffer().also { - it.writeBytes(processCacheKey(msg.key, digestAlgorithm)) - } - val responseHandler = object : MemcacheResponseHandler { - override fun responseReceived(response: BinaryMemcacheResponse) { - val status = response.status() - when (status) { - BinaryMemcacheResponseStatus.SUCCESS -> { - log.debug(ctx) { - "Cache hit for key ${msg.key} on memcache" - } - inProgressGetRequest = InProgressGetRequest(msg.key, ctx) - } - - BinaryMemcacheResponseStatus.KEY_ENOENT -> { - log.debug(ctx) { - "Cache miss for key ${msg.key} on memcache" - } - sendMessageAndFlush(ctx, CacheValueNotFoundResponse()) - } - } - } - - override fun contentReceived(content: MemcacheContent) { - log.trace(ctx) { - "${if(content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${content.content().readableBytes()} bytes received from memcache for key ${msg.key}" - } - inProgressGetRequest?.write(content.content()) - if (content is LastMemcacheContent) { - inProgressGetRequest?.commit() - } - } - - override fun exceptionCaught(ex: Throwable) { - inProgressGetRequest?.let { - inProgressGetRequest = null - it.rollback() - } - this@MemcacheCacheHandler.exceptionCaught(ctx, ex) - } - } - client.sendRequest(key.retainedDuplicate(), responseHandler).thenAccept { requestHandle -> - log.trace(ctx) { - "Sending GET request for key ${msg.key} to memcache" - } - val request = DefaultBinaryMemcacheRequest(key).apply { - setOpcode(BinaryMemcacheOpcodes.GET) - } - requestHandle.sendRequest(request) - requestHandle.sendContent(LastMemcacheContent.EMPTY_LAST_CONTENT) - } + inProgressRequest = InProgressGetRequest(msg.key, ctx) } private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { @@ -327,89 +277,162 @@ class MemcacheCacheHandler( this@MemcacheCacheHandler.exceptionCaught(ctx, ex) } } - inProgressPutRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc()) + inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc()) } private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { - inProgressPutRequest?.let { request -> - log.trace(ctx) { - "Received chunk of ${msg.content().readableBytes()} bytes for memcache" + val request = inProgressRequest + when(request) { + is InProgressPutRequest -> { + log.trace(ctx) { + "Received chunk of ${msg.content().readableBytes()} bytes for memcache" + } + request.write(msg.content()) + } + is InProgressGetRequest -> { + msg.release() } - request.write(msg.content()) } } private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) { - inProgressPutRequest?.let { request -> - inProgressPutRequest = null - log.trace(ctx) { - "Received last chunk of ${msg.content().readableBytes()} bytes for memcache" - } - request.write(msg.content()) - val key = request.digest.retainedDuplicate() - val (payloadSize, payloadSource) = request.commit() - val extras = ctx.alloc().buffer(8, 8) - extras.writeInt(0) - extras.writeInt(encodeExpiry(maxAge)) - val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize - log.trace(ctx) { - "Trying to send SET request to memcache" - } - request.requestController.whenComplete { requestController, ex -> - if(ex == null) { - log.trace(ctx) { - "Sending SET request to memcache" - } - requestController.sendRequest(DefaultBinaryMemcacheRequest().apply { - setOpcode(BinaryMemcacheOpcodes.SET) - setKey(key) - setExtras(extras) - setTotalBodyLength(totalBodyLength) - }) - log.trace(ctx) { - "Sending request payload to memcache" - } - payloadSource.use { source -> - val bb = ByteBuffer.allocate(chunkSize) - while (true) { - val read = source.read(bb) - bb.limit() - if(read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) { - continue + val request = inProgressRequest + when(request) { + is InProgressPutRequest -> { + inProgressRequest = null + log.trace(ctx) { + "Received last chunk of ${msg.content().readableBytes()} bytes for memcache" + } + request.write(msg.content()) + val key = request.digest.retainedDuplicate() + val (payloadSize, payloadSource) = request.commit() + val extras = ctx.alloc().buffer(8, 8) + extras.writeInt(0) + extras.writeInt(encodeExpiry(maxAge)) + val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize + log.trace(ctx) { + "Trying to send SET request to memcache" + } + request.requestController.whenComplete { requestController, ex -> + if(ex == null) { + log.trace(ctx) { + "Sending SET request to memcache" + } + requestController.sendRequest(DefaultBinaryMemcacheRequest().apply { + setOpcode(BinaryMemcacheOpcodes.SET) + setKey(key) + setExtras(extras) + setTotalBodyLength(totalBodyLength) + }) + log.trace(ctx) { + "Sending request payload to memcache" + } + payloadSource.use { source -> + val bb = ByteBuffer.allocate(chunkSize) + while (true) { + val read = source.read(bb) + bb.limit() + if(read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) { + continue + } + val chunk = ctx.alloc().buffer(chunkSize) + bb.flip() + chunk.writeBytes(bb) + bb.clear() + log.trace(ctx) { + "Sending ${chunk.readableBytes()} bytes chunk to memcache" + } + if(read < 0) { + requestController.sendContent(DefaultLastMemcacheContent(chunk)) + break + } else { + requestController.sendContent(DefaultMemcacheContent(chunk)) + } } - val chunk = ctx.alloc().buffer(chunkSize) - bb.flip() - chunk.writeBytes(bb) - bb.clear() - log.trace(ctx) { - "Sending ${chunk.readableBytes()} bytes chunk to memcache" + } + } else { + payloadSource.close() + } + } + } + is InProgressGetRequest -> { + log.debug(ctx) { + "Fetching ${request.key} from memcache" + } + val key = ctx.alloc().buffer().also { + it.writeBytes(processCacheKey(request.key, digestAlgorithm)) + } + val responseHandler = object : MemcacheResponseHandler { + override fun responseReceived(response: BinaryMemcacheResponse) { + val status = response.status() + when (status) { + BinaryMemcacheResponseStatus.SUCCESS -> { + log.debug(ctx) { + "Cache hit for key ${request.key} on memcache" + } + inProgressRequest = InProgressGetRequest(request.key, ctx) } - if(read < 0) { - requestController.sendContent(DefaultLastMemcacheContent(chunk)) - break - } else { - requestController.sendContent(DefaultMemcacheContent(chunk)) + + BinaryMemcacheResponseStatus.KEY_ENOENT -> { + log.debug(ctx) { + "Cache miss for key ${request.key} on memcache" + } + sendMessageAndFlush(ctx, CacheValueNotFoundResponse()) } } } - } else { - payloadSource.close() + + override fun contentReceived(content: MemcacheContent) { + log.trace(ctx) { + "${if(content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${content.content().readableBytes()} bytes received from memcache for key ${request.key}" + } + (inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest -> + inProgressGetRequest?.write(content.content()) + if (content is LastMemcacheContent) { + inProgressRequest = null + inProgressGetRequest?.commit() + } + } + } + + override fun exceptionCaught(ex: Throwable) { + (inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest -> + inProgressGetRequest?.let { + inProgressRequest = null + it.rollback() + } + } + this@MemcacheCacheHandler.exceptionCaught(ctx, ex) + } + } + client.sendRequest(key.retainedDuplicate(), responseHandler).thenAccept { requestHandle -> + log.trace(ctx) { + "Sending GET request for key ${request.key} to memcache" + } + val request = DefaultBinaryMemcacheRequest(key).apply { + setOpcode(BinaryMemcacheOpcodes.GET) + } + requestHandle.sendRequest(request) + requestHandle.sendContent(LastMemcacheContent.EMPTY_LAST_CONTENT) } } } } override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - inProgressGetRequest?.let { - inProgressGetRequest = null - it.rollback() - } - inProgressPutRequest?.let { - inProgressPutRequest = null - it.requestController.thenAccept { controller -> - controller.exceptionCaught(cause) + val request = inProgressRequest + when(request) { + is InProgressPutRequest -> { + inProgressRequest = null + request.requestController.thenAccept { controller -> + controller.exceptionCaught(cause) + } + request.rollback() + } + is InProgressGetRequest -> { + inProgressRequest = null + request.rollback() } - it.rollback() } super.exceptionCaught(ctx, cause) } diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt index 17301e0..3bbd44e 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt @@ -54,6 +54,7 @@ import net.woggioni.rbcs.server.auth.RoleAuthorizer import net.woggioni.rbcs.server.configuration.Parser import net.woggioni.rbcs.server.configuration.Serializer import net.woggioni.rbcs.server.exception.ExceptionHandler +import net.woggioni.rbcs.server.handler.BlackHoleRequestHandler import net.woggioni.rbcs.server.handler.MaxRequestSizeHandler import net.woggioni.rbcs.server.handler.ServerHandler import net.woggioni.rbcs.server.throttling.BucketManager @@ -361,6 +362,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { } pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler) pipeline.addLast(ExceptionHandler.NAME, ExceptionHandler) + pipeline.addLast(BlackHoleRequestHandler.NAME, BlackHoleRequestHandler()) } override fun asyncClose() = cacheHandlerFactory.asyncClose() diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheHandler.kt index 09a5591..600b54b 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheHandler.kt @@ -2,7 +2,6 @@ package net.woggioni.rbcs.server.cache import io.netty.buffer.ByteBuf import io.netty.channel.ChannelHandlerContext -import io.netty.channel.SimpleChannelInboundHandler import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.stream.ChunkedNioFile import net.woggioni.rbcs.api.CacheHandler @@ -29,10 +28,16 @@ class FileSystemCacheHandler( private val chunkSize: Int ) : CacheHandler() { + private interface InProgressRequest{ + + } + + private class InProgressGetRequest(val request : CacheGetRequest) : InProgressRequest + private inner class InProgressPutRequest( val key : String, private val fileSink : FileSystemCache.FileSink - ) { + ) : InProgressRequest { private val stream = Channels.newOutputStream(fileSink.channel).let { if (compressionEnabled) { @@ -56,7 +61,7 @@ class FileSystemCacheHandler( } } - private var inProgressPutRequest: InProgressPutRequest? = null + private var inProgressRequest: InProgressRequest? = null override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) { when (msg) { @@ -69,55 +74,64 @@ class FileSystemCacheHandler( } private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { - val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm))) - cache.get(key)?.also { entryValue -> - sendMessageAndFlush(ctx, CacheValueFoundResponse(msg.key, entryValue.metadata)) - entryValue.channel.let { channel -> - if(compressionEnabled) { - InflaterInputStream(Channels.newInputStream(channel)).use { stream -> + inProgressRequest = InProgressGetRequest(msg) - outerLoop@ - while (true) { - val buf = ctx.alloc().heapBuffer(chunkSize) - while(buf.readableBytes() < chunkSize) { - val read = buf.writeBytes(stream, chunkSize) - if(read < 0) { - sendMessageAndFlush(ctx, LastCacheContent(buf)) - break@outerLoop - } - } - sendMessageAndFlush(ctx, CacheContent(buf)) - } - } - } else { - sendMessage(ctx, ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize)) - sendMessageAndFlush(ctx, LastHttpContent.EMPTY_LAST_CONTENT) - } - } - } ?: sendMessageAndFlush(ctx, CacheValueNotFoundResponse()) } private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm))) val sink = cache.put(key, msg.metadata) - inProgressPutRequest = InProgressPutRequest(msg.key, sink) + inProgressRequest = InProgressPutRequest(msg.key, sink) } private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { - inProgressPutRequest!!.write(msg.content()) + val request = inProgressRequest + if(request is InProgressPutRequest) { + request.write(msg.content()) + } } private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) { - inProgressPutRequest?.let { request -> - inProgressPutRequest = null - request.write(msg.content()) - request.commit() - sendMessageAndFlush(ctx, CachePutResponse(request.key)) + when(val request = inProgressRequest) { + is InProgressPutRequest -> { + inProgressRequest = null + request.write(msg.content()) + request.commit() + sendMessageAndFlush(ctx, CachePutResponse(request.key)) + } + is InProgressGetRequest -> { + val key = String(Base64.getUrlEncoder().encode(processCacheKey(request.request.key, digestAlgorithm))) + cache.get(key)?.also { entryValue -> + sendMessageAndFlush(ctx, CacheValueFoundResponse(request.request.key, entryValue.metadata)) + entryValue.channel.let { channel -> + if(compressionEnabled) { + InflaterInputStream(Channels.newInputStream(channel)).use { stream -> + + outerLoop@ + while (true) { + val buf = ctx.alloc().heapBuffer(chunkSize) + while(buf.readableBytes() < chunkSize) { + val read = buf.writeBytes(stream, chunkSize) + if(read < 0) { + sendMessageAndFlush(ctx, LastCacheContent(buf)) + break@outerLoop + } + } + sendMessageAndFlush(ctx, CacheContent(buf)) + } + } + } else { + sendMessage(ctx, ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize)) + sendMessageAndFlush(ctx, LastHttpContent.EMPTY_LAST_CONTENT) + } + } + } ?: sendMessageAndFlush(ctx, CacheValueNotFoundResponse()) + } } } override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - inProgressPutRequest?.rollback() + (inProgressRequest as? InProgressPutRequest)?.rollback() super.exceptionCaught(ctx, cause) } } \ No newline at end of file diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt index 416015c..db1f446 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt @@ -75,6 +75,10 @@ class InMemoryCache( cond.await(interval.toMillis(), TimeUnit.MILLISECONDS) } } + map.forEach { + it.value.content.release() + } + map.clear() } complete(null) } catch (ex: Throwable) { diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt index 7cbdb7a..f8dcb94 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt @@ -26,7 +26,15 @@ class InMemoryCacheHandler( private val compressionLevel: Int ) : CacheHandler() { - private interface InProgressPutRequest : AutoCloseable { + private interface InProgressRequest : AutoCloseable { + } + + private class InProgressGetRequest(val request : CacheGetRequest) : InProgressRequest { + override fun close() { + } + } + + private interface InProgressPutRequest : InProgressRequest { val request: CachePutRequest val buf: ByteBuf @@ -74,7 +82,7 @@ class InMemoryCacheHandler( } } - private var inProgressPutRequest: InProgressPutRequest? = null + private var inProgressRequest: InProgressRequest? = null override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) { when (msg) { @@ -87,24 +95,11 @@ class InMemoryCacheHandler( } private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { - cache.get(processCacheKey(msg.key, digestAlgorithm))?.let { value -> - sendMessageAndFlush(ctx, CacheValueFoundResponse(msg.key, value.metadata)) - if (compressionEnabled) { - val buf = ctx.alloc().heapBuffer() - InflaterOutputStream(ByteBufOutputStream(buf)).use { - value.content.readBytes(it, value.content.readableBytes()) - value.content.release() - buf.retain() - } - sendMessage(ctx, LastCacheContent(buf)) - } else { - sendMessage(ctx, LastCacheContent(value.content)) - } - } ?: sendMessage(ctx, CacheValueNotFoundResponse()) + inProgressRequest = InProgressGetRequest(msg) } private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { - inProgressPutRequest = if(compressionEnabled) { + inProgressRequest = if(compressionEnabled) { InProgressCompressedPutRequest(ctx, msg) } else { InProgressPlainPutRequest(ctx, msg) @@ -112,27 +107,46 @@ class InMemoryCacheHandler( } private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { - inProgressPutRequest?.append(msg.content()) + val req = inProgressRequest + if(req is InProgressPutRequest) { + req.append(msg.content()) + } } private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) { handleCacheContent(ctx, msg) - inProgressPutRequest?.let { inProgressRequest -> - inProgressPutRequest = null - val buf = inProgressRequest.buf - buf.retain() - inProgressRequest.close() - val cacheKey = processCacheKey(inProgressRequest.request.key, digestAlgorithm) - cache.put(cacheKey, CacheEntry(inProgressRequest.request.metadata, buf)) - sendMessageAndFlush(ctx, CachePutResponse(inProgressRequest.request.key)) + when(val req = inProgressRequest) { + is InProgressGetRequest -> { + cache.get(processCacheKey(req.request.key, digestAlgorithm))?.let { value -> + sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata)) + if (compressionEnabled) { + val buf = ctx.alloc().heapBuffer() + InflaterOutputStream(ByteBufOutputStream(buf)).use { + value.content.readBytes(it, value.content.readableBytes()) + value.content.release() + buf.retain() + } + sendMessage(ctx, LastCacheContent(buf)) + } else { + sendMessage(ctx, LastCacheContent(value.content)) + } + } ?: sendMessage(ctx, CacheValueNotFoundResponse()) + } + is InProgressPutRequest -> { + this.inProgressRequest = null + val buf = req.buf + buf.retain() + req.close() + val cacheKey = processCacheKey(req.request.key, digestAlgorithm) + cache.put(cacheKey, CacheEntry(req.request.metadata, buf)) + sendMessageAndFlush(ctx, CachePutResponse(req.request.key)) + } } } override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - inProgressPutRequest?.let { req -> - req.buf.release() - inProgressPutRequest = null - } + inProgressRequest?.close() + inProgressRequest = null super.exceptionCaught(ctx, cause) } } \ No newline at end of file diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/BlackHoleRequestHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/BlackHoleRequestHandler.kt new file mode 100644 index 0000000..d912c40 --- /dev/null +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/BlackHoleRequestHandler.kt @@ -0,0 +1,13 @@ +package net.woggioni.rbcs.server.handler + +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.SimpleChannelInboundHandler +import io.netty.handler.codec.http.HttpContent + +class BlackHoleRequestHandler : SimpleChannelInboundHandler() { + companion object { + val NAME = BlackHoleRequestHandler::class.java.name + } + override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpContent) { + } +} \ No newline at end of file diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/throttling/ThrottlingHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/throttling/ThrottlingHandler.kt index 7fcddb3..60f5413 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/throttling/ThrottlingHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/throttling/ThrottlingHandler.kt @@ -94,6 +94,9 @@ class ThrottlingHandler(private val bucketManager : BucketManager, handleBuckets(buckets, ctx, msg, false) }, waitDuration.toMillis(), TimeUnit.MILLISECONDS) } else { + queuedContent?.let { qc -> + qc.forEach { it.release() } + } this.queuedContent = null sendThrottledResponse(ctx, waitDuration) } diff --git a/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/BasicAuthServerTest.kt b/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/BasicAuthServerTest.kt index b07fbe5..836ccb7 100644 --- a/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/BasicAuthServerTest.kt +++ b/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/BasicAuthServerTest.kt @@ -154,7 +154,7 @@ class BasicAuthServerTest : AbstractBasicAuthServerTest() { } @Test - @Order(6) + @Order(8) fun getAsAThrottledUser() { val client: HttpClient = HttpClient.newHttpClient() @@ -172,7 +172,7 @@ class BasicAuthServerTest : AbstractBasicAuthServerTest() { } @Test - @Order(7) + @Order(9) fun getAsAThrottledUser2() { val client: HttpClient = HttpClient.newHttpClient()