diff --git a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/RemoteBuildCacheClient.kt b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/RemoteBuildCacheClient.kt index 06fb5c1..b029dd3 100644 --- a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/RemoteBuildCacheClient.kt +++ b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/RemoteBuildCacheClient.kt @@ -4,9 +4,7 @@ import io.netty.bootstrap.Bootstrap import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled import io.netty.channel.Channel -import io.netty.channel.ChannelHandler import io.netty.channel.ChannelHandlerContext -import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.channel.ChannelOption import io.netty.channel.ChannelPipeline import io.netty.channel.SimpleChannelInboundHandler @@ -55,7 +53,7 @@ import kotlin.random.Random import io.netty.util.concurrent.Future as NettyFuture class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable { - companion object{ + companion object { private val log = createLogger() } @@ -73,7 +71,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC *tlsClientAuthenticationCredentials.certificateChain ) profile.tlsTruststore?.let { trustStore -> - if(!trustStore.verifyServerCertificate) { + if (!trustStore.verifyServerCertificate) { trustManager(object : X509TrustManager { override fun checkClientTrusted(certChain: Array, p1: String?) { } @@ -176,7 +174,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC // HTTP handlers pipeline.addLast("codec", HttpClientCodec()) - if(profile.compressionEnabled) { + if (profile.compressionEnabled) { pipeline.addLast("decompressor", HttpContentDecompressor()) } pipeline.addLast("aggregator", HttpObjectAggregator(134217728)) @@ -297,47 +295,28 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC // Custom handler for processing responses pool.acquire().addListener(object : GenericFutureListener> { - private val handlers = mutableListOf() - - fun cleanup(channel: Channel, pipeline: ChannelPipeline) { - handlers.forEach(pipeline::remove) - pool.release(channel) - } override fun operationComplete(channelFuture: Future) { if (channelFuture.isSuccess) { val channel = channelFuture.now val pipeline = channel.pipeline() - val timeoutHandler = object : ChannelInboundHandlerAdapter() { - override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { - if (evt is IdleStateEvent) { - val te = when (evt.state()) { - IdleState.READER_IDLE -> TimeoutException( - "Read timeout", - ) - IdleState.WRITER_IDLE -> TimeoutException("Write timeout") - - IdleState.ALL_IDLE -> TimeoutException("Idle timeout") - null -> throw IllegalStateException("This should never happen") - } - responseFuture.completeExceptionally(te) - ctx.close() - } - } - } val closeListener = GenericFutureListener> { responseFuture.completeExceptionally(IOException("The remote server closed the connection")) - pool.release(channel) } + channel.closeFuture().addListener(closeListener) val responseHandler = object : SimpleChannelInboundHandler() { + + override fun handlerAdded(ctx: ChannelHandlerContext) { + channel.closeFuture().removeListener(closeListener) + } + override fun channelRead0( ctx: ChannelHandlerContext, response: FullHttpResponse ) { - channel.closeFuture().removeListener(closeListener) - cleanup(channel, pipeline) + pipeline.remove(this) responseFuture.complete(response) } @@ -352,16 +331,33 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC } override fun channelInactive(ctx: ChannelHandlerContext) { - pool.release(channel) responseFuture.completeExceptionally(IOException("The remote server closed the connection")) super.channelInactive(ctx) } + + override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { + if (evt is IdleStateEvent) { + val te = when (evt.state()) { + IdleState.READER_IDLE -> TimeoutException( + "Read timeout", + ) + + IdleState.WRITER_IDLE -> TimeoutException("Write timeout") + + IdleState.ALL_IDLE -> TimeoutException("Idle timeout") + null -> throw IllegalStateException("This should never happen") + } + responseFuture.completeExceptionally(te) + super.userEventTriggered(ctx, evt) + if (this === pipeline.last()) { + ctx.close() + } + } else { + super.userEventTriggered(ctx, evt) + } + } } - for (handler in arrayOf(timeoutHandler, responseHandler)) { - handlers.add(handler) - } - pipeline.addLast(timeoutHandler, responseHandler) - channel.closeFuture().addListener(closeListener) + pipeline.addLast(responseHandler) // Prepare the HTTP request @@ -373,13 +369,14 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC uri.rawPath, content ?: Unpooled.buffer(0) ).apply { + // Set headers headers().apply { if (content != null) { set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()) } set(HttpHeaderNames.HOST, profile.serverURI.host) set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) - if(profile.compressionEnabled) { + if (profile.compressionEnabled) { set( HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP.toString() + "," + HttpHeaderValues.DEFLATE.toString() @@ -398,9 +395,15 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC } } - // Set headers // Send the request - channel.writeAndFlush(request) + channel.writeAndFlush(request).addListener { + if(!it.isSuccess) { + val ex = it.cause() + log.warn(ex.message, ex) + } + + pool.release(channel) + } } else { responseFuture.completeExceptionally(channelFuture.cause()) }