From 3d1847c4081a847b5648dbd41b68e6c0e742c924 Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Mon, 20 Jan 2025 15:45:13 +0800 Subject: [PATCH] added server timeouts --- docker/Dockerfile | 3 - .../net/woggioni/gbcs/api/Configuration.java | 41 +++- .../gbcs/client/schema/gbcs-client.xsd | 3 + .../gbcs/server/GradleBuildCacheServer.kt | 214 ++++++------------ .../gbcs/server/configuration/Parser.kt | 110 +++++---- .../gbcs/server/configuration/Serializer.kt | 18 +- .../gbcs/server/handler/ServerHandler.kt | 129 +++++++++++ .../net/woggioni/gbcs/server/schema/gbcs.xsd | 19 +- .../test/AbstractBasicAuthServerTest.kt | 16 +- .../gbcs/server/test/AbstractTlsServerTest.kt | 14 +- .../gbcs/server/test/NoAuthServerTest.kt | 14 +- .../gbcs/server/test/gbcs-default.xml | 12 +- .../gbcs/server/test/gbcs-memcached.xml | 12 +- .../woggioni/gbcs/server/test/gbcs-tls.xml | 12 +- 14 files changed, 399 insertions(+), 218 deletions(-) create mode 100644 gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt diff --git a/docker/Dockerfile b/docker/Dockerfile index 44fef27..f9e0c02 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -14,6 +14,3 @@ WORKDIR /home/luser/plugins RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/gbcs-server-memcached*.tar WORKDIR /home/luser ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"] - -FROM release-memcached as compose -COPY --chown=luser:luser conf/gbcs-memcached.xml /home/luser/.config/gbcs/gbcs.xml \ No newline at end of file diff --git a/gbcs-api/src/main/java/net/woggioni/gbcs/api/Configuration.java b/gbcs-api/src/main/java/net/woggioni/gbcs/api/Configuration.java index 81683a9..1251376 100644 --- a/gbcs-api/src/main/java/net/woggioni/gbcs/api/Configuration.java +++ b/gbcs-api/src/main/java/net/woggioni/gbcs/api/Configuration.java @@ -2,10 +2,12 @@ package net.woggioni.gbcs.api; import lombok.EqualsAndHashCode; +import lombok.NonNull; import lombok.Value; import java.nio.file.Path; import java.security.cert.X509Certificate; +import java.time.Duration; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -14,15 +16,32 @@ import java.util.stream.Collectors; public class Configuration { String host; int port; + int incomingConnectionsBacklogSize; String serverPath; + @NonNull + EventExecutor eventExecutor; + @NonNull + Connection connection; Map users; Map groups; Cache cache; Authentication authentication; Tls tls; - boolean useVirtualThread; - int maxRequestSize; - int incomingConnectionsBacklogSize; + + @Value + public static class EventExecutor { + boolean useVirtualThreads; + } + + @Value + public static class Connection { + Duration readTimeout; + Duration writeTimeout; + Duration idleTimeout; + Duration readIdleTimeout; + Duration writeIdleTimeout; + int maxRequestSize; + } @Value public static class Group { @@ -103,28 +122,28 @@ public class Configuration { public static Configuration of( String host, int port, + int incomingConnectionsBacklogSize, String serverPath, + EventExecutor eventExecutor, + Connection connection, Map users, Map groups, Cache cache, Authentication authentication, - Tls tls, - boolean useVirtualThread, - int maxRequestSize, - int incomingConnectionsBacklogSize + Tls tls ) { return new Configuration( host, port, + incomingConnectionsBacklogSize, serverPath != null && !serverPath.isEmpty() && !serverPath.equals("/") ? serverPath : null, + eventExecutor, + connection, users, groups, cache, authentication, - tls, - useVirtualThread, - maxRequestSize, - incomingConnectionsBacklogSize + tls ); } } \ No newline at end of file diff --git a/gbcs-client/src/main/resources/net/woggioni/gbcs/client/schema/gbcs-client.xsd b/gbcs-client/src/main/resources/net/woggioni/gbcs/client/schema/gbcs-client.xsd index 6add7eb..3663106 100644 --- a/gbcs-client/src/main/resources/net/woggioni/gbcs/client/schema/gbcs-client.xsd +++ b/gbcs-client/src/main/resources/net/woggioni/gbcs/client/schema/gbcs-client.xsd @@ -14,6 +14,7 @@ + @@ -22,6 +23,8 @@ + + diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/GradleBuildCacheServer.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/GradleBuildCacheServer.kt index e533c17..1acb3d4 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/GradleBuildCacheServer.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/GradleBuildCacheServer.kt @@ -9,41 +9,37 @@ import io.netty.channel.ChannelFuture import io.netty.channel.ChannelFutureListener import io.netty.channel.ChannelHandler.Sharable import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.channel.ChannelInitializer import io.netty.channel.ChannelOption import io.netty.channel.ChannelPromise -import io.netty.channel.DefaultFileRegion -import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.handler.codec.DecoderException import io.netty.handler.codec.compression.CompressionOptions import io.netty.handler.codec.http.DefaultFullHttpResponse import io.netty.handler.codec.http.DefaultHttpContent -import io.netty.handler.codec.http.DefaultHttpResponse -import io.netty.handler.codec.http.FullHttpRequest import io.netty.handler.codec.http.FullHttpResponse import io.netty.handler.codec.http.HttpContentCompressor import io.netty.handler.codec.http.HttpHeaderNames -import io.netty.handler.codec.http.HttpHeaderValues -import io.netty.handler.codec.http.HttpMethod import io.netty.handler.codec.http.HttpObjectAggregator import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpServerCodec -import io.netty.handler.codec.http.HttpUtil import io.netty.handler.codec.http.HttpVersion -import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslHandler -import io.netty.handler.stream.ChunkedNioFile -import io.netty.handler.stream.ChunkedNioStream import io.netty.handler.stream.ChunkedWriteHandler +import io.netty.handler.timeout.IdleStateEvent +import io.netty.handler.timeout.IdleStateHandler +import io.netty.handler.timeout.ReadTimeoutException +import io.netty.handler.timeout.ReadTimeoutHandler +import io.netty.handler.timeout.WriteTimeoutException +import io.netty.handler.timeout.WriteTimeoutHandler import io.netty.util.concurrent.DefaultEventExecutorGroup import io.netty.util.concurrent.EventExecutorGroup -import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.api.Role import net.woggioni.gbcs.api.exception.ConfigurationException @@ -53,6 +49,7 @@ import net.woggioni.gbcs.common.PasswordSecurity.decodePasswordHash import net.woggioni.gbcs.common.PasswordSecurity.hashPassword import net.woggioni.gbcs.common.Xml import net.woggioni.gbcs.common.contextLogger +import net.woggioni.gbcs.common.debug import net.woggioni.gbcs.common.info import net.woggioni.gbcs.server.auth.AbstractNettyHttpAuthenticator import net.woggioni.gbcs.server.auth.Authorizer @@ -60,11 +57,11 @@ import net.woggioni.gbcs.server.auth.ClientCertificateValidator import net.woggioni.gbcs.server.auth.RoleAuthorizer import net.woggioni.gbcs.server.configuration.Parser import net.woggioni.gbcs.server.configuration.Serializer +import net.woggioni.gbcs.server.handler.ServerHandler import net.woggioni.jwo.JWO import net.woggioni.jwo.Tuple2 import java.io.OutputStream import java.net.InetSocketAddress -import java.nio.channels.FileChannel import java.nio.file.Files import java.nio.file.Path import java.security.KeyStore @@ -72,6 +69,7 @@ import java.security.PrivateKey import java.security.cert.X509Certificate import java.util.Arrays import java.util.Base64 +import java.util.concurrent.TimeUnit import java.util.regex.Matcher import java.util.regex.Pattern import javax.naming.ldap.LdapName @@ -200,28 +198,6 @@ class GradleBuildCacheServer(private val cfg: Configuration) { private val eventExecutorGroup: EventExecutorGroup ) : ChannelInitializer() { - private val serverHandler = let { - val cacheImplementation = cfg.cache.materialize() - val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/")) - ServerHandler(cacheImplementation, prefix) - } - - private val exceptionHandler = ExceptionHandler() - - private val authenticator = when (val auth = cfg.authentication) { - is Configuration.BasicAuthentication -> NettyHttpBasicAuthenticator(cfg.users, RoleAuthorizer()) - is Configuration.ClientCertificateAuthentication -> { - ClientCertificateAuthenticator( - RoleAuthorizer(), - cfg.users[""]?.roles, - userExtractor(auth), - groupExtractor(auth) - ) - } - - else -> null - } - companion object { private fun createSslCtx(tls: Configuration.Tls): SslContext { val keyStore = tls.keyStore @@ -272,6 +248,30 @@ class GradleBuildCacheServer(private val cfg: Configuration) { } } + private val log = contextLogger() + + private val serverHandler = let { + val cacheImplementation = cfg.cache.materialize() + val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/")) + ServerHandler(cacheImplementation, prefix) + } + + private val exceptionHandler = ExceptionHandler() + + private val authenticator = when (val auth = cfg.authentication) { + is Configuration.BasicAuthentication -> NettyHttpBasicAuthenticator(cfg.users, RoleAuthorizer()) + is Configuration.ClientCertificateAuthentication -> { + ClientCertificateAuthenticator( + RoleAuthorizer(), + cfg.users[""]?.roles, + userExtractor(auth), + groupExtractor(auth) + ) + } + + else -> null + } + private val sslContext: SslContext? = cfg.tls?.let(Companion::createSslCtx) private fun userExtractor(authentication: Configuration.ClientCertificateAuthentication) = @@ -303,14 +303,37 @@ class GradleBuildCacheServer(private val cfg: Configuration) { } override fun initChannel(ch: Channel) { + log.debug { + "Created connection ${ch.id().asShortText()}" + } + ch.closeFuture().addListener { + log.debug { + "Closed connection ${ch.id().asShortText()}" + } + } val pipeline = ch.pipeline() + cfg.connection.apply { + pipeline.addLast(ReadTimeoutHandler(readTimeout.toMillis(), TimeUnit.MILLISECONDS)) + pipeline.addLast(WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS)) + pipeline.addLast(IdleStateHandler(false, 0, 0, idleTimeout.toMillis(), TimeUnit.MILLISECONDS)) + } + pipeline.addLast(object : ChannelInboundHandlerAdapter() { + override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { + if (evt is IdleStateEvent) { + log.debug { + "Idle timeout reached on channel ${ch.id().asShortText()}, closing the connection" + } + ctx.close() + } + } + }) sslContext?.newHandler(ch.alloc())?.also { pipeline.addLast(SSL_HANDLER_NAME, it) } pipeline.addLast(HttpServerCodec()) pipeline.addLast(HttpChunkContentCompressor(1024)) pipeline.addLast(ChunkedWriteHandler()) - pipeline.addLast(HttpObjectAggregator(cfg.maxRequestSize)) + pipeline.addLast(HttpObjectAggregator(cfg.connection.maxRequestSize)) authenticator?.let { pipeline.addLast(it) } @@ -351,7 +374,20 @@ class GradleBuildCacheServer(private val cfg: Configuration) { ctx.writeAndFlush(TOO_BIG.retainedDuplicate()) .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) } - + is ReadTimeoutException -> { + log.debug { + val channelId = ctx.channel().id().asShortText() + "Read timeout on channel $channelId, closing the connection" + } + ctx.close() + } + is WriteTimeoutException -> { + log.debug { + val channelId = ctx.channel().id().asShortText() + "Write timeout on channel $channelId, closing the connection" + } + ctx.close() + } else -> { log.error(cause.message, cause) ctx.close() @@ -360,110 +396,6 @@ class GradleBuildCacheServer(private val cfg: Configuration) { } } - @Sharable - private class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : - SimpleChannelInboundHandler() { - - private val log = contextLogger() - - override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) { - val keepAlive: Boolean = HttpUtil.isKeepAlive(msg) - val method = msg.method() - if (method === HttpMethod.GET) { - val path = Path.of(msg.uri()) - val prefix = path.parent - val key = path.fileName.toString() - if (serverPrefix == prefix) { - cache.get(key)?.let { channel -> - log.debug(ctx) { - "Cache hit for key '$key'" - } - val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) - response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM - if (!keepAlive) { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) - response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY) - } else { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) - response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED) - } - ctx.write(response) - when (channel) { - is FileChannel -> { - if (keepAlive) { - ctx.write(ChunkedNioFile(channel)) - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) - } else { - ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size())) - .addListener(ChannelFutureListener.CLOSE) - } - } - - else -> { - ctx.write(ChunkedNioStream(channel)) - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) - } - } - } ?: let { - log.debug(ctx) { - "Cache miss for key '$key'" - } - val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND) - response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 - ctx.writeAndFlush(response) - } - } else { - log.warn(ctx) { - "Got request for unhandled path '${msg.uri()}'" - } - val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) - response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 - ctx.writeAndFlush(response) - } - } else if (method === HttpMethod.PUT) { - val path = Path.of(msg.uri()) - val prefix = path.parent - val key = path.fileName.toString() - - if (serverPrefix == prefix) { - log.debug(ctx) { - "Added value for key '$key' to build cache" - } - val bodyBytes = msg.content().run { - if (isDirect) { - ByteArray(readableBytes()).also { - readBytes(it) - } - } else { - array() - } - } - cache.put(key, bodyBytes) - val response = DefaultFullHttpResponse( - msg.protocolVersion(), HttpResponseStatus.CREATED, - Unpooled.copiedBuffer(key.toByteArray()) - ) - response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() - ctx.writeAndFlush(response) - } else { - log.warn(ctx) { - "Got request for unhandled path '${msg.uri()}'" - } - val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) - response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" - ctx.writeAndFlush(response) - } - } else { - log.warn(ctx) { - "Got request with unhandled method '${msg.method().name()}'" - } - val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) - response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" - ctx.writeAndFlush(response) - } - } - } - class ServerHandle( httpChannelFuture: ChannelFuture, private val executorGroups: Iterable @@ -496,7 +428,7 @@ class GradleBuildCacheServer(private val cfg: Configuration) { val serverSocketChannel = NioServerSocketChannel::class.java val workerGroup = bossGroup val eventExecutorGroup = run { - val threadFactory = if (cfg.isUseVirtualThread) { + val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) { Thread.ofVirtual().factory() } else { null diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Parser.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Parser.kt index c5dfc23..d4b4081 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Parser.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Parser.kt @@ -19,11 +19,16 @@ import org.w3c.dom.Document import org.w3c.dom.Element import org.w3c.dom.TypeInfo import java.nio.file.Paths +import java.time.Duration +import java.time.temporal.ChronoUnit +import java.util.concurrent.TimeUnit object Parser { fun parse(document: Document): Configuration { val root = document.documentElement val anonymousUser = User("", null, emptySet()) + var connection: Configuration.Connection? = null + var eventExecutor: Configuration.EventExecutor? = null var cache: Cache? = null var host = "127.0.0.1" var port = 11080 @@ -31,46 +36,11 @@ object Parser { var groups = emptyMap() var tls: Tls? = null val serverPath = root.renderAttribute("path") - val useVirtualThread = root.renderAttribute("use-virtual-threads") - ?.let(String::toBoolean) ?: true - val maxRequestSize = root.renderAttribute("max-request-size") - ?.let(String::toInt) ?: 67108864 - val incomingConnectionsBacklogSize = root.renderAttribute("incoming-connections-backlog-size") - ?.let(String::toInt) ?: 1024 + var incomingConnectionsBacklogSize = 1024 var authentication: Authentication? = null for (child in root.asIterable()) { val tagName = child.localName when (tagName) { - "authorization" -> { - var knownUsers = sequenceOf(anonymousUser) - for (gchild in child.asIterable()) { - when (gchild.localName) { - "users" -> { - knownUsers += parseUsers(gchild) - } - "groups" -> { - val pair = parseGroups(gchild, knownUsers) - users = pair.first - groups = pair.second - } - } - } - } - - "bind" -> { - host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required") - port = Integer.parseInt(child.renderAttribute("port")) - } - - "cache" -> { - cache = (child as TypeInfo).let { tf -> - val typeNamespace = tf.typeNamespace - val typeName = tf.typeName - CacheSerializers.index[typeNamespace to typeName] - ?: throw IllegalArgumentException("Cache provider for namespace '$typeNamespace' not found") - }.deserialize(child) - } - "authentication" -> { for (gchild in child.asIterable()) { when (gchild.localName) { @@ -102,6 +72,66 @@ object Parser { } } + "authorization" -> { + var knownUsers = sequenceOf(anonymousUser) + for (gchild in child.asIterable()) { + when (gchild.localName) { + "users" -> { + knownUsers += parseUsers(gchild) + } + "groups" -> { + val pair = parseGroups(gchild, knownUsers) + users = pair.first + groups = pair.second + } + } + } + } + + "bind" -> { + host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required") + port = Integer.parseInt(child.renderAttribute("port")) + incomingConnectionsBacklogSize = child.renderAttribute("incoming-connections-backlog-size") + ?.let(Integer::parseInt) + ?: 1024 + } + + "cache" -> { + cache = (child as TypeInfo).let { tf -> + val typeNamespace = tf.typeNamespace + val typeName = tf.typeName + CacheSerializers.index[typeNamespace to typeName] + ?: throw IllegalArgumentException("Cache provider for namespace '$typeNamespace' not found") + }.deserialize(child) + } + + "connection" -> { + val writeTimeout = child.renderAttribute("write-timeout") + ?.let(Duration::parse) ?: Duration.of(10, ChronoUnit.SECONDS) + val readTimeout = child.renderAttribute("read-timeout") + ?.let(Duration::parse) ?: Duration.of(10, ChronoUnit.SECONDS) + val idleTimeout = child.renderAttribute("idle-timeout") + ?.let(Duration::parse) ?: Duration.of(30, ChronoUnit.SECONDS) + val readIdleTimeout = child.renderAttribute("read-idle-timeout") + ?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS) + val writeIdleTimeout = child.renderAttribute("write-idle-timeout") + ?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS) + val maxRequestSize = child.renderAttribute("max-request-size") + ?.let(String::toInt) ?: 67108864 + connection = Configuration.Connection( + readTimeout, + writeTimeout, + idleTimeout, + readIdleTimeout, + writeIdleTimeout, + maxRequestSize + ) + } + "event-executor" -> { + val useVirtualThread = root.renderAttribute("use-virtual-threads") + ?.let(String::toBoolean) ?: true + eventExecutor = Configuration.EventExecutor(useVirtualThread) + } "tls" -> { val verifyClients = child.renderAttribute("verify-clients") ?.let(String::toBoolean) ?: false @@ -140,18 +170,18 @@ object Parser { } } } - return Configuration( + return Configuration.of( host, port, + incomingConnectionsBacklogSize, serverPath, + eventExecutor, + connection, users, groups, cache!!, authentication, tls, - useVirtualThread, - maxRequestSize, - incomingConnectionsBacklogSize ) } diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Serializer.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Serializer.kt index 9739365..e1f1b46 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Serializer.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Serializer.kt @@ -14,10 +14,6 @@ object Serializer { it.xmlNamespace to it.xmlSchemaLocation }.toMap() return Xml.of(GBCS.GBCS_NAMESPACE_URI, GBCS.GBCS_PREFIX + ":server") { - attr("use-virtual-threads", conf.isUseVirtualThread.toString()) - attr("max-request-size", conf.maxRequestSize.toString()) - attr("incoming-connections-backlog-size", conf.incomingConnectionsBacklogSize.toString()) - // attr("xmlns:xs", GradleBuildCacheServer.XML_SCHEMA_NAMESPACE_URI) val value = schemaLocations.asSequence().map { (k, v) -> "$k $v" }.joinToString(" ") attr("xs:schemaLocation", value , namespaceURI = GBCS.XML_SCHEMA_NAMESPACE_URI) @@ -30,6 +26,20 @@ object Serializer { node("bind") { attr("host", conf.host) attr("port", conf.port.toString()) + attr("incoming-connections-backlog-size", conf.incomingConnectionsBacklogSize.toString()) + } + node("connection") { + conf.connection.let { connection -> + attr("read-timeout", connection.readTimeout.toString()) + attr("write-timeout", connection.writeTimeout.toString()) + attr("idle-timeout", connection.idleTimeout.toString()) + attr("read-idle-timeout", connection.readIdleTimeout.toString()) + attr("write-idle-timeout", connection.writeIdleTimeout.toString()) + attr("max-request-size", connection.maxRequestSize.toString()) + } + } + node("event-executor") { + attr("use-virtual-threads", conf.eventExecutor.isUseVirtualThreads.toString()) } val cache = conf.cache val serializer : CacheProvider = diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt new file mode 100644 index 0000000..cdc39d8 --- /dev/null +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt @@ -0,0 +1,129 @@ +package net.woggioni.gbcs.server.handler + +import io.netty.buffer.Unpooled +import io.netty.channel.ChannelFutureListener +import io.netty.channel.ChannelHandler +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.DefaultFileRegion +import io.netty.channel.SimpleChannelInboundHandler +import io.netty.handler.codec.http.DefaultFullHttpResponse +import io.netty.handler.codec.http.DefaultHttpResponse +import io.netty.handler.codec.http.FullHttpRequest +import io.netty.handler.codec.http.HttpHeaderNames +import io.netty.handler.codec.http.HttpHeaderValues +import io.netty.handler.codec.http.HttpMethod +import io.netty.handler.codec.http.HttpResponseStatus +import io.netty.handler.codec.http.HttpUtil +import io.netty.handler.codec.http.LastHttpContent +import io.netty.handler.stream.ChunkedNioFile +import io.netty.handler.stream.ChunkedNioStream +import net.woggioni.gbcs.api.Cache +import net.woggioni.gbcs.common.contextLogger +import net.woggioni.gbcs.server.debug +import net.woggioni.gbcs.server.warn +import java.nio.channels.FileChannel +import java.nio.file.Path + +@ChannelHandler.Sharable +class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : + SimpleChannelInboundHandler() { + + private val log = contextLogger() + + override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) { + val keepAlive: Boolean = HttpUtil.isKeepAlive(msg) + val method = msg.method() + if (method === HttpMethod.GET) { + val path = Path.of(msg.uri()) + val prefix = path.parent + val key = path.fileName.toString() + if (serverPrefix == prefix) { + cache.get(key)?.let { channel -> + log.debug(ctx) { + "Cache hit for key '$key'" + } + val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) + response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM + if (!keepAlive) { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY) + } else { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED) + } + ctx.write(response) + when (channel) { + is FileChannel -> { + if (keepAlive) { + ctx.write(ChunkedNioFile(channel)) + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + } else { + ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size())) + .addListener(ChannelFutureListener.CLOSE) + } + } + + else -> { + ctx.write(ChunkedNioStream(channel)) + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + } + } + } ?: let { + log.debug(ctx) { + "Cache miss for key '$key'" + } + val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND) + response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 + ctx.writeAndFlush(response) + } + } else { + log.warn(ctx) { + "Got request for unhandled path '${msg.uri()}'" + } + val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) + response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 + ctx.writeAndFlush(response) + } + } else if (method === HttpMethod.PUT) { + val path = Path.of(msg.uri()) + val prefix = path.parent + val key = path.fileName.toString() + + if (serverPrefix == prefix) { + log.debug(ctx) { + "Added value for key '$key' to build cache" + } + val bodyBytes = msg.content().run { + if (isDirect) { + ByteArray(readableBytes()).also { + readBytes(it) + } + } else { + array() + } + } + cache.put(key, bodyBytes) + val response = DefaultFullHttpResponse( + msg.protocolVersion(), HttpResponseStatus.CREATED, + Unpooled.copiedBuffer(key.toByteArray()) + ) + response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() + ctx.writeAndFlush(response) + } else { + log.warn(ctx) { + "Got request for unhandled path '${msg.uri()}'" + } + val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) + response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" + ctx.writeAndFlush(response) + } + } else { + log.warn(ctx) { + "Got request with unhandled method '${msg.method().name()}'" + } + val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) + response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" + ctx.writeAndFlush(response) + } + } +} \ No newline at end of file diff --git a/gbcs-server/src/main/resources/net/woggioni/gbcs/server/schema/gbcs.xsd b/gbcs-server/src/main/resources/net/woggioni/gbcs/server/schema/gbcs.xsd index ca56ae9..ee680b6 100644 --- a/gbcs-server/src/main/resources/net/woggioni/gbcs/server/schema/gbcs.xsd +++ b/gbcs-server/src/main/resources/net/woggioni/gbcs/server/schema/gbcs.xsd @@ -8,6 +8,8 @@ + + @@ -23,14 +25,25 @@ - - - + + + + + + + + + + + + + + diff --git a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractBasicAuthServerTest.kt b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractBasicAuthServerTest.kt index 5301e67..ee772f1 100644 --- a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractBasicAuthServerTest.kt +++ b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractBasicAuthServerTest.kt @@ -11,6 +11,7 @@ import java.net.http.HttpRequest import java.nio.charset.StandardCharsets import java.nio.file.Path import java.time.Duration +import java.time.temporal.ChronoUnit import java.util.Base64 import java.util.zip.Deflater import kotlin.random.Random @@ -30,10 +31,20 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() { override fun setUp() { this.cacheDir = testDir.resolve("cache") - cfg = Configuration( + cfg = Configuration.of( "127.0.0.1", NetworkUtils.getFreePort(), + 50, serverPath, + Configuration.EventExecutor(false), + Configuration.Connection( + Duration.of(10, ChronoUnit.SECONDS), + Duration.of(10, ChronoUnit.SECONDS), + Duration.of(60, ChronoUnit.SECONDS), + Duration.of(30, ChronoUnit.SECONDS), + Duration.of(30, ChronoUnit.SECONDS), + 0x1000 + ), users.asSequence().map { it.name to it}.toMap(), sequenceOf(writersGroup, readersGroup).map { it.name to it}.toMap(), FileSystemCacheConfiguration(this.cacheDir, @@ -44,9 +55,6 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() { ), Configuration.BasicAuthentication(), null, - true, - 0x10000, - 100 ) Xml.write(Serializer.serialize(cfg), System.out) } diff --git a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractTlsServerTest.kt b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractTlsServerTest.kt index 085db3d..9611973 100644 --- a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractTlsServerTest.kt +++ b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractTlsServerTest.kt @@ -18,6 +18,7 @@ import java.nio.file.Path import java.security.KeyStore import java.security.KeyStore.PasswordProtection import java.time.Duration +import java.time.temporal.ChronoUnit import java.util.Base64 import java.util.zip.Deflater import javax.net.ssl.KeyManagerFactory @@ -138,7 +139,17 @@ abstract class AbstractTlsServerTest : AbstractServerTest() { cfg = Configuration( "127.0.0.1", NetworkUtils.getFreePort(), + 100, serverPath, + Configuration.EventExecutor(false), + Configuration.Connection( + Duration.of(10, ChronoUnit.SECONDS), + Duration.of(10, ChronoUnit.SECONDS), + Duration.of(60, ChronoUnit.SECONDS), + Duration.of(30, ChronoUnit.SECONDS), + Duration.of(30, ChronoUnit.SECONDS), + 0x1000 + ), users.asSequence().map { it.name to it }.toMap(), sequenceOf(writersGroup, readersGroup).map { it.name to it }.toMap(), FileSystemCacheConfiguration(this.cacheDir, @@ -156,9 +167,6 @@ abstract class AbstractTlsServerTest : AbstractServerTest() { Configuration.TrustStore(this.trustStoreFile, null, false), true ), - false, - 0x10000, - 100 ) Xml.write(Serializer.serialize(cfg), System.out) } diff --git a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/NoAuthServerTest.kt b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/NoAuthServerTest.kt index 31f85b6..3c71c68 100644 --- a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/NoAuthServerTest.kt +++ b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/NoAuthServerTest.kt @@ -15,6 +15,7 @@ import java.net.http.HttpRequest import java.net.http.HttpResponse import java.nio.file.Path import java.time.Duration +import java.time.temporal.ChronoUnit import java.util.Base64 import java.util.zip.Deflater import kotlin.random.Random @@ -33,7 +34,17 @@ class NoAuthServerTest : AbstractServerTest() { cfg = Configuration( "127.0.0.1", NetworkUtils.getFreePort(), + 100, serverPath, + Configuration.EventExecutor(false), + Configuration.Connection( + Duration.of(10, ChronoUnit.SECONDS), + Duration.of(10, ChronoUnit.SECONDS), + Duration.of(60, ChronoUnit.SECONDS), + Duration.of(30, ChronoUnit.SECONDS), + Duration.of(30, ChronoUnit.SECONDS), + 0x1000 + ), emptyMap(), emptyMap(), FileSystemCacheConfiguration( @@ -45,9 +56,6 @@ class NoAuthServerTest : AbstractServerTest() { ), null, null, - true, - 0x10000, - 100 ) Xml.write(Serializer.serialize(cfg), System.out) } diff --git a/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/gbcs-default.xml b/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/gbcs-default.xml index 47ee1f1..b302012 100644 --- a/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/gbcs-default.xml +++ b/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/gbcs-default.xml @@ -1,8 +1,16 @@ - - + + + diff --git a/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/gbcs-memcached.xml b/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/gbcs-memcached.xml index e3027cc..5c4ab6e 100644 --- a/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/gbcs-memcached.xml +++ b/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/gbcs-memcached.xml @@ -1,9 +1,17 @@ - - + + + diff --git a/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/gbcs-tls.xml b/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/gbcs-tls.xml index 7e2bbe0..e72736b 100644 --- a/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/gbcs-tls.xml +++ b/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/gbcs-tls.xml @@ -1,8 +1,16 @@ - - + + +