From aeae98d9eb74d33ee8671e3b80b4b70fdbaf5e4f Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Tue, 17 Jun 2025 23:06:04 +0800 Subject: [PATCH] resolved race condition hendling pipelined requests --- .../rbcs/server/RemoteBuildCacheServer.kt | 2 +- .../handler/ReadTriggerDuplexHandler.kt | 45 ++++++++++++++++--- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt index 2930009..723fb7e 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt @@ -345,7 +345,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { maxChunkSize = cfg.connection.chunkSize } pipeline.addLast(HttpServerCodec(httpDecoderConfig)) - pipeline.addLast(ReadTriggerDuplexHandler.NAME, ReadTriggerDuplexHandler) + pipeline.addLast(ReadTriggerDuplexHandler.NAME, ReadTriggerDuplexHandler()) pipeline.addLast(MaxRequestSizeHandler.NAME, MaxRequestSizeHandler(cfg.connection.maxRequestSize)) pipeline.addLast(HttpChunkContentCompressor(1024)) pipeline.addLast(ChunkedWriteHandler()) diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ReadTriggerDuplexHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ReadTriggerDuplexHandler.kt index b87eac8..dc53ae2 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ReadTriggerDuplexHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ReadTriggerDuplexHandler.kt @@ -1,23 +1,43 @@ package net.woggioni.rbcs.server.handler +import io.netty.buffer.ByteBufHolder import io.netty.channel.ChannelDuplexHandler -import io.netty.channel.ChannelHandler.Sharable import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelPromise import io.netty.handler.codec.http.LastHttpContent +import net.woggioni.rbcs.common.createLogger -@Sharable -object ReadTriggerDuplexHandler : ChannelDuplexHandler() { - val NAME = ReadTriggerDuplexHandler::class.java.name +class ReadTriggerDuplexHandler : ChannelDuplexHandler() { + companion object { + val NAME = ReadTriggerDuplexHandler::class.java.name + private val log = createLogger() + } + + private var inFlight = 0 + private val messageBuffer = ArrayDeque() override fun handlerAdded(ctx: ChannelHandlerContext) { ctx.read() } override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { - super.channelRead(ctx, msg) - if(msg !is LastHttpContent) { + if(inFlight > 0) { + messageBuffer.addLast(msg) + } else { + super.channelRead(ctx, msg) + if(msg !is LastHttpContent) { + invokeRead(ctx) + } else { + inFlight += 1 + } + } + } + + private fun invokeRead(ctx : ChannelHandlerContext) { + if(messageBuffer.isEmpty()) { ctx.read() + } else { + this.channelRead(ctx, messageBuffer.removeFirst()) } } @@ -28,7 +48,18 @@ object ReadTriggerDuplexHandler : ChannelDuplexHandler() { ) { super.write(ctx, msg, promise) if(msg is LastHttpContent) { - ctx.read() + inFlight -= 1 + invokeRead(ctx) } } + + override fun channelInactive(ctx: ChannelHandlerContext) { + while(messageBuffer.isNotEmpty()) { + val msg = messageBuffer.removeFirst() + if(msg is ByteBufHolder) { + msg.release() + } + } + super.channelInactive(ctx) + } } \ No newline at end of file