shared event executor group between server and clients
All checks were successful
CI / build (push) Successful in 3m44s

- improved documentation
- closed memcache client's thread pools
This commit is contained in:
2025-02-24 13:52:20 +08:00
parent c7d2b89d82
commit 23f2a351a6
20 changed files with 286 additions and 140 deletions

View File

@@ -3,6 +3,7 @@ package net.woggioni.rbcs.server
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelFactory
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
@@ -11,7 +12,12 @@ import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPromise
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.ServerSocketChannel
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioDatagramChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.compression.CompressionOptions
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.HttpContentCompressor
@@ -31,6 +37,7 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.jwo.JWO
import net.woggioni.jwo.Tuple2
import net.woggioni.rbcs.api.AsyncCloseable
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash
@@ -200,8 +207,10 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
private class ServerInitializer(
private val cfg: Configuration,
private val channelFactory : ChannelFactory<SocketChannel>,
private val datagramChannelFactory : ChannelFactory<DatagramChannel>,
private val eventExecutorGroup: EventExecutorGroup
) : ChannelInitializer<Channel>(), AutoCloseable {
) : ChannelInitializer<Channel>(), AsyncCloseable {
companion object {
private fun createSslCtx(tls: Configuration.Tls): SslContext {
@@ -368,21 +377,20 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
ServerHandler(prefix)
}
pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler)
pipeline.addLast(cacheHandlerFactory.newHandler())
pipeline.addLast(cacheHandlerFactory.newHandler(ch.eventLoop(), channelFactory, datagramChannelFactory))
pipeline.addLast(TraceHandler)
pipeline.addLast(ExceptionHandler)
}
override fun close() {
cacheHandlerFactory.close()
}
override fun asyncClose() = cacheHandlerFactory.asyncClose()
}
class ServerHandle(
closeFuture: ChannelFuture,
private val bossGroup: EventExecutorGroup,
private val executorGroups: Iterable<EventExecutorGroup>,
private val serverInitializer: AutoCloseable,
private val serverInitializer: AsyncCloseable,
) : Future<Void> by from(closeFuture, executorGroups, serverInitializer) {
companion object {
@@ -391,42 +399,53 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
private fun from(
closeFuture: ChannelFuture,
executorGroups: Iterable<EventExecutorGroup>,
serverInitializer: AutoCloseable
serverInitializer: AsyncCloseable
): CompletableFuture<Void> {
val result = CompletableFuture<Void>()
closeFuture.addListener {
val errors = mutableListOf<Throwable>()
val deadline = Instant.now().plusSeconds(20)
for (executorGroup in executorGroups) {
val future = executorGroup.terminationFuture()
try {
val now = Instant.now()
if (now > deadline) {
future.get(0, TimeUnit.SECONDS)
} else {
future.get(Duration.between(now, deadline).toMillis(), TimeUnit.MILLISECONDS)
}
}
catch (te: TimeoutException) {
errors.addLast(te)
log.warn("Timeout while waiting for shutdown of $executorGroup", te)
} catch (ex: Throwable) {
log.warn(ex.message, ex)
errors.addLast(ex)
}
}
try {
serverInitializer.close()
} catch (ex: Throwable) {
log.error(ex.message, ex)
errors.addLast(ex)
}
if(errors.isEmpty()) {
result.complete(null)
} else {
result.completeExceptionally(errors.first())
serverInitializer.asyncClose().whenComplete { _, ex ->
if(ex != null) {
log.error(ex.message, ex)
errors.addLast(ex)
}
executorGroups.map {
it.shutdownGracefully()
}
for (executorGroup in executorGroups) {
val future = executorGroup.terminationFuture()
try {
val now = Instant.now()
if (now > deadline) {
future.get(0, TimeUnit.SECONDS)
} else {
future.get(Duration.between(now, deadline).toMillis(), TimeUnit.MILLISECONDS)
}
}
catch (te: TimeoutException) {
errors.addLast(te)
log.warn("Timeout while waiting for shutdown of $executorGroup", te)
} catch (ex: Throwable) {
log.warn(ex.message, ex)
errors.addLast(ex)
}
}
if(errors.isEmpty()) {
result.complete(null)
} else {
result.completeExceptionally(errors.first())
}
}
}
@@ -441,16 +460,15 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
fun sendShutdownSignal() {
bossGroup.shutdownGracefully()
executorGroups.map {
it.shutdownGracefully()
}
}
}
fun run(): ServerHandle {
// Create the multithreaded event loops for the server
val bossGroup = NioEventLoopGroup(1)
val serverSocketChannel = NioServerSocketChannel::class.java
val channelFactory = ChannelFactory<SocketChannel> { NioSocketChannel() }
val datagramChannelFactory = ChannelFactory<DatagramChannel> { NioDatagramChannel() }
val serverChannelFactory = ChannelFactory<ServerSocketChannel> { NioServerSocketChannel() }
val workerGroup = NioEventLoopGroup(0)
val eventExecutorGroup = run {
val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) {
@@ -460,11 +478,11 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
}
DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), threadFactory)
}
val serverInitializer = ServerInitializer(cfg, eventExecutorGroup)
val serverInitializer = ServerInitializer(cfg, channelFactory, datagramChannelFactory, workerGroup)
val bootstrap = ServerBootstrap().apply {
// Configure the server
group(bossGroup, workerGroup)
channel(serverSocketChannel)
channelFactory(serverChannelFactory)
childHandler(serverInitializer)
option(ChannelOption.SO_BACKLOG, cfg.incomingConnectionsBacklogSize)
childOption(ChannelOption.SO_KEEPALIVE, true)

View File

@@ -1,6 +1,7 @@
package net.woggioni.rbcs.server.cache
import net.woggioni.jwo.JWO
import net.woggioni.rbcs.api.AsyncCloseable
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger
import java.io.ByteArrayOutputStream
@@ -18,11 +19,12 @@ import java.nio.file.StandardOpenOption
import java.nio.file.attribute.BasicFileAttributes
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
class FileSystemCache(
val root: Path,
val maxAge: Duration
) : AutoCloseable {
) : AsyncCloseable {
class EntryValue(val metadata: CacheValueMetadata, val channel : FileChannel, val offset : Long, val size : Long) : Serializable
@@ -112,9 +114,18 @@ class FileSystemCache(
return FileSink(metadata, file, tmpFile)
}
private val garbageCollector = Thread.ofVirtual().name("file-system-cache-gc").start {
while (running) {
gc()
private val closeFuture = object : CompletableFuture<Void>() {
init {
Thread.ofVirtual().name("file-system-cache-gc").start {
try {
while (running) {
gc()
}
complete(null)
} catch (ex : Throwable) {
completeExceptionally(ex)
}
}
}
}
@@ -151,8 +162,8 @@ class FileSystemCache(
return result
}
override fun close() {
override fun asyncClose() : CompletableFuture<Void> {
running = false
garbageCollector.join()
return closeFuture
}
}

View File

@@ -1,5 +1,9 @@
package net.woggioni.rbcs.server.cache
import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup
import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel
import net.woggioni.jwo.Application
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration
@@ -19,11 +23,13 @@ data class FileSystemCacheConfiguration(
override fun materialize() = object : CacheHandlerFactory {
private val cache = FileSystemCache(root ?: Application.builder("rbcs").build().computeCacheDirectory(), maxAge)
override fun close() {
cache.close()
}
override fun asyncClose() = cache.asyncClose()
override fun newHandler() = FileSystemCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel, chunkSize)
override fun newHandler(
eventLoop: EventLoopGroup,
socketChannelFactory: ChannelFactory<SocketChannel>,
datagramChannelFactory: ChannelFactory<DatagramChannel>
) = FileSystemCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel, chunkSize)
}
override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI

View File

@@ -30,7 +30,7 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
val compressionLevel = el.renderAttribute("compression-level")
?.let(String::toInt)
?: Deflater.DEFAULT_COMPRESSION
val digestAlgorithm = el.renderAttribute("digest") ?: "MD5"
val digestAlgorithm = el.renderAttribute("digest")
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x10000

View File

@@ -1,10 +1,12 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.rbcs.api.AsyncCloseable
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit
@@ -26,7 +28,7 @@ class CacheEntry(
class InMemoryCache(
private val maxAge: Duration,
private val maxSize: Long
) : AutoCloseable {
) : AsyncCloseable {
companion object {
private val log = createLogger<InMemoryCache>()
@@ -45,26 +47,35 @@ class InMemoryCache(
@Volatile
private var running = true
private val garbageCollector = Thread.ofVirtual().name("in-memory-cache-gc").start {
while (running) {
val el = removalQueue.poll(1, TimeUnit.SECONDS) ?: continue
val value = el.value
val now = Instant.now()
if (now > el.expiry) {
val removed = map.remove(el.key, value)
if (removed) {
updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
private val closeFuture = object : CompletableFuture<Void>() {
init {
Thread.ofVirtual().name("in-memory-cache-gc").start {
try {
while (running) {
val el = removalQueue.poll(1, TimeUnit.SECONDS) ?: continue
val value = el.value
val now = Instant.now()
if (now > el.expiry) {
val removed = map.remove(el.key, value)
if (removed) {
updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
}
} else {
removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
}
}
complete(null)
} catch (ex: Throwable) {
completeExceptionally(ex)
}
} else {
removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
}
}
}
private fun removeEldest(): Long {
fun removeEldest(): Long {
while (true) {
val el = removalQueue.take()
val value = el.value
@@ -84,9 +95,9 @@ class InMemoryCache(
}
}
override fun close() {
override fun asyncClose() : CompletableFuture<Void> {
running = false
garbageCollector.join()
return closeFuture
}
fun get(key: ByteArray) = map[CacheKey(key)]?.run {

View File

@@ -1,5 +1,10 @@
package net.woggioni.rbcs.server.cache
import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup
import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel
import io.netty.util.concurrent.Future
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS
@@ -16,11 +21,13 @@ data class InMemoryCacheConfiguration(
override fun materialize() = object : CacheHandlerFactory {
private val cache = InMemoryCache(maxAge, maxSize)
override fun close() {
cache.close()
}
override fun asyncClose() = cache.asyncClose()
override fun newHandler() = InMemoryCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel)
override fun newHandler(
eventLoop: EventLoopGroup,
socketChannelFactory: ChannelFactory<SocketChannel>,
datagramChannelFactory: ChannelFactory<DatagramChannel>
) = InMemoryCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel)
}
override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI

View File

@@ -30,7 +30,7 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
val compressionLevel = el.renderAttribute("compression-level")
?.let(String::toInt)
?: Deflater.DEFAULT_COMPRESSION
val digestAlgorithm = el.renderAttribute("digest") ?: "MD5"
val digestAlgorithm = el.renderAttribute("digest")
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x10000

View File

@@ -153,7 +153,7 @@
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="digest" type="xs:token" default="MD5">
<xs:attribute name="digest" type="xs:token">
<xs:annotation>
<xs:documentation>
Hashing algorithm to apply to the key. If omitted, no hashing is performed.
@@ -209,7 +209,7 @@
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="digest" type="xs:token" default="MD5">
<xs:attribute name="digest" type="xs:token" default="SHA3-224">
<xs:annotation>
<xs:documentation>
Hashing algorithm to apply to the key. If omitted, no hashing is performed.