used virtual thread for garbage colection in FileSystemCache
All checks were successful
CI / build (push) Successful in 2m32s

This commit is contained in:
2025-02-07 20:45:29 +08:00
parent af79e74b95
commit 317eadce07
7 changed files with 94 additions and 50 deletions

View File

@@ -2,7 +2,7 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true
org.gradle.caching=true
rbcs.version = 0.1.4
rbcs.version = 0.1.5
lys.version = 2025.02.05

View File

@@ -44,7 +44,6 @@ envelopeJar {
dependencies {
implementation catalog.jwo
implementation catalog.slf4j.api
implementation catalog.netty.codec.http
implementation catalog.picocli
implementation project(':rbcs-client')

View File

@@ -6,9 +6,11 @@ plugins {
dependencies {
implementation project(':rbcs-api')
implementation project(':rbcs-common')
implementation catalog.picocli
implementation catalog.slf4j.api
implementation catalog.netty.buffer
implementation catalog.netty.handler
implementation catalog.netty.transport
implementation catalog.netty.common
implementation catalog.netty.codec.http
testRuntimeOnly catalog.logback.classic

View File

@@ -9,6 +9,9 @@ dependencies {
implementation catalog.jwo
implementation catalog.slf4j.api
implementation catalog.netty.codec.http
implementation catalog.netty.handler
implementation catalog.netty.buffer
implementation catalog.netty.transport
api project(':rbcs-common')
api project(':rbcs-api')
@@ -36,3 +39,4 @@ publishing {
}

View File

@@ -30,11 +30,13 @@ import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.AttributeKey
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.jwo.JWO
import net.woggioni.jwo.Tuple2
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.common.RBCS.toUrl
import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import net.woggioni.rbcs.common.RBCS.toUrl
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
@@ -48,8 +50,6 @@ import net.woggioni.rbcs.server.configuration.Serializer
import net.woggioni.rbcs.server.exception.ExceptionHandler
import net.woggioni.rbcs.server.handler.ServerHandler
import net.woggioni.rbcs.server.throttling.ThrottlingHandler
import net.woggioni.jwo.JWO
import net.woggioni.jwo.Tuple2
import java.io.OutputStream
import java.net.InetSocketAddress
import java.nio.file.Files
@@ -59,6 +59,7 @@ import java.security.PrivateKey
import java.security.cert.X509Certificate
import java.util.Arrays
import java.util.Base64
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.regex.Matcher
import java.util.regex.Pattern
@@ -128,7 +129,8 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
val clientCertificate = peerCertificates.first() as X509Certificate
val user = userExtractor?.extract(clientCertificate)
val group = groupExtractor?.extract(clientCertificate)
val allGroups = ((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
val allGroups =
((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
AuthenticationResult(user, allGroups)
} ?: anonymousUserGroups?.let { AuthenticationResult(null, it) }
} catch (es: SSLPeerUnverifiedException) {
@@ -191,7 +193,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
private class ServerInitializer(
private val cfg: Configuration,
private val eventExecutorGroup: EventExecutorGroup
) : ChannelInitializer<Channel>() {
) : ChannelInitializer<Channel>(), AutoCloseable {
companion object {
private fun createSslCtx(tls: Configuration.Tls): SslContext {
@@ -245,10 +247,11 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
private val log = contextLogger()
private val cache = cfg.cache.materialize()
private val serverHandler = let {
val cacheImplementation = cfg.cache.materialize()
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
ServerHandler(cacheImplementation, prefix)
ServerHandler(cache, prefix)
}
private val exceptionHandler = ExceptionHandler()
@@ -344,12 +347,15 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
IdleState.READER_IDLE -> log.debug {
"Read timeout reached on channel ${ch.id().asShortText()}, closing the connection"
}
IdleState.WRITER_IDLE -> log.debug {
"Write timeout reached on channel ${ch.id().asShortText()}, closing the connection"
}
IdleState.ALL_IDLE -> log.debug {
"Idle timeout reached on channel ${ch.id().asShortText()}, closing the connection"
}
null -> throw IllegalStateException("This should never happen")
}
ctx.close()
@@ -370,26 +376,41 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
pipeline.addLast(eventExecutorGroup, serverHandler)
pipeline.addLast(exceptionHandler)
}
override fun close() {
cache.close()
}
}
class ServerHandle(
httpChannelFuture: ChannelFuture,
private val executorGroups: Iterable<EventExecutorGroup>
private val executorGroups: Iterable<EventExecutorGroup>,
private val serverInitializer: AutoCloseable
) : AutoCloseable {
private val httpChannel: Channel = httpChannelFuture.channel()
private val closeFuture: ChannelFuture = httpChannel.closeFuture()
private val log = contextLogger()
fun shutdown(): ChannelFuture {
fun shutdown(): Future<Void> {
return httpChannel.close()
}
override fun close() {
try {
closeFuture.sync()
} finally {
} catch (ex: Throwable) {
log.error(ex.message, ex)
}
try {
serverInitializer.close()
} catch (ex: Throwable) {
log.error(ex.message, ex)
}
executorGroups.forEach {
try {
it.shutdownGracefully().sync()
} catch (ex: Throwable) {
log.error(ex.message, ex)
}
}
log.info {
@@ -411,11 +432,12 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
}
DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), threadFactory)
}
val serverInitializer = ServerInitializer(cfg, eventExecutorGroup)
val bootstrap = ServerBootstrap().apply {
// Configure the server
group(bossGroup, workerGroup)
channel(serverSocketChannel)
childHandler(ServerInitializer(cfg, eventExecutorGroup))
childHandler(serverInitializer)
option(ChannelOption.SO_BACKLOG, cfg.incomingConnectionsBacklogSize)
childOption(ChannelOption.SO_KEEPALIVE, true)
}
@@ -427,6 +449,6 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
log.info {
"RemoteBuildCacheServer is listening on ${cfg.host}:${cfg.port}"
}
return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup))
return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup), serverInitializer)
}
}

View File

@@ -1,12 +1,11 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.jwo.JWO
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.RBCS.digestString
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.jwo.JWO
import net.woggioni.jwo.LockFile
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.file.Files
@@ -18,7 +17,6 @@ import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
@@ -41,7 +39,10 @@ class FileSystemCache(
Files.createDirectories(root)
}
private var nextGc = AtomicReference(Instant.now().plus(maxAge))
@Volatile
private var running = true
private var nextGc = Instant.now()
override fun get(key: String) = (digestAlgorithm
?.let(MessageDigest::getInstance)
@@ -67,8 +68,6 @@ class FileSystemCache(
FileChannel.open(file, StandardOpenOption.READ)
}
}
}.also {
gc()
}.let {
CompletableFuture.completedFuture(it)
}
@@ -98,33 +97,51 @@ class FileSystemCache(
Files.delete(tmpFile)
throw t
}
}.also {
gc()
}
return CompletableFuture.completedFuture(null)
}
private fun gc() {
val now = Instant.now()
val oldValue = nextGc.getAndSet(now.plus(maxAge))
if (oldValue < now) {
actualGc(now)
private val garbageCollector = Thread.ofVirtual().name("file-system-cache-gc").start {
while (running) {
gc()
}
}
@Synchronized
private fun actualGc(now: Instant) {
Files.list(root).filter {
private fun gc() {
val now = Instant.now()
if (nextGc < now) {
val oldestEntry = actualGc(now)
nextGc = (oldestEntry ?: now).plus(maxAge)
}
Thread.sleep(minOf(Duration.between(now, nextGc), Duration.ofSeconds(1)))
}
/**
* Returns the creation timestamp of the oldest cache entry (if any)
*/
private fun actualGc(now: Instant) : Instant? {
var result :Instant? = null
Files.list(root)
.filter { path ->
JWO.splitExtension(path)
.map { it._2 }
.map { it != ".tmp" }
.orElse(true)
}
.filter {
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
.creationTime()
.toInstant()
if(result == null || creationTimeStamp < result) {
result = creationTimeStamp
}
now > creationTimeStamp.plus(maxAge)
}.forEach { file ->
LockFile.acquire(file, false).use {
Files.delete(file)
}
}
}.forEach(Files::delete)
return result
}
override fun close() {}
override fun close() {
running = false
garbageCollector.join()
}
}

View File

@@ -1,12 +1,12 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.jwo.JWO
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digestString
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.jwo.JWO
import java.nio.channels.Channels
import java.security.MessageDigest
import java.time.Duration
@@ -42,9 +42,11 @@ class InMemoryCache(
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
@Volatile
private var running = true
private val garbageCollector = Thread {
while(true) {
private val garbageCollector = Thread.ofVirtual().name("in-memory-cache-gc").start {
while(running) {
val el = removalQueue.take()
val buf = el.value
val now = Instant.now()
@@ -62,8 +64,6 @@ class InMemoryCache(
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
}
}
}.apply {
start()
}
private fun removeEldest() : Long {