From 23f2a351a6f7d6fa3e26dd73c5aff50d4cbddf2b Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Mon, 24 Feb 2025 13:52:20 +0800 Subject: [PATCH] shared event executor group between server and clients - improved documentation - closed memcache client's thread pools --- README.md | 46 +++++++++- build.gradle | 5 +- rbcs-api/build.gradle | 1 + rbcs-api/src/main/java/module-info.java | 1 + .../net/woggioni/rbcs/api/AsyncCloseable.java | 13 +++ .../rbcs/api/CacheHandlerFactory.java | 12 ++- .../net/woggioni/rbcs/api/Configuration.java | 11 --- rbcs-cli/build.gradle | 43 ++++----- .../kotlin/net/woggioni/rbcs/common/RBCS.kt | 4 +- .../memcache/MemcacheCacheConfiguration.kt | 67 ++++++++++++-- .../server/memcache/client/MemcacheClient.kt | 22 ++--- .../server/memcache/schema/rbcs-memcache.xsd | 2 +- .../rbcs/server/RemoteBuildCacheServer.kt | 92 +++++++++++-------- .../rbcs/server/cache/FileSystemCache.kt | 23 +++-- .../cache/FileSystemCacheConfiguration.kt | 14 ++- .../server/cache/FileSystemCacheProvider.kt | 2 +- .../rbcs/server/cache/InMemoryCache.kt | 47 ++++++---- .../cache/InMemoryCacheConfiguration.kt | 15 ++- .../server/cache/InMemoryCacheProvider.kt | 2 +- .../net/woggioni/rbcs/server/schema/rbcs.xsd | 4 +- 20 files changed, 286 insertions(+), 140 deletions(-) create mode 100644 rbcs-api/src/main/java/net/woggioni/rbcs/api/AsyncCloseable.java diff --git a/README.md b/README.md index b3f4366..028b8bf 100644 --- a/README.md +++ b/README.md @@ -66,11 +66,18 @@ buildCache { url = 'https://rbcs.example.com/' push = true allowInsecureProtocol = false + // The credentials block is only required if you enable + // HTTP basic authentication on RBCS + credentials { + username = 'build-cache-user' + password = 'some-complicated-password' + } } } ``` -alternatively you can add this to `${GRADLE_HOME}/init.gradle` +alternatively you can add this to `${GRADLE_HOME}/init.gradle` to configure the remote cache +at the system level ```groovy gradle.settingsEvaluated { settings -> @@ -79,14 +86,51 @@ gradle.settingsEvaluated { settings -> url = 'https://rbcs.example.com/' push = true allowInsecureProtocol = false + // The credentials block is only required if you enable + // HTTP basic authentication on RBCS + credentials { + username = 'build-cache-user' + password = 'some-complicated-password' + } } } } ``` +add `org.gradle.caching=true` to your `/gradle.properties` or run gradle with `--build-cache`. + +Read [Gradle documentation](https://docs.gradle.org/current/userguide/build_cache.html) for more detailed information. + ### Using RBCS with Maven +1. Create an `extensions.xml` in `/.mvn/extensions.xml` with the following content + ```xml + + + org.apache.maven.extensions + maven-build-cache-extension + 1.2.0 + + + ``` +2. Copy [maven-build-cache-config.xml](https://maven.apache.org/extensions/maven-build-cache-extension/maven-build-cache-config.xml) into `/.mvn/` folder +3. Edit the `cache/configuration/remote` element + ```xml + + https://rbcs.example.com/ + + ``` +4. Run maven with + ```bash + mvn -Dmaven.build.cache.enabled=true -Dmaven.build.cache.debugOutput=true -Dmaven.build.cache.remote.save.enabled=true package + ``` + +Alternatively you can set those properties in your `/pom.xml` + + Read [here](https://maven.apache.org/extensions/maven-build-cache-extension/remote-cache.html) +for more informations ## FAQ ### Why should I use a build cache? diff --git a/build.gradle b/build.gradle index 5afc3b6..a943bb4 100644 --- a/build.gradle +++ b/build.gradle @@ -14,9 +14,7 @@ allprojects { subproject -> if(project.currentTag.isPresent()) { version = project.currentTag.map { it[0] }.get() } else { - version = project.gitRevision.map { gitRevision -> - "${getProperty('rbcs.version')}.${gitRevision[0..10]}" - }.get() + version = "${getProperty('rbcs.version')}-SNAPSHOT" } repositories { @@ -24,7 +22,6 @@ allprojects { subproject -> url = getProperty('gitea.maven.url') content { includeModule 'net.woggioni', 'jwo' - includeModule 'net.woggioni', 'xmemcached' includeGroup 'com.lys' } } diff --git a/rbcs-api/build.gradle b/rbcs-api/build.gradle index a99ec9a..f70de23 100644 --- a/rbcs-api/build.gradle +++ b/rbcs-api/build.gradle @@ -5,6 +5,7 @@ plugins { } dependencies { + api catalog.netty.common api catalog.netty.buffer api catalog.netty.handler } diff --git a/rbcs-api/src/main/java/module-info.java b/rbcs-api/src/main/java/module-info.java index 78cdc90..e6373b5 100644 --- a/rbcs-api/src/main/java/module-info.java +++ b/rbcs-api/src/main/java/module-info.java @@ -4,6 +4,7 @@ module net.woggioni.rbcs.api { requires io.netty.buffer; requires io.netty.handler; requires io.netty.transport; + requires io.netty.common; exports net.woggioni.rbcs.api; exports net.woggioni.rbcs.api.exception; exports net.woggioni.rbcs.api.message; diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/AsyncCloseable.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/AsyncCloseable.java new file mode 100644 index 0000000..e2e1739 --- /dev/null +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/AsyncCloseable.java @@ -0,0 +1,13 @@ +package net.woggioni.rbcs.api; + +import java.util.concurrent.CompletableFuture; + +public interface AsyncCloseable extends AutoCloseable { + + CompletableFuture asyncClose(); + + @Override + default void close() throws Exception { + asyncClose().get(); + } +} diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandlerFactory.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandlerFactory.java index aeda6a5..ce7b5fa 100644 --- a/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandlerFactory.java +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheHandlerFactory.java @@ -1,7 +1,15 @@ package net.woggioni.rbcs.api; +import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelHandler; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.SocketChannel; -public interface CacheHandlerFactory extends AutoCloseable { - ChannelHandler newHandler(); +public interface CacheHandlerFactory extends AsyncCloseable { + ChannelHandler newHandler( + EventLoopGroup eventLoopGroup, + ChannelFactory socketChannelFactory, + ChannelFactory datagramChannelFactory + ); } diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/Configuration.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/Configuration.java index 9d7af56..ce23be6 100644 --- a/rbcs-api/src/main/java/net/woggioni/rbcs/api/Configuration.java +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/Configuration.java @@ -83,17 +83,6 @@ public class Configuration { Group extract(X509Certificate cert); } - @Value - public static class Throttling { - KeyStore keyStore; - TrustStore trustStore; - boolean verifyClients; - } - - public enum ClientCertificate { - REQUIRED, OPTIONAL - } - @Value public static class Tls { KeyStore keyStore; diff --git a/rbcs-cli/build.gradle b/rbcs-cli/build.gradle index 8c0e917..42a59b6 100644 --- a/rbcs-cli/build.gradle +++ b/rbcs-cli/build.gradle @@ -9,6 +9,7 @@ plugins { id 'maven-publish' } +import net.woggioni.gradle.envelope.EnvelopePlugin import net.woggioni.gradle.envelope.EnvelopeJarTask import net.woggioni.gradle.graalvm.NativeImageConfigurationTask import net.woggioni.gradle.graalvm.NativeImagePlugin @@ -16,15 +17,6 @@ import net.woggioni.gradle.graalvm.NativeImageTask import net.woggioni.gradle.graalvm.JlinkPlugin import net.woggioni.gradle.graalvm.JlinkTask -Property mainModuleName = objects.property(String.class) -mainModuleName.set('net.woggioni.rbcs.cli') -Property mainClassName = objects.property(String.class) -mainClassName.set('net.woggioni.rbcs.cli.RemoteBuildCacheServerCli') - -tasks.named(JavaPlugin.COMPILE_JAVA_TASK_NAME, JavaCompile) { - options.javaModuleMainClass = mainClassName -} - configurations { release { transitive = false @@ -34,13 +26,6 @@ configurations { } } -envelopeJar { - mainModule = mainModuleName - mainClass = mainClassName - - extraClasspath = ["plugins"] -} - dependencies { implementation catalog.jwo implementation catalog.slf4j.api @@ -54,18 +39,24 @@ dependencies { // runtimeOnly catalog.slf4j.simple } -Provider envelopeJarTaskProvider = tasks.named('envelopeJar', EnvelopeJarTask.class) { -// systemProperties['java.util.logging.config.class'] = 'net.woggioni.rbcs.LoggingConfig' -// systemProperties['log.config.source'] = 'net/woggioni/rbcs/cli/logging.properties' -// systemProperties['java.util.logging.config.file'] = 'classpath:net/woggioni/rbcs/cli/logging.properties' + +Property mainModuleName = objects.property(String.class) +mainModuleName.set('net.woggioni.rbcs.cli') +Property mainClassName = objects.property(String.class) +mainClassName.set('net.woggioni.rbcs.cli.RemoteBuildCacheServerCli') + +tasks.named(JavaPlugin.COMPILE_JAVA_TASK_NAME, JavaCompile) { + options.javaModuleMainClass = mainClassName +} + +Provider envelopeJarTaskProvider = tasks.named(EnvelopePlugin.ENVELOPE_JAR_TASK_NAME, EnvelopeJarTask.class) { + mainModule = mainModuleName + mainClass = mainClassName + + extraClasspath = ["plugins"] + systemProperties['logback.configurationFile'] = 'classpath:net/woggioni/rbcs/cli/logback.xml' systemProperties['io.netty.leakDetectionLevel'] = 'DISABLED' - -// systemProperties['org.slf4j.simpleLogger.showDateTime'] = 'true' -// systemProperties['org.slf4j.simpleLogger.defaultLogLevel'] = 'debug' -// systemProperties['org.slf4j.simpleLogger.log.com.google.code.yanf4j'] = 'warn' -// systemProperties['org.slf4j.simpleLogger.log.net.rubyeye.xmemcached'] = 'warn' -// systemProperties['org.slf4j.simpleLogger.dateTimeFormat'] = 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ' } tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfigurationTask) { diff --git a/rbcs-common/src/main/kotlin/net/woggioni/rbcs/common/RBCS.kt b/rbcs-common/src/main/kotlin/net/woggioni/rbcs/common/RBCS.kt index 86ff9f2..201c2f3 100644 --- a/rbcs-common/src/main/kotlin/net/woggioni/rbcs/common/RBCS.kt +++ b/rbcs-common/src/main/kotlin/net/woggioni/rbcs/common/RBCS.kt @@ -32,7 +32,7 @@ object RBCS { fun digest( data: ByteArray, - md: MessageDigest = MessageDigest.getInstance("MD5") + md: MessageDigest ): ByteArray { md.update(data) return md.digest() @@ -40,7 +40,7 @@ object RBCS { fun digestString( data: ByteArray, - md: MessageDigest = MessageDigest.getInstance("MD5") + md: MessageDigest ): String { return JWO.bytesToHex(digest(data, md)) } diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt index d725584..3b84f9f 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt @@ -1,10 +1,20 @@ package net.woggioni.rbcs.server.memcache +import io.netty.channel.ChannelFactory +import io.netty.channel.ChannelHandler +import io.netty.channel.EventLoopGroup +import io.netty.channel.pool.FixedChannelPool +import io.netty.channel.socket.DatagramChannel +import io.netty.channel.socket.SocketChannel import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.common.HostAndPort import net.woggioni.rbcs.server.memcache.client.MemcacheClient import java.time.Duration +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference data class MemcacheCacheConfiguration( val servers: List, @@ -12,7 +22,7 @@ data class MemcacheCacheConfiguration( val digestAlgorithm: String? = null, val compressionMode: CompressionMode? = null, val compressionLevel: Int, - val chunkSize : Int + val chunkSize: Int ) : Configuration.Cache { enum class CompressionMode { @@ -23,19 +33,58 @@ data class MemcacheCacheConfiguration( } data class Server( - val endpoint : HostAndPort, - val connectionTimeoutMillis : Int?, - val maxConnections : Int + val endpoint: HostAndPort, + val connectionTimeoutMillis: Int?, + val maxConnections: Int ) - override fun materialize() = object : CacheHandlerFactory { - private val client = MemcacheClient(this@MemcacheCacheConfiguration.servers, chunkSize) - override fun close() { - client.close() + + private val connectionPoolMap = ConcurrentHashMap() + + override fun newHandler( + eventLoop: EventLoopGroup, + socketChannelFactory: ChannelFactory, + datagramChannelFactory: ChannelFactory + ): ChannelHandler { + return MemcacheCacheHandler( + MemcacheClient( + this@MemcacheCacheConfiguration.servers, + chunkSize, + eventLoop, + socketChannelFactory, + connectionPoolMap + ), + digestAlgorithm, + compressionMode != null, + compressionLevel, + chunkSize, + maxAge + ) + } + + override fun asyncClose() = object : CompletableFuture() { + init { + val failure = AtomicReference(null) + val pools = connectionPoolMap.values.toList() + val npools = pools.size + val finished = AtomicInteger(0) + pools.forEach { pool -> + pool.closeAsync().addListener { + if (!it.isSuccess) { + failure.compareAndSet(null, it.cause()) + } + if(finished.incrementAndGet() == npools) { + when(val ex = failure.get()) { + null -> complete(null) + else -> completeExceptionally(ex) + } + } + } + } + } } - override fun newHandler() = MemcacheCacheHandler(client, digestAlgorithm, compressionMode != null, compressionLevel, chunkSize, maxAge) } override fun getNamespaceURI() = "urn:net.woggioni.rbcs.server.memcache" diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt index 950b26a..2708f9d 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt @@ -4,16 +4,17 @@ package net.woggioni.rbcs.server.memcache.client import io.netty.bootstrap.Bootstrap import io.netty.buffer.ByteBuf import io.netty.channel.Channel +import io.netty.channel.ChannelFactory import io.netty.channel.ChannelFutureListener import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelOption import io.netty.channel.ChannelPipeline +import io.netty.channel.EventLoopGroup import io.netty.channel.SimpleChannelInboundHandler -import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.pool.AbstractChannelPoolHandler import io.netty.channel.pool.ChannelPool import io.netty.channel.pool.FixedChannelPool -import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.channel.socket.SocketChannel import io.netty.handler.codec.memcache.LastMemcacheContent import io.netty.handler.codec.memcache.MemcacheContent import io.netty.handler.codec.memcache.MemcacheObject @@ -33,23 +34,22 @@ import java.util.concurrent.ConcurrentHashMap import io.netty.util.concurrent.Future as NettyFuture -class MemcacheClient(private val servers: List, private val chunkSize : Int) : AutoCloseable { +class MemcacheClient( + private val servers: List, + private val chunkSize : Int, + private val group: EventLoopGroup, + private val channelFactory: ChannelFactory, + private val connectionPool: ConcurrentHashMap +) : AutoCloseable { private companion object { private val log = createLogger() } - private val group: NioEventLoopGroup - private val connectionPool: MutableMap = ConcurrentHashMap() - - init { - group = NioEventLoopGroup() - } - private fun newConnectionPool(server: MemcacheCacheConfiguration.Server): FixedChannelPool { val bootstrap = Bootstrap().apply { group(group) - channel(NioSocketChannel::class.java) + channelFactory(channelFactory) option(ChannelOption.SO_KEEPALIVE, true) remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port)) server.connectionTimeoutMillis?.let { diff --git a/rbcs-server-memcache/src/main/resources/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd b/rbcs-server-memcache/src/main/resources/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd index 66a57a5..4f68d95 100644 --- a/rbcs-server-memcache/src/main/resources/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd +++ b/rbcs-server-memcache/src/main/resources/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd @@ -21,7 +21,7 @@ - + diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt index 4c9e77f..88ae5e3 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt @@ -3,6 +3,7 @@ package net.woggioni.rbcs.server import io.netty.bootstrap.ServerBootstrap import io.netty.buffer.ByteBuf import io.netty.channel.Channel +import io.netty.channel.ChannelFactory import io.netty.channel.ChannelFuture import io.netty.channel.ChannelHandler.Sharable import io.netty.channel.ChannelHandlerContext @@ -11,7 +12,12 @@ import io.netty.channel.ChannelInitializer import io.netty.channel.ChannelOption import io.netty.channel.ChannelPromise import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.DatagramChannel +import io.netty.channel.socket.ServerSocketChannel +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.NioDatagramChannel import io.netty.channel.socket.nio.NioServerSocketChannel +import io.netty.channel.socket.nio.NioSocketChannel import io.netty.handler.codec.compression.CompressionOptions import io.netty.handler.codec.http.DefaultHttpContent import io.netty.handler.codec.http.HttpContentCompressor @@ -31,6 +37,7 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup import io.netty.util.concurrent.EventExecutorGroup import net.woggioni.jwo.JWO import net.woggioni.jwo.Tuple2 +import net.woggioni.rbcs.api.AsyncCloseable import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.exception.ConfigurationException import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash @@ -200,8 +207,10 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { private class ServerInitializer( private val cfg: Configuration, + private val channelFactory : ChannelFactory, + private val datagramChannelFactory : ChannelFactory, private val eventExecutorGroup: EventExecutorGroup - ) : ChannelInitializer(), AutoCloseable { + ) : ChannelInitializer(), AsyncCloseable { companion object { private fun createSslCtx(tls: Configuration.Tls): SslContext { @@ -368,21 +377,20 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { ServerHandler(prefix) } pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler) - pipeline.addLast(cacheHandlerFactory.newHandler()) + + pipeline.addLast(cacheHandlerFactory.newHandler(ch.eventLoop(), channelFactory, datagramChannelFactory)) pipeline.addLast(TraceHandler) pipeline.addLast(ExceptionHandler) } - override fun close() { - cacheHandlerFactory.close() - } + override fun asyncClose() = cacheHandlerFactory.asyncClose() } class ServerHandle( closeFuture: ChannelFuture, private val bossGroup: EventExecutorGroup, private val executorGroups: Iterable, - private val serverInitializer: AutoCloseable, + private val serverInitializer: AsyncCloseable, ) : Future by from(closeFuture, executorGroups, serverInitializer) { companion object { @@ -391,42 +399,53 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { private fun from( closeFuture: ChannelFuture, executorGroups: Iterable, - serverInitializer: AutoCloseable + serverInitializer: AsyncCloseable ): CompletableFuture { val result = CompletableFuture() closeFuture.addListener { val errors = mutableListOf() val deadline = Instant.now().plusSeconds(20) - - - for (executorGroup in executorGroups) { - val future = executorGroup.terminationFuture() - try { - val now = Instant.now() - if (now > deadline) { - future.get(0, TimeUnit.SECONDS) - } else { - future.get(Duration.between(now, deadline).toMillis(), TimeUnit.MILLISECONDS) - } - } - catch (te: TimeoutException) { - errors.addLast(te) - log.warn("Timeout while waiting for shutdown of $executorGroup", te) - } catch (ex: Throwable) { - log.warn(ex.message, ex) - errors.addLast(ex) - } - } try { serverInitializer.close() } catch (ex: Throwable) { log.error(ex.message, ex) errors.addLast(ex) } - if(errors.isEmpty()) { - result.complete(null) - } else { - result.completeExceptionally(errors.first()) + + serverInitializer.asyncClose().whenComplete { _, ex -> + if(ex != null) { + log.error(ex.message, ex) + errors.addLast(ex) + } + + executorGroups.map { + it.shutdownGracefully() + } + + for (executorGroup in executorGroups) { + val future = executorGroup.terminationFuture() + try { + val now = Instant.now() + if (now > deadline) { + future.get(0, TimeUnit.SECONDS) + } else { + future.get(Duration.between(now, deadline).toMillis(), TimeUnit.MILLISECONDS) + } + } + catch (te: TimeoutException) { + errors.addLast(te) + log.warn("Timeout while waiting for shutdown of $executorGroup", te) + } catch (ex: Throwable) { + log.warn(ex.message, ex) + errors.addLast(ex) + } + } + + if(errors.isEmpty()) { + result.complete(null) + } else { + result.completeExceptionally(errors.first()) + } } } @@ -441,16 +460,15 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { fun sendShutdownSignal() { bossGroup.shutdownGracefully() - executorGroups.map { - it.shutdownGracefully() - } } } fun run(): ServerHandle { // Create the multithreaded event loops for the server val bossGroup = NioEventLoopGroup(1) - val serverSocketChannel = NioServerSocketChannel::class.java + val channelFactory = ChannelFactory { NioSocketChannel() } + val datagramChannelFactory = ChannelFactory { NioDatagramChannel() } + val serverChannelFactory = ChannelFactory { NioServerSocketChannel() } val workerGroup = NioEventLoopGroup(0) val eventExecutorGroup = run { val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) { @@ -460,11 +478,11 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { } DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), threadFactory) } - val serverInitializer = ServerInitializer(cfg, eventExecutorGroup) + val serverInitializer = ServerInitializer(cfg, channelFactory, datagramChannelFactory, workerGroup) val bootstrap = ServerBootstrap().apply { // Configure the server group(bossGroup, workerGroup) - channel(serverSocketChannel) + channelFactory(serverChannelFactory) childHandler(serverInitializer) option(ChannelOption.SO_BACKLOG, cfg.incomingConnectionsBacklogSize) childOption(ChannelOption.SO_KEEPALIVE, true) diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCache.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCache.kt index 833bb16..724f060 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCache.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCache.kt @@ -1,6 +1,7 @@ package net.woggioni.rbcs.server.cache import net.woggioni.jwo.JWO +import net.woggioni.rbcs.api.AsyncCloseable import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.common.createLogger import java.io.ByteArrayOutputStream @@ -18,11 +19,12 @@ import java.nio.file.StandardOpenOption import java.nio.file.attribute.BasicFileAttributes import java.time.Duration import java.time.Instant +import java.util.concurrent.CompletableFuture class FileSystemCache( val root: Path, val maxAge: Duration -) : AutoCloseable { +) : AsyncCloseable { class EntryValue(val metadata: CacheValueMetadata, val channel : FileChannel, val offset : Long, val size : Long) : Serializable @@ -112,9 +114,18 @@ class FileSystemCache( return FileSink(metadata, file, tmpFile) } - private val garbageCollector = Thread.ofVirtual().name("file-system-cache-gc").start { - while (running) { - gc() + private val closeFuture = object : CompletableFuture() { + init { + Thread.ofVirtual().name("file-system-cache-gc").start { + try { + while (running) { + gc() + } + complete(null) + } catch (ex : Throwable) { + completeExceptionally(ex) + } + } } } @@ -151,8 +162,8 @@ class FileSystemCache( return result } - override fun close() { + override fun asyncClose() : CompletableFuture { running = false - garbageCollector.join() + return closeFuture } } \ No newline at end of file diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheConfiguration.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheConfiguration.kt index 081cdf1..aa7018d 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheConfiguration.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheConfiguration.kt @@ -1,5 +1,9 @@ package net.woggioni.rbcs.server.cache +import io.netty.channel.ChannelFactory +import io.netty.channel.EventLoopGroup +import io.netty.channel.socket.DatagramChannel +import io.netty.channel.socket.SocketChannel import net.woggioni.jwo.Application import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.Configuration @@ -19,11 +23,13 @@ data class FileSystemCacheConfiguration( override fun materialize() = object : CacheHandlerFactory { private val cache = FileSystemCache(root ?: Application.builder("rbcs").build().computeCacheDirectory(), maxAge) - override fun close() { - cache.close() - } + override fun asyncClose() = cache.asyncClose() - override fun newHandler() = FileSystemCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel, chunkSize) + override fun newHandler( + eventLoop: EventLoopGroup, + socketChannelFactory: ChannelFactory, + datagramChannelFactory: ChannelFactory + ) = FileSystemCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel, chunkSize) } override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheProvider.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheProvider.kt index 32092f5..8836597 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheProvider.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheProvider.kt @@ -30,7 +30,7 @@ class FileSystemCacheProvider : CacheProvider { val compressionLevel = el.renderAttribute("compression-level") ?.let(String::toInt) ?: Deflater.DEFAULT_COMPRESSION - val digestAlgorithm = el.renderAttribute("digest") ?: "MD5" + val digestAlgorithm = el.renderAttribute("digest") val chunkSize = el.renderAttribute("chunk-size") ?.let(Integer::decode) ?: 0x10000 diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt index 9d14862..add8bab 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt @@ -1,10 +1,12 @@ package net.woggioni.rbcs.server.cache import io.netty.buffer.ByteBuf +import net.woggioni.rbcs.api.AsyncCloseable import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.common.createLogger import java.time.Duration import java.time.Instant +import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.TimeUnit @@ -26,7 +28,7 @@ class CacheEntry( class InMemoryCache( private val maxAge: Duration, private val maxSize: Long -) : AutoCloseable { +) : AsyncCloseable { companion object { private val log = createLogger() @@ -45,26 +47,35 @@ class InMemoryCache( @Volatile private var running = true - private val garbageCollector = Thread.ofVirtual().name("in-memory-cache-gc").start { - while (running) { - val el = removalQueue.poll(1, TimeUnit.SECONDS) ?: continue - val value = el.value - val now = Instant.now() - if (now > el.expiry) { - val removed = map.remove(el.key, value) - if (removed) { - updateSizeAfterRemoval(value.content) - //Decrease the reference count for map - value.content.release() + private val closeFuture = object : CompletableFuture() { + init { + Thread.ofVirtual().name("in-memory-cache-gc").start { + try { + while (running) { + val el = removalQueue.poll(1, TimeUnit.SECONDS) ?: continue + val value = el.value + val now = Instant.now() + if (now > el.expiry) { + val removed = map.remove(el.key, value) + if (removed) { + updateSizeAfterRemoval(value.content) + //Decrease the reference count for map + value.content.release() + } + } else { + removalQueue.put(el) + Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))) + } + } + complete(null) + } catch (ex: Throwable) { + completeExceptionally(ex) } - } else { - removalQueue.put(el) - Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))) } } } - private fun removeEldest(): Long { + fun removeEldest(): Long { while (true) { val el = removalQueue.take() val value = el.value @@ -84,9 +95,9 @@ class InMemoryCache( } } - override fun close() { + override fun asyncClose() : CompletableFuture { running = false - garbageCollector.join() + return closeFuture } fun get(key: ByteArray) = map[CacheKey(key)]?.run { diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheConfiguration.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheConfiguration.kt index a6030e0..a99b3d9 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheConfiguration.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheConfiguration.kt @@ -1,5 +1,10 @@ package net.woggioni.rbcs.server.cache +import io.netty.channel.ChannelFactory +import io.netty.channel.EventLoopGroup +import io.netty.channel.socket.DatagramChannel +import io.netty.channel.socket.SocketChannel +import io.netty.util.concurrent.Future import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.common.RBCS @@ -16,11 +21,13 @@ data class InMemoryCacheConfiguration( override fun materialize() = object : CacheHandlerFactory { private val cache = InMemoryCache(maxAge, maxSize) - override fun close() { - cache.close() - } + override fun asyncClose() = cache.asyncClose() - override fun newHandler() = InMemoryCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel) + override fun newHandler( + eventLoop: EventLoopGroup, + socketChannelFactory: ChannelFactory, + datagramChannelFactory: ChannelFactory + ) = InMemoryCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel) } override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheProvider.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheProvider.kt index f6987a4..023cf46 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheProvider.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheProvider.kt @@ -30,7 +30,7 @@ class InMemoryCacheProvider : CacheProvider { val compressionLevel = el.renderAttribute("compression-level") ?.let(String::toInt) ?: Deflater.DEFAULT_COMPRESSION - val digestAlgorithm = el.renderAttribute("digest") ?: "MD5" + val digestAlgorithm = el.renderAttribute("digest") val chunkSize = el.renderAttribute("chunk-size") ?.let(Integer::decode) ?: 0x10000 diff --git a/rbcs-server/src/main/resources/net/woggioni/rbcs/server/schema/rbcs.xsd b/rbcs-server/src/main/resources/net/woggioni/rbcs/server/schema/rbcs.xsd index 36c2715..cbc4019 100644 --- a/rbcs-server/src/main/resources/net/woggioni/rbcs/server/schema/rbcs.xsd +++ b/rbcs-server/src/main/resources/net/woggioni/rbcs/server/schema/rbcs.xsd @@ -153,7 +153,7 @@ - + Hashing algorithm to apply to the key. If omitted, no hashing is performed. @@ -209,7 +209,7 @@ - + Hashing algorithm to apply to the key. If omitted, no hashing is performed.