Compare commits

...

4 Commits
0.1.4 ... 0.1.6

Author SHA1 Message Date
7eca8a270d 0.1.6 release
All checks were successful
CI / build (push) Successful in 3m29s
2025-02-08 00:54:25 +08:00
84d7c977f9 added randomizer to retries 2025-02-07 23:19:13 +08:00
317eadce07 used virtual thread for garbage colection in FileSystemCache
All checks were successful
CI / build (push) Successful in 2m32s
2025-02-07 20:45:29 +08:00
af79e74b95 fixed max message size for memcache backend 2025-02-06 23:09:22 +08:00
14 changed files with 156 additions and 71 deletions

View File

@@ -2,11 +2,10 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true
org.gradle.caching=true
rbcs.version = 0.1.4
rbcs.version = 0.1.6
lys.version = 2025.02.05
lys.version = 2025.02.08
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
docker.registry.url=gitea.woggioni.net
jpms-check.configurationName = runtimeClasspath

View File

@@ -44,7 +44,6 @@ envelopeJar {
dependencies {
implementation catalog.jwo
implementation catalog.slf4j.api
implementation catalog.netty.codec.http
implementation catalog.picocli
implementation project(':rbcs-client')

View File

@@ -6,9 +6,11 @@ plugins {
dependencies {
implementation project(':rbcs-api')
implementation project(':rbcs-common')
implementation catalog.picocli
implementation catalog.slf4j.api
implementation catalog.netty.buffer
implementation catalog.netty.handler
implementation catalog.netty.transport
implementation catalog.netty.common
implementation catalog.netty.codec.http
testRuntimeOnly catalog.logback.classic

View File

@@ -45,9 +45,9 @@ import java.time.Duration
import java.util.Base64
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger
import kotlin.random.Random
import io.netty.util.concurrent.Future as NettyFuture
class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable {
private val group: NioEventLoopGroup
private var sslContext: SslContext
@@ -206,6 +206,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
retryPolicy.initialDelayMillis.toDouble(),
retryPolicy.exp,
outcomeHandler,
Random.Default,
operation
)
} else {

View File

@@ -3,6 +3,8 @@ package net.woggioni.rbcs.client
import io.netty.util.concurrent.EventExecutorGroup
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import kotlin.math.pow
import kotlin.random.Random
sealed class OperationOutcome<T> {
class Success<T>(val result: T) : OperationOutcome<T>()
@@ -24,8 +26,10 @@ fun <T> executeWithRetry(
initialDelay: Double,
exp: Double,
outcomeHandler: OutcomeHandler<T>,
randomizer : Random?,
cb: () -> CompletableFuture<T>
): CompletableFuture<T> {
val finalResult = cb()
var future = finalResult
var shortCircuit = false
@@ -46,7 +50,7 @@ fun <T> executeWithRetry(
is OutcomeHandlerResult.Retry -> {
val res = CompletableFuture<T>()
val delay = run {
val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong()
val scheduledDelay = (initialDelay * exp.pow(i.toDouble()) * (1.0 + (randomizer?.nextDouble(-0.5, 0.5) ?: 0.0))).toLong()
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
}
eventExecutorGroup.schedule({

View File

@@ -89,7 +89,7 @@ class RetryTest {
val random = Random(testArgs.seed)
val future =
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler) {
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler, null) {
val now = System.nanoTime()
val result = CompletableFuture<Int>()
executor.submit {
@@ -129,7 +129,7 @@ class RetryTest {
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
val actualTimestamp = timestamp
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
Assertions.assertTrue(err < 1e-3)
Assertions.assertTrue(err < 1e-2)
}
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
/*

View File

@@ -12,6 +12,24 @@ object RBCS {
const val RBCS_PREFIX: String = "rbcs"
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
fun ByteArray.toInt(index : Int = 0) : Long {
if(index + 4 > size) throw IllegalArgumentException("Not enough bytes to decode a 32 bits integer")
var value : Long = 0
for (b in index until index + 4) {
value = (value shl 8) + (get(b).toInt() and 0xFF)
}
return value
}
fun ByteArray.toLong(index : Int = 0) : Long {
if(index + 8 > size) throw IllegalArgumentException("Not enough bytes to decode a 64 bits long integer")
var value : Long = 0
for (b in index until index + 8) {
value = (value shl 8) + (get(b).toInt() and 0xFF)
}
return value
}
fun digest(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")

View File

@@ -76,7 +76,7 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec())
pipeline.addLast(BinaryMemcacheObjectAggregator(Integer.MAX_VALUE))
pipeline.addLast(BinaryMemcacheObjectAggregator(cfg.maxSize))
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)

View File

@@ -9,6 +9,9 @@ dependencies {
implementation catalog.jwo
implementation catalog.slf4j.api
implementation catalog.netty.codec.http
implementation catalog.netty.handler
implementation catalog.netty.buffer
implementation catalog.netty.transport
api project(':rbcs-common')
api project(':rbcs-api')
@@ -36,3 +39,4 @@ publishing {
}

View File

@@ -30,11 +30,13 @@ import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.AttributeKey
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.jwo.JWO
import net.woggioni.jwo.Tuple2
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.common.RBCS.toUrl
import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import net.woggioni.rbcs.common.RBCS.toUrl
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
@@ -48,8 +50,6 @@ import net.woggioni.rbcs.server.configuration.Serializer
import net.woggioni.rbcs.server.exception.ExceptionHandler
import net.woggioni.rbcs.server.handler.ServerHandler
import net.woggioni.rbcs.server.throttling.ThrottlingHandler
import net.woggioni.jwo.JWO
import net.woggioni.jwo.Tuple2
import java.io.OutputStream
import java.net.InetSocketAddress
import java.nio.file.Files
@@ -59,6 +59,7 @@ import java.security.PrivateKey
import java.security.cert.X509Certificate
import java.util.Arrays
import java.util.Base64
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.regex.Matcher
import java.util.regex.Pattern
@@ -128,11 +129,12 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
val clientCertificate = peerCertificates.first() as X509Certificate
val user = userExtractor?.extract(clientCertificate)
val group = groupExtractor?.extract(clientCertificate)
val allGroups = ((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
val allGroups =
((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
AuthenticationResult(user, allGroups)
} ?: anonymousUserGroups?.let{ AuthenticationResult(null, it) }
} ?: anonymousUserGroups?.let { AuthenticationResult(null, it) }
} catch (es: SSLPeerUnverifiedException) {
anonymousUserGroups?.let{ AuthenticationResult(null, it) }
anonymousUserGroups?.let { AuthenticationResult(null, it) }
}
}
}
@@ -191,7 +193,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
private class ServerInitializer(
private val cfg: Configuration,
private val eventExecutorGroup: EventExecutorGroup
) : ChannelInitializer<Channel>() {
) : ChannelInitializer<Channel>(), AutoCloseable {
companion object {
private fun createSslCtx(tls: Configuration.Tls): SslContext {
@@ -213,7 +215,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
trustManager(
ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus)
)
if(trustStore.isRequireClientCertificate) ClientAuth.REQUIRE
if (trustStore.isRequireClientCertificate) ClientAuth.REQUIRE
else ClientAuth.OPTIONAL
} ?: ClientAuth.NONE
clientAuth(clientAuth)
@@ -245,10 +247,11 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
private val log = contextLogger()
private val cache = cfg.cache.materialize()
private val serverHandler = let {
val cacheImplementation = cfg.cache.materialize()
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
ServerHandler(cacheImplementation, prefix)
ServerHandler(cache, prefix)
}
private val exceptionHandler = ExceptionHandler()
@@ -311,7 +314,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
cfg.connection.also { conn ->
val readTimeout = conn.readTimeout.toMillis()
val writeTimeout = conn.writeTimeout.toMillis()
if(readTimeout > 0 || writeTimeout > 0) {
if (readTimeout > 0 || writeTimeout > 0) {
pipeline.addLast(
IdleStateHandler(
false,
@@ -325,7 +328,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
val readIdleTimeout = conn.readIdleTimeout.toMillis()
val writeIdleTimeout = conn.writeIdleTimeout.toMillis()
val idleTimeout = conn.idleTimeout.toMillis()
if(readIdleTimeout > 0 || writeIdleTimeout > 0 || idleTimeout > 0) {
if (readIdleTimeout > 0 || writeIdleTimeout > 0 || idleTimeout > 0) {
pipeline.addLast(
IdleStateHandler(
true,
@@ -340,16 +343,19 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
pipeline.addLast(object : ChannelInboundHandlerAdapter() {
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is IdleStateEvent) {
when(evt.state()) {
when (evt.state()) {
IdleState.READER_IDLE -> log.debug {
"Read timeout reached on channel ${ch.id().asShortText()}, closing the connection"
}
IdleState.WRITER_IDLE -> log.debug {
"Write timeout reached on channel ${ch.id().asShortText()}, closing the connection"
}
IdleState.ALL_IDLE -> log.debug {
"Idle timeout reached on channel ${ch.id().asShortText()}, closing the connection"
}
null -> throw IllegalStateException("This should never happen")
}
ctx.close()
@@ -370,26 +376,41 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
pipeline.addLast(eventExecutorGroup, serverHandler)
pipeline.addLast(exceptionHandler)
}
override fun close() {
cache.close()
}
}
class ServerHandle(
httpChannelFuture: ChannelFuture,
private val executorGroups: Iterable<EventExecutorGroup>
private val executorGroups: Iterable<EventExecutorGroup>,
private val serverInitializer: AutoCloseable
) : AutoCloseable {
private val httpChannel: Channel = httpChannelFuture.channel()
private val closeFuture: ChannelFuture = httpChannel.closeFuture()
private val log = contextLogger()
fun shutdown(): ChannelFuture {
fun shutdown(): Future<Void> {
return httpChannel.close()
}
override fun close() {
try {
closeFuture.sync()
} finally {
executorGroups.forEach {
} catch (ex: Throwable) {
log.error(ex.message, ex)
}
try {
serverInitializer.close()
} catch (ex: Throwable) {
log.error(ex.message, ex)
}
executorGroups.forEach {
try {
it.shutdownGracefully().sync()
} catch (ex: Throwable) {
log.error(ex.message, ex)
}
}
log.info {
@@ -411,11 +432,12 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
}
DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), threadFactory)
}
val serverInitializer = ServerInitializer(cfg, eventExecutorGroup)
val bootstrap = ServerBootstrap().apply {
// Configure the server
group(bossGroup, workerGroup)
channel(serverSocketChannel)
childHandler(ServerInitializer(cfg, eventExecutorGroup))
childHandler(serverInitializer)
option(ChannelOption.SO_BACKLOG, cfg.incomingConnectionsBacklogSize)
childOption(ChannelOption.SO_KEEPALIVE, true)
}
@@ -427,6 +449,6 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
log.info {
"RemoteBuildCacheServer is listening on ${cfg.host}:${cfg.port}"
}
return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup))
return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup), serverInitializer)
}
}

View File

@@ -1,12 +1,11 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.jwo.JWO
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.RBCS.digestString
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.jwo.JWO
import net.woggioni.jwo.LockFile
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.file.Files
@@ -18,7 +17,6 @@ import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
@@ -41,7 +39,10 @@ class FileSystemCache(
Files.createDirectories(root)
}
private var nextGc = AtomicReference(Instant.now().plus(maxAge))
@Volatile
private var running = true
private var nextGc = Instant.now()
override fun get(key: String) = (digestAlgorithm
?.let(MessageDigest::getInstance)
@@ -67,8 +68,6 @@ class FileSystemCache(
FileChannel.open(file, StandardOpenOption.READ)
}
}
}.also {
gc()
}.let {
CompletableFuture.completedFuture(it)
}
@@ -98,33 +97,51 @@ class FileSystemCache(
Files.delete(tmpFile)
throw t
}
}.also {
gc()
}
return CompletableFuture.completedFuture(null)
}
private val garbageCollector = Thread.ofVirtual().name("file-system-cache-gc").start {
while (running) {
gc()
}
}
private fun gc() {
val now = Instant.now()
val oldValue = nextGc.getAndSet(now.plus(maxAge))
if (oldValue < now) {
actualGc(now)
if (nextGc < now) {
val oldestEntry = actualGc(now)
nextGc = (oldestEntry ?: now).plus(maxAge)
}
Thread.sleep(minOf(Duration.between(now, nextGc), Duration.ofSeconds(1)))
}
@Synchronized
private fun actualGc(now: Instant) {
Files.list(root).filter {
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
.creationTime()
.toInstant()
now > creationTimeStamp.plus(maxAge)
}.forEach { file ->
LockFile.acquire(file, false).use {
Files.delete(file)
/**
* Returns the creation timestamp of the oldest cache entry (if any)
*/
private fun actualGc(now: Instant) : Instant? {
var result :Instant? = null
Files.list(root)
.filter { path ->
JWO.splitExtension(path)
.map { it._2 }
.map { it != ".tmp" }
.orElse(true)
}
}
.filter {
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
.creationTime()
.toInstant()
if(result == null || creationTimeStamp < result) {
result = creationTimeStamp
}
now > creationTimeStamp.plus(maxAge)
}.forEach(Files::delete)
return result
}
override fun close() {}
override fun close() {
running = false
garbageCollector.join()
}
}

View File

@@ -1,12 +1,12 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.jwo.JWO
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digestString
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.jwo.JWO
import java.nio.channels.Channels
import java.security.MessageDigest
import java.time.Duration
@@ -42,9 +42,11 @@ class InMemoryCache(
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
@Volatile
private var running = true
private val garbageCollector = Thread {
while(true) {
private val garbageCollector = Thread.ofVirtual().name("in-memory-cache-gc").start {
while(running) {
val el = removalQueue.take()
val buf = el.value
val now = Instant.now()
@@ -62,8 +64,6 @@ class InMemoryCache(
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
}
}
}.apply {
start()
}
private fun removeEldest() : Long {

View File

@@ -17,6 +17,8 @@ import net.woggioni.rbcs.api.exception.CacheException
import net.woggioni.rbcs.api.exception.ContentTooLargeException
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import java.net.SocketException
import javax.net.ssl.SSLException
import javax.net.ssl.SSLPeerUnverifiedException
@ChannelHandler.Sharable
@@ -50,7 +52,12 @@ class ExceptionHandler : ChannelDuplexHandler() {
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
when (cause) {
is DecoderException -> {
log.error(cause.message, cause)
log.debug(cause.message, cause)
ctx.close()
}
is SocketException -> {
log.debug(cause.message, cause)
ctx.close()
}
@@ -59,10 +66,16 @@ class ExceptionHandler : ChannelDuplexHandler() {
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
is SSLException -> {
log.debug(cause.message, cause)
ctx.close()
}
is ContentTooLargeException -> {
ctx.writeAndFlush(TOO_BIG.retainedDuplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
is ReadTimeoutException -> {
log.debug {
val channelId = ctx.channel().id().asShortText()
@@ -70,6 +83,7 @@ class ExceptionHandler : ChannelDuplexHandler() {
}
ctx.close()
}
is WriteTimeoutException -> {
log.debug {
val channelId = ctx.channel().id().asShortText()
@@ -77,11 +91,13 @@ class ExceptionHandler : ChannelDuplexHandler() {
}
ctx.close()
}
is CacheException -> {
log.error(cause.message, cause)
ctx.writeAndFlush(NOT_AVAILABLE.retainedDuplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
else -> {
log.error(cause.message, cause)
ctx.writeAndFlush(SERVER_ERROR.retainedDuplicate())

View File

@@ -19,10 +19,13 @@ import java.util.concurrent.TimeUnit
@Sharable
class ThrottlingHandler(cfg: Configuration) :
ChannelInboundHandlerAdapter() {
class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() {
private companion object {
@JvmStatic
private val log = contextLogger()
}
private val log = contextLogger()
private val bucketManager = BucketManager.from(cfg)
private val connectionConfiguration = cfg.connection
@@ -60,7 +63,7 @@ class ThrottlingHandler(cfg: Configuration) :
}
}
private fun handleBuckets(buckets : List<Bucket>, ctx : ChannelHandlerContext, msg : Any, delayResponse : Boolean) {
private fun handleBuckets(buckets: List<Bucket>, ctx: ChannelHandlerContext, msg: Any, delayResponse: Boolean) {
var nextAttempt = -1L
for (bucket in buckets) {
val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
@@ -68,17 +71,17 @@ class ThrottlingHandler(cfg: Configuration) :
nextAttempt = bucketNextAttempt
}
}
if(nextAttempt < 0) {
if (nextAttempt < 0) {
super.channelRead(ctx, msg)
return
}
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
if (delayResponse && waitDuration < waitThreshold) {
ctx.executor().schedule({
handleBuckets(buckets, ctx, msg, false)
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
} else {
sendThrottledResponse(ctx, waitDuration)
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
if (delayResponse && waitDuration < waitThreshold) {
ctx.executor().schedule({
handleBuckets(buckets, ctx, msg, false)
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
} else {
sendThrottledResponse(ctx, waitDuration)
}
}
}