added server timeouts

This commit is contained in:
2025-01-20 15:45:13 +08:00
parent 702556bfbb
commit 3d1847c408
14 changed files with 399 additions and 218 deletions

View File

@@ -9,41 +9,37 @@ import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPromise
import io.netty.channel.DefaultFileRegion
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.compression.CompressionOptions
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.HttpContentCompressor
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslHandler
import io.netty.handler.stream.ChunkedNioFile
import io.netty.handler.stream.ChunkedNioStream
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.handler.timeout.IdleStateEvent
import io.netty.handler.timeout.IdleStateHandler
import io.netty.handler.timeout.ReadTimeoutException
import io.netty.handler.timeout.ReadTimeoutHandler
import io.netty.handler.timeout.WriteTimeoutException
import io.netty.handler.timeout.WriteTimeoutHandler
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.api.Role
import net.woggioni.gbcs.api.exception.ConfigurationException
@@ -53,6 +49,7 @@ import net.woggioni.gbcs.common.PasswordSecurity.decodePasswordHash
import net.woggioni.gbcs.common.PasswordSecurity.hashPassword
import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.debug
import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.server.auth.AbstractNettyHttpAuthenticator
import net.woggioni.gbcs.server.auth.Authorizer
@@ -60,11 +57,11 @@ import net.woggioni.gbcs.server.auth.ClientCertificateValidator
import net.woggioni.gbcs.server.auth.RoleAuthorizer
import net.woggioni.gbcs.server.configuration.Parser
import net.woggioni.gbcs.server.configuration.Serializer
import net.woggioni.gbcs.server.handler.ServerHandler
import net.woggioni.jwo.JWO
import net.woggioni.jwo.Tuple2
import java.io.OutputStream
import java.net.InetSocketAddress
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyStore
@@ -72,6 +69,7 @@ import java.security.PrivateKey
import java.security.cert.X509Certificate
import java.util.Arrays
import java.util.Base64
import java.util.concurrent.TimeUnit
import java.util.regex.Matcher
import java.util.regex.Pattern
import javax.naming.ldap.LdapName
@@ -200,28 +198,6 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
private val eventExecutorGroup: EventExecutorGroup
) : ChannelInitializer<Channel>() {
private val serverHandler = let {
val cacheImplementation = cfg.cache.materialize()
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
ServerHandler(cacheImplementation, prefix)
}
private val exceptionHandler = ExceptionHandler()
private val authenticator = when (val auth = cfg.authentication) {
is Configuration.BasicAuthentication -> NettyHttpBasicAuthenticator(cfg.users, RoleAuthorizer())
is Configuration.ClientCertificateAuthentication -> {
ClientCertificateAuthenticator(
RoleAuthorizer(),
cfg.users[""]?.roles,
userExtractor(auth),
groupExtractor(auth)
)
}
else -> null
}
companion object {
private fun createSslCtx(tls: Configuration.Tls): SslContext {
val keyStore = tls.keyStore
@@ -272,6 +248,30 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
}
}
private val log = contextLogger()
private val serverHandler = let {
val cacheImplementation = cfg.cache.materialize()
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
ServerHandler(cacheImplementation, prefix)
}
private val exceptionHandler = ExceptionHandler()
private val authenticator = when (val auth = cfg.authentication) {
is Configuration.BasicAuthentication -> NettyHttpBasicAuthenticator(cfg.users, RoleAuthorizer())
is Configuration.ClientCertificateAuthentication -> {
ClientCertificateAuthenticator(
RoleAuthorizer(),
cfg.users[""]?.roles,
userExtractor(auth),
groupExtractor(auth)
)
}
else -> null
}
private val sslContext: SslContext? = cfg.tls?.let(Companion::createSslCtx)
private fun userExtractor(authentication: Configuration.ClientCertificateAuthentication) =
@@ -303,14 +303,37 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
}
override fun initChannel(ch: Channel) {
log.debug {
"Created connection ${ch.id().asShortText()}"
}
ch.closeFuture().addListener {
log.debug {
"Closed connection ${ch.id().asShortText()}"
}
}
val pipeline = ch.pipeline()
cfg.connection.apply {
pipeline.addLast(ReadTimeoutHandler(readTimeout.toMillis(), TimeUnit.MILLISECONDS))
pipeline.addLast(WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS))
pipeline.addLast(IdleStateHandler(false, 0, 0, idleTimeout.toMillis(), TimeUnit.MILLISECONDS))
}
pipeline.addLast(object : ChannelInboundHandlerAdapter() {
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is IdleStateEvent) {
log.debug {
"Idle timeout reached on channel ${ch.id().asShortText()}, closing the connection"
}
ctx.close()
}
}
})
sslContext?.newHandler(ch.alloc())?.also {
pipeline.addLast(SSL_HANDLER_NAME, it)
}
pipeline.addLast(HttpServerCodec())
pipeline.addLast(HttpChunkContentCompressor(1024))
pipeline.addLast(ChunkedWriteHandler())
pipeline.addLast(HttpObjectAggregator(cfg.maxRequestSize))
pipeline.addLast(HttpObjectAggregator(cfg.connection.maxRequestSize))
authenticator?.let {
pipeline.addLast(it)
}
@@ -351,7 +374,20 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
ctx.writeAndFlush(TOO_BIG.retainedDuplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
is ReadTimeoutException -> {
log.debug {
val channelId = ctx.channel().id().asShortText()
"Read timeout on channel $channelId, closing the connection"
}
ctx.close()
}
is WriteTimeoutException -> {
log.debug {
val channelId = ctx.channel().id().asShortText()
"Write timeout on channel $channelId, closing the connection"
}
ctx.close()
}
else -> {
log.error(cause.message, cause)
ctx.close()
@@ -360,110 +396,6 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
}
}
@Sharable
private class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
SimpleChannelInboundHandler<FullHttpRequest>() {
private val log = contextLogger()
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
val method = msg.method()
if (method === HttpMethod.GET) {
val path = Path.of(msg.uri())
val prefix = path.parent
val key = path.fileName.toString()
if (serverPrefix == prefix) {
cache.get(key)?.let { channel ->
log.debug(ctx) {
"Cache hit for key '$key'"
}
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
when (channel) {
is FileChannel -> {
if (keepAlive) {
ctx.write(ChunkedNioFile(channel))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
} else {
ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size()))
.addListener(ChannelFutureListener.CLOSE)
}
}
else -> {
ctx.write(ChunkedNioStream(channel))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
}
} ?: let {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
} else if (method === HttpMethod.PUT) {
val path = Path.of(msg.uri())
val prefix = path.parent
val key = path.fileName.toString()
if (serverPrefix == prefix) {
log.debug(ctx) {
"Added value for key '$key' to build cache"
}
val bodyBytes = msg.content().run {
if (isDirect) {
ByteArray(readableBytes()).also {
readBytes(it)
}
} else {
array()
}
}
cache.put(key, bodyBytes)
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
} else {
log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
}
}
class ServerHandle(
httpChannelFuture: ChannelFuture,
private val executorGroups: Iterable<EventExecutorGroup>
@@ -496,7 +428,7 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
val serverSocketChannel = NioServerSocketChannel::class.java
val workerGroup = bossGroup
val eventExecutorGroup = run {
val threadFactory = if (cfg.isUseVirtualThread) {
val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) {
Thread.ofVirtual().factory()
} else {
null