resolved race condition hendling pipelined requests
All checks were successful
CI / build (push) Successful in 2m2s
All checks were successful
CI / build (push) Successful in 2m2s
This commit is contained in:
@@ -345,7 +345,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
maxChunkSize = cfg.connection.chunkSize
|
||||
}
|
||||
pipeline.addLast(HttpServerCodec(httpDecoderConfig))
|
||||
pipeline.addLast(ReadTriggerDuplexHandler.NAME, ReadTriggerDuplexHandler)
|
||||
pipeline.addLast(ReadTriggerDuplexHandler.NAME, ReadTriggerDuplexHandler())
|
||||
pipeline.addLast(MaxRequestSizeHandler.NAME, MaxRequestSizeHandler(cfg.connection.maxRequestSize))
|
||||
pipeline.addLast(HttpChunkContentCompressor(1024))
|
||||
pipeline.addLast(ChunkedWriteHandler())
|
||||
|
@@ -1,23 +1,43 @@
|
||||
package net.woggioni.rbcs.server.handler
|
||||
|
||||
import io.netty.buffer.ByteBufHolder
|
||||
import io.netty.channel.ChannelDuplexHandler
|
||||
import io.netty.channel.ChannelHandler.Sharable
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.ChannelPromise
|
||||
import io.netty.handler.codec.http.LastHttpContent
|
||||
import net.woggioni.rbcs.common.createLogger
|
||||
|
||||
@Sharable
|
||||
object ReadTriggerDuplexHandler : ChannelDuplexHandler() {
|
||||
val NAME = ReadTriggerDuplexHandler::class.java.name
|
||||
class ReadTriggerDuplexHandler : ChannelDuplexHandler() {
|
||||
companion object {
|
||||
val NAME = ReadTriggerDuplexHandler::class.java.name
|
||||
private val log = createLogger<ReadTriggerDuplexHandler>()
|
||||
}
|
||||
|
||||
private var inFlight = 0
|
||||
private val messageBuffer = ArrayDeque<Any>()
|
||||
|
||||
override fun handlerAdded(ctx: ChannelHandlerContext) {
|
||||
ctx.read()
|
||||
}
|
||||
|
||||
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
|
||||
super.channelRead(ctx, msg)
|
||||
if(msg !is LastHttpContent) {
|
||||
if(inFlight > 0) {
|
||||
messageBuffer.addLast(msg)
|
||||
} else {
|
||||
super.channelRead(ctx, msg)
|
||||
if(msg !is LastHttpContent) {
|
||||
invokeRead(ctx)
|
||||
} else {
|
||||
inFlight += 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun invokeRead(ctx : ChannelHandlerContext) {
|
||||
if(messageBuffer.isEmpty()) {
|
||||
ctx.read()
|
||||
} else {
|
||||
this.channelRead(ctx, messageBuffer.removeFirst())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +48,18 @@ object ReadTriggerDuplexHandler : ChannelDuplexHandler() {
|
||||
) {
|
||||
super.write(ctx, msg, promise)
|
||||
if(msg is LastHttpContent) {
|
||||
ctx.read()
|
||||
inFlight -= 1
|
||||
invokeRead(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
override fun channelInactive(ctx: ChannelHandlerContext) {
|
||||
while(messageBuffer.isNotEmpty()) {
|
||||
val msg = messageBuffer.removeFirst()
|
||||
if(msg is ByteBufHolder) {
|
||||
msg.release()
|
||||
}
|
||||
}
|
||||
super.channelInactive(ctx)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user