diff --git a/gradle.properties b/gradle.properties index 441ead7..6c23778 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,7 +2,7 @@ org.gradle.configuration-cache=false org.gradle.parallel=true org.gradle.caching=true -rbcs.version = 0.1.4 +rbcs.version = 0.1.5 lys.version = 2025.02.05 diff --git a/rbcs-cli/build.gradle b/rbcs-cli/build.gradle index eecfc7f..ef25c5f 100644 --- a/rbcs-cli/build.gradle +++ b/rbcs-cli/build.gradle @@ -44,7 +44,6 @@ envelopeJar { dependencies { implementation catalog.jwo implementation catalog.slf4j.api - implementation catalog.netty.codec.http implementation catalog.picocli implementation project(':rbcs-client') diff --git a/rbcs-client/build.gradle b/rbcs-client/build.gradle index e0eefe8..221ef2c 100644 --- a/rbcs-client/build.gradle +++ b/rbcs-client/build.gradle @@ -6,9 +6,11 @@ plugins { dependencies { implementation project(':rbcs-api') implementation project(':rbcs-common') - implementation catalog.picocli implementation catalog.slf4j.api implementation catalog.netty.buffer + implementation catalog.netty.handler + implementation catalog.netty.transport + implementation catalog.netty.common implementation catalog.netty.codec.http testRuntimeOnly catalog.logback.classic diff --git a/rbcs-server/build.gradle b/rbcs-server/build.gradle index 62507bb..be90be2 100644 --- a/rbcs-server/build.gradle +++ b/rbcs-server/build.gradle @@ -9,6 +9,9 @@ dependencies { implementation catalog.jwo implementation catalog.slf4j.api implementation catalog.netty.codec.http + implementation catalog.netty.handler + implementation catalog.netty.buffer + implementation catalog.netty.transport api project(':rbcs-common') api project(':rbcs-api') @@ -36,3 +39,4 @@ publishing { } + diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt index 345a93c..76af690 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt @@ -30,11 +30,13 @@ import io.netty.handler.timeout.IdleStateHandler import io.netty.util.AttributeKey import io.netty.util.concurrent.DefaultEventExecutorGroup import io.netty.util.concurrent.EventExecutorGroup +import net.woggioni.jwo.JWO +import net.woggioni.jwo.Tuple2 import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.exception.ConfigurationException -import net.woggioni.rbcs.common.RBCS.toUrl import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash import net.woggioni.rbcs.common.PasswordSecurity.hashPassword +import net.woggioni.rbcs.common.RBCS.toUrl import net.woggioni.rbcs.common.Xml import net.woggioni.rbcs.common.contextLogger import net.woggioni.rbcs.common.debug @@ -48,8 +50,6 @@ import net.woggioni.rbcs.server.configuration.Serializer import net.woggioni.rbcs.server.exception.ExceptionHandler import net.woggioni.rbcs.server.handler.ServerHandler import net.woggioni.rbcs.server.throttling.ThrottlingHandler -import net.woggioni.jwo.JWO -import net.woggioni.jwo.Tuple2 import java.io.OutputStream import java.net.InetSocketAddress import java.nio.file.Files @@ -59,6 +59,7 @@ import java.security.PrivateKey import java.security.cert.X509Certificate import java.util.Arrays import java.util.Base64 +import java.util.concurrent.Future import java.util.concurrent.TimeUnit import java.util.regex.Matcher import java.util.regex.Pattern @@ -128,11 +129,12 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { val clientCertificate = peerCertificates.first() as X509Certificate val user = userExtractor?.extract(clientCertificate) val group = groupExtractor?.extract(clientCertificate) - val allGroups = ((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet() + val allGroups = + ((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet() AuthenticationResult(user, allGroups) - } ?: anonymousUserGroups?.let{ AuthenticationResult(null, it) } + } ?: anonymousUserGroups?.let { AuthenticationResult(null, it) } } catch (es: SSLPeerUnverifiedException) { - anonymousUserGroups?.let{ AuthenticationResult(null, it) } + anonymousUserGroups?.let { AuthenticationResult(null, it) } } } } @@ -191,7 +193,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { private class ServerInitializer( private val cfg: Configuration, private val eventExecutorGroup: EventExecutorGroup - ) : ChannelInitializer() { + ) : ChannelInitializer(), AutoCloseable { companion object { private fun createSslCtx(tls: Configuration.Tls): SslContext { @@ -213,7 +215,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { trustManager( ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus) ) - if(trustStore.isRequireClientCertificate) ClientAuth.REQUIRE + if (trustStore.isRequireClientCertificate) ClientAuth.REQUIRE else ClientAuth.OPTIONAL } ?: ClientAuth.NONE clientAuth(clientAuth) @@ -245,10 +247,11 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { private val log = contextLogger() + private val cache = cfg.cache.materialize() + private val serverHandler = let { - val cacheImplementation = cfg.cache.materialize() val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/")) - ServerHandler(cacheImplementation, prefix) + ServerHandler(cache, prefix) } private val exceptionHandler = ExceptionHandler() @@ -311,7 +314,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { cfg.connection.also { conn -> val readTimeout = conn.readTimeout.toMillis() val writeTimeout = conn.writeTimeout.toMillis() - if(readTimeout > 0 || writeTimeout > 0) { + if (readTimeout > 0 || writeTimeout > 0) { pipeline.addLast( IdleStateHandler( false, @@ -325,7 +328,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { val readIdleTimeout = conn.readIdleTimeout.toMillis() val writeIdleTimeout = conn.writeIdleTimeout.toMillis() val idleTimeout = conn.idleTimeout.toMillis() - if(readIdleTimeout > 0 || writeIdleTimeout > 0 || idleTimeout > 0) { + if (readIdleTimeout > 0 || writeIdleTimeout > 0 || idleTimeout > 0) { pipeline.addLast( IdleStateHandler( true, @@ -340,16 +343,19 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { pipeline.addLast(object : ChannelInboundHandlerAdapter() { override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { if (evt is IdleStateEvent) { - when(evt.state()) { + when (evt.state()) { IdleState.READER_IDLE -> log.debug { "Read timeout reached on channel ${ch.id().asShortText()}, closing the connection" } + IdleState.WRITER_IDLE -> log.debug { "Write timeout reached on channel ${ch.id().asShortText()}, closing the connection" } + IdleState.ALL_IDLE -> log.debug { "Idle timeout reached on channel ${ch.id().asShortText()}, closing the connection" } + null -> throw IllegalStateException("This should never happen") } ctx.close() @@ -370,26 +376,41 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { pipeline.addLast(eventExecutorGroup, serverHandler) pipeline.addLast(exceptionHandler) } + + override fun close() { + cache.close() + } } class ServerHandle( httpChannelFuture: ChannelFuture, - private val executorGroups: Iterable + private val executorGroups: Iterable, + private val serverInitializer: AutoCloseable ) : AutoCloseable { private val httpChannel: Channel = httpChannelFuture.channel() private val closeFuture: ChannelFuture = httpChannel.closeFuture() private val log = contextLogger() - fun shutdown(): ChannelFuture { + fun shutdown(): Future { return httpChannel.close() } override fun close() { try { closeFuture.sync() - } finally { - executorGroups.forEach { + } catch (ex: Throwable) { + log.error(ex.message, ex) + } + try { + serverInitializer.close() + } catch (ex: Throwable) { + log.error(ex.message, ex) + } + executorGroups.forEach { + try { it.shutdownGracefully().sync() + } catch (ex: Throwable) { + log.error(ex.message, ex) } } log.info { @@ -411,11 +432,12 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { } DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), threadFactory) } + val serverInitializer = ServerInitializer(cfg, eventExecutorGroup) val bootstrap = ServerBootstrap().apply { // Configure the server group(bossGroup, workerGroup) channel(serverSocketChannel) - childHandler(ServerInitializer(cfg, eventExecutorGroup)) + childHandler(serverInitializer) option(ChannelOption.SO_BACKLOG, cfg.incomingConnectionsBacklogSize) childOption(ChannelOption.SO_KEEPALIVE, true) } @@ -427,6 +449,6 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { log.info { "RemoteBuildCacheServer is listening on ${cfg.host}:${cfg.port}" } - return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup)) + return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup), serverInitializer) } } diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCache.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCache.kt index eb5b552..e6a98fc 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCache.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCache.kt @@ -1,12 +1,11 @@ package net.woggioni.rbcs.server.cache import io.netty.buffer.ByteBuf +import net.woggioni.jwo.JWO import net.woggioni.rbcs.api.Cache import net.woggioni.rbcs.common.ByteBufInputStream import net.woggioni.rbcs.common.RBCS.digestString import net.woggioni.rbcs.common.contextLogger -import net.woggioni.jwo.JWO -import net.woggioni.jwo.LockFile import java.nio.channels.Channels import java.nio.channels.FileChannel import java.nio.file.Files @@ -18,7 +17,6 @@ import java.security.MessageDigest import java.time.Duration import java.time.Instant import java.util.concurrent.CompletableFuture -import java.util.concurrent.atomic.AtomicReference import java.util.zip.Deflater import java.util.zip.DeflaterOutputStream import java.util.zip.Inflater @@ -41,7 +39,10 @@ class FileSystemCache( Files.createDirectories(root) } - private var nextGc = AtomicReference(Instant.now().plus(maxAge)) + @Volatile + private var running = true + + private var nextGc = Instant.now() override fun get(key: String) = (digestAlgorithm ?.let(MessageDigest::getInstance) @@ -67,8 +68,6 @@ class FileSystemCache( FileChannel.open(file, StandardOpenOption.READ) } } - }.also { - gc() }.let { CompletableFuture.completedFuture(it) } @@ -98,33 +97,51 @@ class FileSystemCache( Files.delete(tmpFile) throw t } - }.also { - gc() } return CompletableFuture.completedFuture(null) } + private val garbageCollector = Thread.ofVirtual().name("file-system-cache-gc").start { + while (running) { + gc() + } + } + private fun gc() { val now = Instant.now() - val oldValue = nextGc.getAndSet(now.plus(maxAge)) - if (oldValue < now) { - actualGc(now) + if (nextGc < now) { + val oldestEntry = actualGc(now) + nextGc = (oldestEntry ?: now).plus(maxAge) } + Thread.sleep(minOf(Duration.between(now, nextGc), Duration.ofSeconds(1))) } - @Synchronized - private fun actualGc(now: Instant) { - Files.list(root).filter { - val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java) - .creationTime() - .toInstant() - now > creationTimeStamp.plus(maxAge) - }.forEach { file -> - LockFile.acquire(file, false).use { - Files.delete(file) + /** + * Returns the creation timestamp of the oldest cache entry (if any) + */ + private fun actualGc(now: Instant) : Instant? { + var result :Instant? = null + Files.list(root) + .filter { path -> + JWO.splitExtension(path) + .map { it._2 } + .map { it != ".tmp" } + .orElse(true) } - } + .filter { + val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java) + .creationTime() + .toInstant() + if(result == null || creationTimeStamp < result) { + result = creationTimeStamp + } + now > creationTimeStamp.plus(maxAge) + }.forEach(Files::delete) + return result } - override fun close() {} + override fun close() { + running = false + garbageCollector.join() + } } \ No newline at end of file diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt index ecdc7ee..0eb56b2 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt @@ -1,12 +1,12 @@ package net.woggioni.rbcs.server.cache import io.netty.buffer.ByteBuf +import net.woggioni.jwo.JWO import net.woggioni.rbcs.api.Cache import net.woggioni.rbcs.common.ByteBufInputStream import net.woggioni.rbcs.common.ByteBufOutputStream import net.woggioni.rbcs.common.RBCS.digestString import net.woggioni.rbcs.common.contextLogger -import net.woggioni.jwo.JWO import java.nio.channels.Channels import java.security.MessageDigest import java.time.Duration @@ -42,9 +42,11 @@ class InMemoryCache( private val removalQueue = PriorityBlockingQueue() + @Volatile private var running = true - private val garbageCollector = Thread { - while(true) { + + private val garbageCollector = Thread.ofVirtual().name("in-memory-cache-gc").start { + while(running) { val el = removalQueue.take() val buf = el.value val now = Instant.now() @@ -62,8 +64,6 @@ class InMemoryCache( Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))) } } - }.apply { - start() } private fun removeEldest() : Long {