added request pipelining support to RemoteBuildCacheClient

This commit is contained in:
2025-03-06 21:58:53 +08:00
parent 5545f618f9
commit 8b639fc0b3

View File

@@ -4,9 +4,7 @@ import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
@@ -55,7 +53,7 @@ import kotlin.random.Random
import io.netty.util.concurrent.Future as NettyFuture
class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable {
companion object{
companion object {
private val log = createLogger<RemoteBuildCacheClient>()
}
@@ -73,7 +71,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
*tlsClientAuthenticationCredentials.certificateChain
)
profile.tlsTruststore?.let { trustStore ->
if(!trustStore.verifyServerCertificate) {
if (!trustStore.verifyServerCertificate) {
trustManager(object : X509TrustManager {
override fun checkClientTrusted(certChain: Array<out X509Certificate>, p1: String?) {
}
@@ -176,7 +174,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
// HTTP handlers
pipeline.addLast("codec", HttpClientCodec())
if(profile.compressionEnabled) {
if (profile.compressionEnabled) {
pipeline.addLast("decompressor", HttpContentDecompressor())
}
pipeline.addLast("aggregator", HttpObjectAggregator(134217728))
@@ -297,47 +295,28 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
// Custom handler for processing responses
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
private val handlers = mutableListOf<ChannelHandler>()
fun cleanup(channel: Channel, pipeline: ChannelPipeline) {
handlers.forEach(pipeline::remove)
pool.release(channel)
}
override fun operationComplete(channelFuture: Future<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
val timeoutHandler = object : ChannelInboundHandlerAdapter() {
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is IdleStateEvent) {
val te = when (evt.state()) {
IdleState.READER_IDLE -> TimeoutException(
"Read timeout",
)
IdleState.WRITER_IDLE -> TimeoutException("Write timeout")
IdleState.ALL_IDLE -> TimeoutException("Idle timeout")
null -> throw IllegalStateException("This should never happen")
}
responseFuture.completeExceptionally(te)
ctx.close()
}
}
}
val closeListener = GenericFutureListener<Future<Void>> {
responseFuture.completeExceptionally(IOException("The remote server closed the connection"))
pool.release(channel)
}
channel.closeFuture().addListener(closeListener)
val responseHandler = object : SimpleChannelInboundHandler<FullHttpResponse>() {
override fun handlerAdded(ctx: ChannelHandlerContext) {
channel.closeFuture().removeListener(closeListener)
}
override fun channelRead0(
ctx: ChannelHandlerContext,
response: FullHttpResponse
) {
channel.closeFuture().removeListener(closeListener)
cleanup(channel, pipeline)
pipeline.remove(this)
responseFuture.complete(response)
}
@@ -352,16 +331,33 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
}
override fun channelInactive(ctx: ChannelHandlerContext) {
pool.release(channel)
responseFuture.completeExceptionally(IOException("The remote server closed the connection"))
super.channelInactive(ctx)
}
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is IdleStateEvent) {
val te = when (evt.state()) {
IdleState.READER_IDLE -> TimeoutException(
"Read timeout",
)
IdleState.WRITER_IDLE -> TimeoutException("Write timeout")
IdleState.ALL_IDLE -> TimeoutException("Idle timeout")
null -> throw IllegalStateException("This should never happen")
}
for (handler in arrayOf(timeoutHandler, responseHandler)) {
handlers.add(handler)
responseFuture.completeExceptionally(te)
super.userEventTriggered(ctx, evt)
if (this === pipeline.last()) {
ctx.close()
}
pipeline.addLast(timeoutHandler, responseHandler)
channel.closeFuture().addListener(closeListener)
} else {
super.userEventTriggered(ctx, evt)
}
}
}
pipeline.addLast(responseHandler)
// Prepare the HTTP request
@@ -373,13 +369,14 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
uri.rawPath,
content ?: Unpooled.buffer(0)
).apply {
// Set headers
headers().apply {
if (content != null) {
set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes())
}
set(HttpHeaderNames.HOST, profile.serverURI.host)
set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
if(profile.compressionEnabled) {
if (profile.compressionEnabled) {
set(
HttpHeaderNames.ACCEPT_ENCODING,
HttpHeaderValues.GZIP.toString() + "," + HttpHeaderValues.DEFLATE.toString()
@@ -398,9 +395,15 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
}
}
// Set headers
// Send the request
channel.writeAndFlush(request)
channel.writeAndFlush(request).addListener {
if(!it.isSuccess) {
val ex = it.cause()
log.warn(ex.message, ex)
}
pool.release(channel)
}
} else {
responseFuture.completeExceptionally(channelFuture.cause())
}