diff --git a/rbcs-api/build.gradle b/rbcs-api/build.gradle index f70de23..c1d6c07 100644 --- a/rbcs-api/build.gradle +++ b/rbcs-api/build.gradle @@ -5,9 +5,12 @@ plugins { } dependencies { + implementation catalog.slf4j.api + implementation project(':rbcs-common') api catalog.netty.common api catalog.netty.buffer api catalog.netty.handler + api catalog.netty.codec.http } publishing { diff --git a/rbcs-api/src/main/java/module-info.java b/rbcs-api/src/main/java/module-info.java index e6373b5..e282812 100644 --- a/rbcs-api/src/main/java/module-info.java +++ b/rbcs-api/src/main/java/module-info.java @@ -1,10 +1,15 @@ module net.woggioni.rbcs.api { requires static lombok; - requires java.xml; - requires io.netty.buffer; requires io.netty.handler; - requires io.netty.transport; requires io.netty.common; + requires net.woggioni.rbcs.common; + requires io.netty.transport; + requires io.netty.codec.http; + requires io.netty.buffer; + requires org.slf4j; + requires java.xml; + + exports net.woggioni.rbcs.api; exports net.woggioni.rbcs.api.exception; exports net.woggioni.rbcs.api.message; diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandler.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandler.java new file mode 100644 index 0000000..7a51dae --- /dev/null +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandler.java @@ -0,0 +1,57 @@ +package net.woggioni.rbcs.api; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.ReferenceCounted; +import lombok.extern.slf4j.Slf4j; +import net.woggioni.rbcs.api.message.CacheMessage; + +@Slf4j +public abstract class CacheHandler extends ChannelInboundHandlerAdapter { + private boolean requestFinished = false; + + abstract protected void channelRead0(ChannelHandlerContext ctx, CacheMessage msg); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if(!requestFinished && msg instanceof CacheMessage) { + if(msg instanceof CacheMessage.LastCacheContent || msg instanceof CacheMessage.CacheGetRequest) requestFinished = true; + try { + channelRead0(ctx, (CacheMessage) msg); + } finally { + if(msg instanceof ReferenceCounted rc) rc.release(); + } + } else { + ctx.fireChannelRead(msg); + } + } + + protected void sendMessageAndFlush(ChannelHandlerContext ctx, Object msg) { + sendMessage(ctx, msg, true); + } + + protected void sendMessage(ChannelHandlerContext ctx, Object msg) { + sendMessage(ctx, msg, false); + } + + private void sendMessage(ChannelHandlerContext ctx, Object msg, boolean flush) { + ctx.write(msg); + if( + msg instanceof CacheMessage.LastCacheContent || + msg instanceof CacheMessage.CachePutResponse || + msg instanceof CacheMessage.CacheValueNotFoundResponse || + msg instanceof LastHttpContent + ) { + ctx.flush(); + ctx.pipeline().remove(this); + } else if(flush) { + ctx.flush(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + } +} diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandlerFactory.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandlerFactory.java index 1a86b4b..4c3eb8d 100644 --- a/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandlerFactory.java +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandlerFactory.java @@ -1,13 +1,12 @@ package net.woggioni.rbcs.api; import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelHandler; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.SocketChannel; public interface CacheHandlerFactory extends AsyncCloseable { - ChannelHandler newHandler( + CacheHandler newHandler( Configuration configuration, EventLoopGroup eventLoopGroup, ChannelFactory socketChannelFactory, diff --git a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/Configuration.kt b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/Configuration.kt index d471346..019f7b3 100644 --- a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/Configuration.kt +++ b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/Configuration.kt @@ -38,11 +38,12 @@ data class Configuration( val readIdleTimeout: Duration, val writeIdleTimeout: Duration, val idleTimeout: Duration, + val requestPipelining : Boolean, ) data class Profile( val serverURI: URI, - val connection: Connection?, + val connection: Connection, val authentication: Authentication?, val connectionTimeout: Duration?, val maxConnections: Int, diff --git a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/RemoteBuildCacheClient.kt b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/RemoteBuildCacheClient.kt index b029dd3..dc3cc3e 100644 --- a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/RemoteBuildCacheClient.kt +++ b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/RemoteBuildCacheClient.kt @@ -318,6 +318,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC ) { pipeline.remove(this) responseFuture.complete(response) + if(!profile.connection.requestPipelining) { + pool.release(channel) + } } override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { @@ -332,6 +335,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC override fun channelInactive(ctx: ChannelHandlerContext) { responseFuture.completeExceptionally(IOException("The remote server closed the connection")) + if(!profile.connection.requestPipelining) { + pool.release(channel) + } super.channelInactive(ctx) } @@ -352,6 +358,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC if (this === pipeline.last()) { ctx.close() } + if(!profile.connection.requestPipelining) { + pool.release(channel) + } } else { super.userEventTriggered(ctx, evt) } @@ -401,8 +410,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC val ex = it.cause() log.warn(ex.message, ex) } - - pool.release(channel) + if(profile.connection.requestPipelining) { + pool.release(channel) + } } } else { responseFuture.completeExceptionally(channelFuture.cause()) diff --git a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/impl/Parser.kt b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/impl/Parser.kt index 81a4661..1d53129 100644 --- a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/impl/Parser.kt +++ b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/impl/Parser.kt @@ -30,7 +30,12 @@ object Parser { ?: throw ConfigurationException("base-url attribute is required") var authentication: Configuration.Authentication? = null var retryPolicy: Configuration.RetryPolicy? = null - var connection : Configuration.Connection? = null + var connection : Configuration.Connection = Configuration.Connection( + Duration.ofSeconds(60), + Duration.ofSeconds(60), + Duration.ofSeconds(30), + false + ) var trustStore : Configuration.TrustStore? = null for (gchild in child.asIterable()) { when (gchild.localName) { @@ -97,10 +102,13 @@ object Parser { ?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS) val writeIdleTimeout = gchild.renderAttribute("write-idle-timeout") ?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS) + val requestPipelining = gchild.renderAttribute("request-pipelining") + ?.let(String::toBoolean) ?: false connection = Configuration.Connection( readIdleTimeout, writeIdleTimeout, idleTimeout, + requestPipelining ) } diff --git a/rbcs-client/src/main/resources/net/woggioni/rbcs/client/schema/rbcs-client.xsd b/rbcs-client/src/main/resources/net/woggioni/rbcs/client/schema/rbcs-client.xsd index f7e98ab..807cdea 100644 --- a/rbcs-client/src/main/resources/net/woggioni/rbcs/client/schema/rbcs-client.xsd +++ b/rbcs-client/src/main/resources/net/woggioni/rbcs/client/schema/rbcs-client.xsd @@ -123,6 +123,13 @@ + + + + Enables HTTP/1.1 request pipelining + + + diff --git a/rbcs-common/build.gradle b/rbcs-common/build.gradle index 0622894..97cef6a 100644 --- a/rbcs-common/build.gradle +++ b/rbcs-common/build.gradle @@ -6,7 +6,7 @@ plugins { } dependencies { - implementation project(':rbcs-api') + implementation catalog.netty.transport implementation catalog.slf4j.api implementation catalog.jwo implementation catalog.netty.buffer diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt index 5c4896c..f5842da 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt @@ -6,6 +6,7 @@ import io.netty.channel.EventLoopGroup import io.netty.channel.pool.FixedChannelPool import io.netty.channel.socket.DatagramChannel import io.netty.channel.socket.SocketChannel +import net.woggioni.rbcs.api.CacheHandler import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.common.HostAndPort @@ -51,7 +52,7 @@ data class MemcacheCacheConfiguration( eventLoop: EventLoopGroup, socketChannelFactory: ChannelFactory, datagramChannelFactory: ChannelFactory, - ): ChannelHandler { + ): CacheHandler { return MemcacheCacheHandler( MemcacheClient( this@MemcacheCacheConfiguration.servers, diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt index 1451010..0dd4109 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt @@ -4,7 +4,6 @@ import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.CompositeByteBuf import io.netty.channel.ChannelHandlerContext -import io.netty.channel.SimpleChannelInboundHandler import io.netty.handler.codec.memcache.DefaultLastMemcacheContent import io.netty.handler.codec.memcache.DefaultMemcacheContent import io.netty.handler.codec.memcache.LastMemcacheContent @@ -13,6 +12,7 @@ import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest +import net.woggioni.rbcs.api.CacheHandler import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.api.exception.ContentTooLargeException import net.woggioni.rbcs.api.message.CacheMessage @@ -58,7 +58,7 @@ class MemcacheCacheHandler( private val compressionLevel: Int, private val chunkSize: Int, private val maxAge: Duration -) : SimpleChannelInboundHandler() { +) : CacheHandler() { companion object { private val log = createLogger() @@ -98,7 +98,10 @@ class MemcacheCacheHandler( acc.retain() it.readObject() as CacheValueMetadata } - ctx.writeAndFlush(CacheValueFoundResponse(key, metadata)) + log.trace(ctx) { + "Sending response from cache" + } + sendMessageAndFlush(ctx, CacheValueFoundResponse(key, metadata)) responseSent = true acc.readerIndex(Int.SIZE_BYTES + mSize) } @@ -114,16 +117,16 @@ class MemcacheCacheHandler( val toSend = extractChunk(chunk, ctx.alloc()) val msg = if(last) { log.trace(ctx) { - "Sending last chunk to client on channel ${ctx.channel().id().asShortText()}" + "Sending last chunk to client on channel" } LastCacheContent(toSend) } else { log.trace(ctx) { - "Sending chunk to client on channel ${ctx.channel().id().asShortText()}" + "Sending chunk to client" } CacheContent(toSend) } - ctx.writeAndFlush(msg) + sendMessageAndFlush(ctx, msg) } fun commit() { @@ -259,7 +262,7 @@ class MemcacheCacheHandler( log.debug(ctx) { "Cache miss for key ${msg.key} on memcache" } - ctx.writeAndFlush(CacheValueNotFoundResponse()) + sendMessageAndFlush(ctx, CacheValueNotFoundResponse()) } } } @@ -290,6 +293,7 @@ class MemcacheCacheHandler( setOpcode(BinaryMemcacheOpcodes.GET) } requestHandle.sendRequest(request) + requestHandle.sendContent(LastMemcacheContent.EMPTY_LAST_CONTENT) } } @@ -305,7 +309,7 @@ class MemcacheCacheHandler( log.debug(ctx) { "Inserted key ${msg.key} into memcache" } - ctx.writeAndFlush(CachePutResponse(msg.key)) + sendMessageAndFlush(ctx, CachePutResponse(msg.key)) } else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status)) } @@ -348,6 +352,9 @@ class MemcacheCacheHandler( extras.writeInt(0) extras.writeInt(encodeExpiry(maxAge)) val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize + log.trace(ctx) { + "Trying to send SET request to memcache" + } request.requestController.whenComplete { requestController, ex -> if(ex == null) { log.trace(ctx) { diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt index 2708f9d..dd31b21 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt @@ -12,7 +12,6 @@ import io.netty.channel.ChannelPipeline import io.netty.channel.EventLoopGroup import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.pool.AbstractChannelPoolHandler -import io.netty.channel.pool.ChannelPool import io.netty.channel.pool.FixedChannelPool import io.netty.channel.socket.SocketChannel import io.netty.handler.codec.memcache.LastMemcacheContent @@ -24,7 +23,7 @@ import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse import io.netty.util.concurrent.GenericFutureListener import net.woggioni.rbcs.common.HostAndPort import net.woggioni.rbcs.common.createLogger -import net.woggioni.rbcs.common.warn +import net.woggioni.rbcs.common.trace import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration import net.woggioni.rbcs.server.memcache.MemcacheCacheHandler import java.io.IOException @@ -94,18 +93,6 @@ class MemcacheClient( pool.acquire().addListener(object : GenericFutureListener> { override fun operationComplete(channelFuture: NettyFuture) { if (channelFuture.isSuccess) { - - var requestSent = false - var requestBodySent = false - var requestFinished = false - var responseReceived = false - var responseBodyReceived = false - var responseFinished = false - var requestBodySize = 0 - var requestBodyBytesSent = 0 - - - val channel = channelFuture.now var connectionClosedByTheRemoteServer = true val closeCallback = { @@ -113,14 +100,7 @@ class MemcacheClient( val ex = IOException("The memcache server closed the connection") val completed = response.completeExceptionally(ex) if(!completed) responseHandler.exceptionCaught(ex) - log.warn { - "RequestSent: $requestSent, RequestBodySent: $requestBodySent, " + - "RequestFinished: $requestFinished, ResponseReceived: $responseReceived, " + - "ResponseBodyReceived: $responseBodyReceived, ResponseFinished: $responseFinished, " + - "RequestBodySize: $requestBodySize, RequestBodyBytesSent: $requestBodyBytesSent" - } } - pool.release(channel) } val closeListener = ChannelFutureListener { closeCallback() @@ -140,18 +120,14 @@ class MemcacheClient( when (msg) { is BinaryMemcacheResponse -> { responseHandler.responseReceived(msg) - responseReceived = true } is LastMemcacheContent -> { - responseFinished = true responseHandler.contentReceived(msg) pipeline.remove(this) - pool.release(channel) } is MemcacheContent -> { - responseBodyReceived = true responseHandler.contentReceived(msg) } } @@ -165,35 +141,43 @@ class MemcacheClient( override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { connectionClosedByTheRemoteServer = false ctx.close() - pool.release(channel) responseHandler.exceptionCaught(cause) } } - channel.pipeline() - .addLast("client-handler", handler) + channel.pipeline().addLast(handler) response.complete(object : MemcacheRequestController { + private var channelReleased = false override fun sendRequest(request: BinaryMemcacheRequest) { - requestBodySize = request.totalBodyLength() - request.keyLength() - request.extrasLength() channel.writeAndFlush(request) - requestSent = true } override fun sendContent(content: MemcacheContent) { - val size = content.content().readableBytes() channel.writeAndFlush(content).addListener { - requestBodyBytesSent += size - requestBodySent = true if(content is LastMemcacheContent) { - requestFinished = true + if(!channelReleased) { + pool.release(channel) + channelReleased = true + log.trace(channel) { + "Channel released" + } + } } } } override fun exceptionCaught(ex: Throwable) { + log.warn(ex.message, ex) connectionClosedByTheRemoteServer = false channel.close() + if(!channelReleased) { + pool.release(channel) + channelReleased = true + log.trace(channel) { + "Channel released" + } + } } }) } else { diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt index c8502fb..17301e0 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt @@ -298,6 +298,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { "Closed connection ${ch.id().asShortText()} with ${ch.remoteAddress()}" } } + ch.config().setAutoRead(false) val pipeline = ch.pipeline() cfg.connection.also { conn -> val readIdleTimeout = conn.readIdleTimeout.toMillis() diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheHandler.kt index 9451a42..09a5591 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheHandler.kt @@ -5,6 +5,7 @@ import io.netty.channel.ChannelHandlerContext import io.netty.channel.SimpleChannelInboundHandler import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.stream.ChunkedNioFile +import net.woggioni.rbcs.api.CacheHandler import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest @@ -26,7 +27,7 @@ class FileSystemCacheHandler( private val compressionEnabled: Boolean, private val compressionLevel: Int, private val chunkSize: Int -) : SimpleChannelInboundHandler() { +) : CacheHandler() { private inner class InProgressPutRequest( val key : String, @@ -70,7 +71,7 @@ class FileSystemCacheHandler( private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm))) cache.get(key)?.also { entryValue -> - ctx.writeAndFlush(CacheValueFoundResponse(msg.key, entryValue.metadata)) + sendMessageAndFlush(ctx, CacheValueFoundResponse(msg.key, entryValue.metadata)) entryValue.channel.let { channel -> if(compressionEnabled) { InflaterInputStream(Channels.newInputStream(channel)).use { stream -> @@ -81,19 +82,19 @@ class FileSystemCacheHandler( while(buf.readableBytes() < chunkSize) { val read = buf.writeBytes(stream, chunkSize) if(read < 0) { - ctx.writeAndFlush(LastCacheContent(buf)) + sendMessageAndFlush(ctx, LastCacheContent(buf)) break@outerLoop } } - ctx.writeAndFlush(CacheContent(buf)) + sendMessageAndFlush(ctx, CacheContent(buf)) } } } else { - ctx.writeAndFlush(ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize)) - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + sendMessage(ctx, ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize)) + sendMessageAndFlush(ctx, LastHttpContent.EMPTY_LAST_CONTENT) } } - } ?: ctx.writeAndFlush(CacheValueNotFoundResponse()) + } ?: sendMessageAndFlush(ctx, CacheValueNotFoundResponse()) } private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { @@ -111,7 +112,7 @@ class FileSystemCacheHandler( inProgressPutRequest = null request.write(msg.content()) request.commit() - ctx.writeAndFlush(CachePutResponse(request.key)) + sendMessageAndFlush(ctx, CachePutResponse(request.key)) } } diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt index 3d8d890..7cbdb7a 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt @@ -3,6 +3,7 @@ package net.woggioni.rbcs.server.cache import io.netty.buffer.ByteBuf import io.netty.channel.ChannelHandlerContext import io.netty.channel.SimpleChannelInboundHandler +import net.woggioni.rbcs.api.CacheHandler import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest @@ -13,6 +14,7 @@ import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent import net.woggioni.rbcs.common.ByteBufOutputStream import net.woggioni.rbcs.common.RBCS.processCacheKey +import net.woggioni.rbcs.common.trace import java.util.zip.Deflater import java.util.zip.DeflaterOutputStream import java.util.zip.InflaterOutputStream @@ -22,7 +24,7 @@ class InMemoryCacheHandler( private val digestAlgorithm: String?, private val compressionEnabled: Boolean, private val compressionLevel: Int -) : SimpleChannelInboundHandler() { +) : CacheHandler() { private interface InProgressPutRequest : AutoCloseable { val request: CachePutRequest @@ -86,7 +88,7 @@ class InMemoryCacheHandler( private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { cache.get(processCacheKey(msg.key, digestAlgorithm))?.let { value -> - ctx.writeAndFlush(CacheValueFoundResponse(msg.key, value.metadata)) + sendMessageAndFlush(ctx, CacheValueFoundResponse(msg.key, value.metadata)) if (compressionEnabled) { val buf = ctx.alloc().heapBuffer() InflaterOutputStream(ByteBufOutputStream(buf)).use { @@ -94,11 +96,11 @@ class InMemoryCacheHandler( value.content.release() buf.retain() } - ctx.writeAndFlush(LastCacheContent(buf)) + sendMessage(ctx, LastCacheContent(buf)) } else { - ctx.writeAndFlush(LastCacheContent(value.content)) + sendMessage(ctx, LastCacheContent(value.content)) } - } ?: ctx.writeAndFlush(CacheValueNotFoundResponse()) + } ?: sendMessage(ctx, CacheValueNotFoundResponse()) } private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { @@ -122,7 +124,7 @@ class InMemoryCacheHandler( inProgressRequest.close() val cacheKey = processCacheKey(inProgressRequest.request.key, digestAlgorithm) cache.put(cacheKey, CacheEntry(inProgressRequest.request.metadata, buf)) - ctx.writeAndFlush(CachePutResponse(inProgressRequest.request.key)) + sendMessageAndFlush(ctx, CachePutResponse(inProgressRequest.request.key)) } } diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/event/RequestCompletedEvent.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/event/RequestCompletedEvent.kt deleted file mode 100644 index c2f08bb..0000000 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/event/RequestCompletedEvent.kt +++ /dev/null @@ -1,4 +0,0 @@ -package net.woggioni.rbcs.server.event - -class RequestCompletedEvent { -} \ No newline at end of file diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/CacheContentHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/CacheContentHandler.kt deleted file mode 100644 index dfde7c2..0000000 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/CacheContentHandler.kt +++ /dev/null @@ -1,79 +0,0 @@ -package net.woggioni.rbcs.server.handler - -import io.netty.channel.ChannelHandler -import io.netty.channel.ChannelHandler.Sharable -import io.netty.channel.ChannelHandlerContext -import io.netty.channel.ChannelOutboundHandler -import io.netty.channel.ChannelPromise -import io.netty.channel.SimpleChannelInboundHandler -import io.netty.handler.codec.http.HttpContent -import io.netty.handler.codec.http.LastHttpContent -import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse -import net.woggioni.rbcs.api.message.CacheMessage.CacheContent -import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse -import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent -import java.net.SocketAddress - -class CacheContentHandler(private val pairedHandler : ChannelHandler) : SimpleChannelInboundHandler(), ChannelOutboundHandler { - private var requestFinished = false - - override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpContent) { - if(requestFinished) { - ctx.fireChannelRead(msg.retain()) - } else { - when (msg) { - is LastHttpContent -> { - ctx.fireChannelRead(LastCacheContent(msg.content().retain())) - requestFinished = true - } - else -> ctx.fireChannelRead(CacheContent(msg.content().retain())) - } - } - } - - override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - super.exceptionCaught(ctx, cause) - } - - - override fun bind(ctx: ChannelHandlerContext, localAddress: SocketAddress, promise: ChannelPromise) { - ctx.bind(localAddress, promise) - } - - override fun connect( - ctx: ChannelHandlerContext, - remoteAddress: SocketAddress, - localAddress: SocketAddress, - promise: ChannelPromise - ) { - ctx.connect(remoteAddress, localAddress, promise) - } - - override fun disconnect(ctx: ChannelHandlerContext, promise: ChannelPromise) { - ctx.disconnect(promise) - } - - override fun close(ctx: ChannelHandlerContext, promise: ChannelPromise) { - ctx.close(promise) - } - - override fun deregister(ctx: ChannelHandlerContext, promise: ChannelPromise) { - ctx.deregister(promise) - } - - override fun read(ctx: ChannelHandlerContext) { - ctx.read() - } - - override fun flush(ctx: ChannelHandlerContext) { - ctx.flush() - } - - override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) { - ctx.write(msg, promise) - if(msg is LastCacheContent || msg is CachePutResponse || msg is CacheValueNotFoundResponse || msg is LastHttpContent) { - ctx.pipeline().remove(pairedHandler) - ctx.pipeline().remove(this) - } - } -} \ No newline at end of file diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ResponseCapHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ResponseCapHandler.kt deleted file mode 100644 index ed11a4f..0000000 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ResponseCapHandler.kt +++ /dev/null @@ -1,56 +0,0 @@ -package net.woggioni.rbcs.server.handler - -import io.netty.channel.ChannelHandlerContext -import io.netty.channel.ChannelInboundHandlerAdapter -import io.netty.channel.ChannelOutboundHandler -import io.netty.channel.ChannelPromise -import net.woggioni.rbcs.server.event.RequestCompletedEvent -import java.net.SocketAddress - -class ResponseCapHandler : ChannelInboundHandlerAdapter(), ChannelOutboundHandler { - val bufferedMessages = mutableListOf() - - override fun bind(ctx: ChannelHandlerContext, localAddress: SocketAddress, promise: ChannelPromise) { - ctx.bind(localAddress, promise) - } - - override fun connect( - ctx: ChannelHandlerContext, - remoteAddress: SocketAddress, - localAddress: SocketAddress, - promise: ChannelPromise - ) { - ctx.connect(remoteAddress, localAddress, promise) - } - - override fun disconnect(ctx: ChannelHandlerContext, promise: ChannelPromise) { - ctx.disconnect(promise) - } - - override fun close(ctx: ChannelHandlerContext, promise: ChannelPromise) { - ctx.close(promise) - } - - override fun deregister(ctx: ChannelHandlerContext, promise: ChannelPromise) { - ctx.deregister(promise) - } - - override fun read(ctx: ChannelHandlerContext) { - ctx.read() - } - - override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) { - bufferedMessages.add(msg) - } - - override fun flush(ctx: ChannelHandlerContext) { - } - - override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { - if(evt is RequestCompletedEvent) { - for(msg in bufferedMessages) ctx.write(msg) - ctx.flush() - ctx.pipeline().remove(this) - } - } -} \ No newline at end of file diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ServerHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ServerHandler.kt index 3adf7e9..752ac11 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ServerHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ServerHandler.kt @@ -8,6 +8,7 @@ import io.netty.handler.codec.http.DefaultFullHttpResponse import io.netty.handler.codec.http.DefaultHttpContent import io.netty.handler.codec.http.DefaultHttpResponse import io.netty.handler.codec.http.DefaultLastHttpContent +import io.netty.handler.codec.http.HttpContent import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaderValues import io.netty.handler.codec.http.HttpHeaders @@ -17,7 +18,6 @@ import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpUtil import io.netty.handler.codec.http.HttpVersion import io.netty.handler.codec.http.LastHttpContent -import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage.CacheContent @@ -30,10 +30,8 @@ import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.debug import net.woggioni.rbcs.common.warn -import net.woggioni.rbcs.server.event.RequestCompletedEvent import net.woggioni.rbcs.server.exception.ExceptionHandler import java.nio.file.Path -import java.util.Locale class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupplier : () -> ChannelHandler) : ChannelDuplexHandler() { @@ -47,6 +45,15 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp private var keepAlive = true private var pipelinedRequests = 0 + private fun newRequest() { + pipelinedRequests += 1 + } + + private fun requestCompleted(ctx : ChannelHandlerContext) { + pipelinedRequests -= 1 + if(pipelinedRequests == 0) ctx.read() + } + private fun resetRequestMetadata() { httpVersion = HttpVersion.HTTP_1_1 keepAlive = true @@ -65,22 +72,44 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp } } + private var cacheRequestInProgress : Boolean = false + + override fun handlerAdded(ctx: ChannelHandlerContext) { + ctx.read() + } override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { when (msg) { is HttpRequest -> handleRequest(ctx, msg) + is HttpContent -> { + if(cacheRequestInProgress) { + if(msg is LastHttpContent) { + super.channelRead(ctx, LastCacheContent(msg.content().retain())) + cacheRequestInProgress = false + } else { + super.channelRead(ctx, CacheContent(msg.content().retain())) + } + msg.release() + } else { + super.channelRead(ctx, msg) + } + } else -> super.channelRead(ctx, msg) } } + override fun channelReadComplete(ctx: ChannelHandlerContext) { + super.channelReadComplete(ctx) + if(cacheRequestInProgress) { + ctx.read() + } + } override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise?) { if (msg is CacheMessage) { try { when (msg) { is CachePutResponse -> { - pipelinedRequests -= 1 - ctx.fireUserEventTriggered(RequestCompletedEvent()) val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.CREATED) val keyBytes = msg.key.toByteArray(Charsets.UTF_8) response.headers().apply { @@ -92,16 +121,18 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp val buf = ctx.alloc().buffer(keyBytes.size).apply { writeBytes(keyBytes) } - ctx.writeAndFlush(DefaultLastHttpContent(buf)) + ctx.writeAndFlush(DefaultLastHttpContent(buf)).also { + requestCompleted(ctx) + } } is CacheValueNotFoundResponse -> { - pipelinedRequests -= 1 - ctx.fireUserEventTriggered(RequestCompletedEvent()) val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND) response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 setKeepAliveHeader(response.headers()) - ctx.writeAndFlush(response) + ctx.writeAndFlush(response).also { + requestCompleted(ctx) + } } is CacheValueFoundResponse -> { @@ -118,9 +149,9 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp } is LastCacheContent -> { - pipelinedRequests -= 1 - ctx.fireUserEventTriggered(RequestCompletedEvent()) - ctx.writeAndFlush(DefaultLastHttpContent(msg.content())) + ctx.writeAndFlush(DefaultLastHttpContent(msg.content())).also { + requestCompleted(ctx) + } } is CacheContent -> { @@ -140,9 +171,8 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp resetRequestMetadata() } } else if(msg is LastHttpContent) { - pipelinedRequests -= 1 - ctx.fireUserEventTriggered(RequestCompletedEvent()) ctx.write(msg, promise) + requestCompleted(ctx) } else super.write(ctx, msg, promise) } @@ -153,15 +183,12 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp if (method === HttpMethod.GET) { val path = Path.of(msg.uri()).normalize() if (path.startsWith(serverPrefix)) { + cacheRequestInProgress = true val relativePath = serverPrefix.relativize(path) - val key = relativePath.toString() - if(pipelinedRequests > 0) { - ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler()) - } + val key : String = relativePath.toString() + newRequest() val cacheHandler = cacheHandlerSupplier() - ctx.pipeline().addBefore(ExceptionHandler.NAME, null, CacheContentHandler(cacheHandler)) ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler) - pipelinedRequests += 1 key.let(::CacheGetRequest) .let(ctx::fireChannelRead) ?: ctx.channel().write(CacheValueNotFoundResponse()) @@ -176,18 +203,15 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp } else if (method === HttpMethod.PUT) { val path = Path.of(msg.uri()).normalize() if (path.startsWith(serverPrefix)) { + cacheRequestInProgress = true val relativePath = serverPrefix.relativize(path) val key = relativePath.toString() log.debug(ctx) { "Added value for key '$key' to build cache" } - if(pipelinedRequests > 0) { - ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler()) - } + newRequest() val cacheHandler = cacheHandlerSupplier() - ctx.pipeline().addBefore(ExceptionHandler.NAME, null, CacheContentHandler(cacheHandler)) ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler) - pipelinedRequests += 1 path.fileName?.toString() ?.let { @@ -205,11 +229,8 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp ctx.writeAndFlush(response) } } else if (method == HttpMethod.TRACE) { - if(pipelinedRequests > 0) { - ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler()) - } + newRequest() ctx.pipeline().addBefore(ExceptionHandler.NAME, null, TraceHandler) - pipelinedRequests += 1 super.channelRead(ctx, msg) } else { log.warn(ctx) {