Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
7eca8a270d
|
|||
84d7c977f9
|
|||
317eadce07
|
@@ -2,11 +2,10 @@ org.gradle.configuration-cache=false
|
|||||||
org.gradle.parallel=true
|
org.gradle.parallel=true
|
||||||
org.gradle.caching=true
|
org.gradle.caching=true
|
||||||
|
|
||||||
rbcs.version = 0.1.4
|
rbcs.version = 0.1.6
|
||||||
|
|
||||||
lys.version = 2025.02.05
|
lys.version = 2025.02.08
|
||||||
|
|
||||||
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
|
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
|
||||||
docker.registry.url=gitea.woggioni.net
|
docker.registry.url=gitea.woggioni.net
|
||||||
|
|
||||||
jpms-check.configurationName = runtimeClasspath
|
|
||||||
|
@@ -44,7 +44,6 @@ envelopeJar {
|
|||||||
dependencies {
|
dependencies {
|
||||||
implementation catalog.jwo
|
implementation catalog.jwo
|
||||||
implementation catalog.slf4j.api
|
implementation catalog.slf4j.api
|
||||||
implementation catalog.netty.codec.http
|
|
||||||
implementation catalog.picocli
|
implementation catalog.picocli
|
||||||
|
|
||||||
implementation project(':rbcs-client')
|
implementation project(':rbcs-client')
|
||||||
|
@@ -6,9 +6,11 @@ plugins {
|
|||||||
dependencies {
|
dependencies {
|
||||||
implementation project(':rbcs-api')
|
implementation project(':rbcs-api')
|
||||||
implementation project(':rbcs-common')
|
implementation project(':rbcs-common')
|
||||||
implementation catalog.picocli
|
|
||||||
implementation catalog.slf4j.api
|
implementation catalog.slf4j.api
|
||||||
implementation catalog.netty.buffer
|
implementation catalog.netty.buffer
|
||||||
|
implementation catalog.netty.handler
|
||||||
|
implementation catalog.netty.transport
|
||||||
|
implementation catalog.netty.common
|
||||||
implementation catalog.netty.codec.http
|
implementation catalog.netty.codec.http
|
||||||
|
|
||||||
testRuntimeOnly catalog.logback.classic
|
testRuntimeOnly catalog.logback.classic
|
||||||
|
@@ -45,9 +45,9 @@ import java.time.Duration
|
|||||||
import java.util.Base64
|
import java.util.Base64
|
||||||
import java.util.concurrent.CompletableFuture
|
import java.util.concurrent.CompletableFuture
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
import kotlin.random.Random
|
||||||
import io.netty.util.concurrent.Future as NettyFuture
|
import io.netty.util.concurrent.Future as NettyFuture
|
||||||
|
|
||||||
|
|
||||||
class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable {
|
class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable {
|
||||||
private val group: NioEventLoopGroup
|
private val group: NioEventLoopGroup
|
||||||
private var sslContext: SslContext
|
private var sslContext: SslContext
|
||||||
@@ -206,6 +206,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
|
|||||||
retryPolicy.initialDelayMillis.toDouble(),
|
retryPolicy.initialDelayMillis.toDouble(),
|
||||||
retryPolicy.exp,
|
retryPolicy.exp,
|
||||||
outcomeHandler,
|
outcomeHandler,
|
||||||
|
Random.Default,
|
||||||
operation
|
operation
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
|
@@ -3,6 +3,8 @@ package net.woggioni.rbcs.client
|
|||||||
import io.netty.util.concurrent.EventExecutorGroup
|
import io.netty.util.concurrent.EventExecutorGroup
|
||||||
import java.util.concurrent.CompletableFuture
|
import java.util.concurrent.CompletableFuture
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
import kotlin.math.pow
|
||||||
|
import kotlin.random.Random
|
||||||
|
|
||||||
sealed class OperationOutcome<T> {
|
sealed class OperationOutcome<T> {
|
||||||
class Success<T>(val result: T) : OperationOutcome<T>()
|
class Success<T>(val result: T) : OperationOutcome<T>()
|
||||||
@@ -24,8 +26,10 @@ fun <T> executeWithRetry(
|
|||||||
initialDelay: Double,
|
initialDelay: Double,
|
||||||
exp: Double,
|
exp: Double,
|
||||||
outcomeHandler: OutcomeHandler<T>,
|
outcomeHandler: OutcomeHandler<T>,
|
||||||
|
randomizer : Random?,
|
||||||
cb: () -> CompletableFuture<T>
|
cb: () -> CompletableFuture<T>
|
||||||
): CompletableFuture<T> {
|
): CompletableFuture<T> {
|
||||||
|
|
||||||
val finalResult = cb()
|
val finalResult = cb()
|
||||||
var future = finalResult
|
var future = finalResult
|
||||||
var shortCircuit = false
|
var shortCircuit = false
|
||||||
@@ -46,7 +50,7 @@ fun <T> executeWithRetry(
|
|||||||
is OutcomeHandlerResult.Retry -> {
|
is OutcomeHandlerResult.Retry -> {
|
||||||
val res = CompletableFuture<T>()
|
val res = CompletableFuture<T>()
|
||||||
val delay = run {
|
val delay = run {
|
||||||
val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong()
|
val scheduledDelay = (initialDelay * exp.pow(i.toDouble()) * (1.0 + (randomizer?.nextDouble(-0.5, 0.5) ?: 0.0))).toLong()
|
||||||
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
|
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
|
||||||
}
|
}
|
||||||
eventExecutorGroup.schedule({
|
eventExecutorGroup.schedule({
|
||||||
|
@@ -89,7 +89,7 @@ class RetryTest {
|
|||||||
val random = Random(testArgs.seed)
|
val random = Random(testArgs.seed)
|
||||||
|
|
||||||
val future =
|
val future =
|
||||||
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler) {
|
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler, null) {
|
||||||
val now = System.nanoTime()
|
val now = System.nanoTime()
|
||||||
val result = CompletableFuture<Int>()
|
val result = CompletableFuture<Int>()
|
||||||
executor.submit {
|
executor.submit {
|
||||||
@@ -129,7 +129,7 @@ class RetryTest {
|
|||||||
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
|
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
|
||||||
val actualTimestamp = timestamp
|
val actualTimestamp = timestamp
|
||||||
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
|
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
|
||||||
Assertions.assertTrue(err < 1e-3)
|
Assertions.assertTrue(err < 1e-2)
|
||||||
}
|
}
|
||||||
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
|
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
|
||||||
/*
|
/*
|
||||||
|
@@ -12,6 +12,24 @@ object RBCS {
|
|||||||
const val RBCS_PREFIX: String = "rbcs"
|
const val RBCS_PREFIX: String = "rbcs"
|
||||||
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
|
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
|
||||||
|
fun ByteArray.toInt(index : Int = 0) : Long {
|
||||||
|
if(index + 4 > size) throw IllegalArgumentException("Not enough bytes to decode a 32 bits integer")
|
||||||
|
var value : Long = 0
|
||||||
|
for (b in index until index + 4) {
|
||||||
|
value = (value shl 8) + (get(b).toInt() and 0xFF)
|
||||||
|
}
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
|
fun ByteArray.toLong(index : Int = 0) : Long {
|
||||||
|
if(index + 8 > size) throw IllegalArgumentException("Not enough bytes to decode a 64 bits long integer")
|
||||||
|
var value : Long = 0
|
||||||
|
for (b in index until index + 8) {
|
||||||
|
value = (value shl 8) + (get(b).toInt() and 0xFF)
|
||||||
|
}
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
fun digest(
|
fun digest(
|
||||||
data: ByteArray,
|
data: ByteArray,
|
||||||
md: MessageDigest = MessageDigest.getInstance("MD5")
|
md: MessageDigest = MessageDigest.getInstance("MD5")
|
||||||
|
@@ -9,6 +9,9 @@ dependencies {
|
|||||||
implementation catalog.jwo
|
implementation catalog.jwo
|
||||||
implementation catalog.slf4j.api
|
implementation catalog.slf4j.api
|
||||||
implementation catalog.netty.codec.http
|
implementation catalog.netty.codec.http
|
||||||
|
implementation catalog.netty.handler
|
||||||
|
implementation catalog.netty.buffer
|
||||||
|
implementation catalog.netty.transport
|
||||||
|
|
||||||
api project(':rbcs-common')
|
api project(':rbcs-common')
|
||||||
api project(':rbcs-api')
|
api project(':rbcs-api')
|
||||||
@@ -36,3 +39,4 @@ publishing {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@@ -30,11 +30,13 @@ import io.netty.handler.timeout.IdleStateHandler
|
|||||||
import io.netty.util.AttributeKey
|
import io.netty.util.AttributeKey
|
||||||
import io.netty.util.concurrent.DefaultEventExecutorGroup
|
import io.netty.util.concurrent.DefaultEventExecutorGroup
|
||||||
import io.netty.util.concurrent.EventExecutorGroup
|
import io.netty.util.concurrent.EventExecutorGroup
|
||||||
|
import net.woggioni.jwo.JWO
|
||||||
|
import net.woggioni.jwo.Tuple2
|
||||||
import net.woggioni.rbcs.api.Configuration
|
import net.woggioni.rbcs.api.Configuration
|
||||||
import net.woggioni.rbcs.api.exception.ConfigurationException
|
import net.woggioni.rbcs.api.exception.ConfigurationException
|
||||||
import net.woggioni.rbcs.common.RBCS.toUrl
|
|
||||||
import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash
|
import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash
|
||||||
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
|
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
|
||||||
|
import net.woggioni.rbcs.common.RBCS.toUrl
|
||||||
import net.woggioni.rbcs.common.Xml
|
import net.woggioni.rbcs.common.Xml
|
||||||
import net.woggioni.rbcs.common.contextLogger
|
import net.woggioni.rbcs.common.contextLogger
|
||||||
import net.woggioni.rbcs.common.debug
|
import net.woggioni.rbcs.common.debug
|
||||||
@@ -48,8 +50,6 @@ import net.woggioni.rbcs.server.configuration.Serializer
|
|||||||
import net.woggioni.rbcs.server.exception.ExceptionHandler
|
import net.woggioni.rbcs.server.exception.ExceptionHandler
|
||||||
import net.woggioni.rbcs.server.handler.ServerHandler
|
import net.woggioni.rbcs.server.handler.ServerHandler
|
||||||
import net.woggioni.rbcs.server.throttling.ThrottlingHandler
|
import net.woggioni.rbcs.server.throttling.ThrottlingHandler
|
||||||
import net.woggioni.jwo.JWO
|
|
||||||
import net.woggioni.jwo.Tuple2
|
|
||||||
import java.io.OutputStream
|
import java.io.OutputStream
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.nio.file.Files
|
import java.nio.file.Files
|
||||||
@@ -59,6 +59,7 @@ import java.security.PrivateKey
|
|||||||
import java.security.cert.X509Certificate
|
import java.security.cert.X509Certificate
|
||||||
import java.util.Arrays
|
import java.util.Arrays
|
||||||
import java.util.Base64
|
import java.util.Base64
|
||||||
|
import java.util.concurrent.Future
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.regex.Matcher
|
import java.util.regex.Matcher
|
||||||
import java.util.regex.Pattern
|
import java.util.regex.Pattern
|
||||||
@@ -128,11 +129,12 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
|||||||
val clientCertificate = peerCertificates.first() as X509Certificate
|
val clientCertificate = peerCertificates.first() as X509Certificate
|
||||||
val user = userExtractor?.extract(clientCertificate)
|
val user = userExtractor?.extract(clientCertificate)
|
||||||
val group = groupExtractor?.extract(clientCertificate)
|
val group = groupExtractor?.extract(clientCertificate)
|
||||||
val allGroups = ((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
|
val allGroups =
|
||||||
|
((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
|
||||||
AuthenticationResult(user, allGroups)
|
AuthenticationResult(user, allGroups)
|
||||||
} ?: anonymousUserGroups?.let{ AuthenticationResult(null, it) }
|
} ?: anonymousUserGroups?.let { AuthenticationResult(null, it) }
|
||||||
} catch (es: SSLPeerUnverifiedException) {
|
} catch (es: SSLPeerUnverifiedException) {
|
||||||
anonymousUserGroups?.let{ AuthenticationResult(null, it) }
|
anonymousUserGroups?.let { AuthenticationResult(null, it) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -191,7 +193,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
|||||||
private class ServerInitializer(
|
private class ServerInitializer(
|
||||||
private val cfg: Configuration,
|
private val cfg: Configuration,
|
||||||
private val eventExecutorGroup: EventExecutorGroup
|
private val eventExecutorGroup: EventExecutorGroup
|
||||||
) : ChannelInitializer<Channel>() {
|
) : ChannelInitializer<Channel>(), AutoCloseable {
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
private fun createSslCtx(tls: Configuration.Tls): SslContext {
|
private fun createSslCtx(tls: Configuration.Tls): SslContext {
|
||||||
@@ -213,7 +215,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
|||||||
trustManager(
|
trustManager(
|
||||||
ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus)
|
ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus)
|
||||||
)
|
)
|
||||||
if(trustStore.isRequireClientCertificate) ClientAuth.REQUIRE
|
if (trustStore.isRequireClientCertificate) ClientAuth.REQUIRE
|
||||||
else ClientAuth.OPTIONAL
|
else ClientAuth.OPTIONAL
|
||||||
} ?: ClientAuth.NONE
|
} ?: ClientAuth.NONE
|
||||||
clientAuth(clientAuth)
|
clientAuth(clientAuth)
|
||||||
@@ -245,10 +247,11 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
|||||||
|
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
|
|
||||||
|
private val cache = cfg.cache.materialize()
|
||||||
|
|
||||||
private val serverHandler = let {
|
private val serverHandler = let {
|
||||||
val cacheImplementation = cfg.cache.materialize()
|
|
||||||
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
|
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
|
||||||
ServerHandler(cacheImplementation, prefix)
|
ServerHandler(cache, prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
private val exceptionHandler = ExceptionHandler()
|
private val exceptionHandler = ExceptionHandler()
|
||||||
@@ -311,7 +314,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
|||||||
cfg.connection.also { conn ->
|
cfg.connection.also { conn ->
|
||||||
val readTimeout = conn.readTimeout.toMillis()
|
val readTimeout = conn.readTimeout.toMillis()
|
||||||
val writeTimeout = conn.writeTimeout.toMillis()
|
val writeTimeout = conn.writeTimeout.toMillis()
|
||||||
if(readTimeout > 0 || writeTimeout > 0) {
|
if (readTimeout > 0 || writeTimeout > 0) {
|
||||||
pipeline.addLast(
|
pipeline.addLast(
|
||||||
IdleStateHandler(
|
IdleStateHandler(
|
||||||
false,
|
false,
|
||||||
@@ -325,7 +328,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
|||||||
val readIdleTimeout = conn.readIdleTimeout.toMillis()
|
val readIdleTimeout = conn.readIdleTimeout.toMillis()
|
||||||
val writeIdleTimeout = conn.writeIdleTimeout.toMillis()
|
val writeIdleTimeout = conn.writeIdleTimeout.toMillis()
|
||||||
val idleTimeout = conn.idleTimeout.toMillis()
|
val idleTimeout = conn.idleTimeout.toMillis()
|
||||||
if(readIdleTimeout > 0 || writeIdleTimeout > 0 || idleTimeout > 0) {
|
if (readIdleTimeout > 0 || writeIdleTimeout > 0 || idleTimeout > 0) {
|
||||||
pipeline.addLast(
|
pipeline.addLast(
|
||||||
IdleStateHandler(
|
IdleStateHandler(
|
||||||
true,
|
true,
|
||||||
@@ -340,16 +343,19 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
|||||||
pipeline.addLast(object : ChannelInboundHandlerAdapter() {
|
pipeline.addLast(object : ChannelInboundHandlerAdapter() {
|
||||||
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
|
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
|
||||||
if (evt is IdleStateEvent) {
|
if (evt is IdleStateEvent) {
|
||||||
when(evt.state()) {
|
when (evt.state()) {
|
||||||
IdleState.READER_IDLE -> log.debug {
|
IdleState.READER_IDLE -> log.debug {
|
||||||
"Read timeout reached on channel ${ch.id().asShortText()}, closing the connection"
|
"Read timeout reached on channel ${ch.id().asShortText()}, closing the connection"
|
||||||
}
|
}
|
||||||
|
|
||||||
IdleState.WRITER_IDLE -> log.debug {
|
IdleState.WRITER_IDLE -> log.debug {
|
||||||
"Write timeout reached on channel ${ch.id().asShortText()}, closing the connection"
|
"Write timeout reached on channel ${ch.id().asShortText()}, closing the connection"
|
||||||
}
|
}
|
||||||
|
|
||||||
IdleState.ALL_IDLE -> log.debug {
|
IdleState.ALL_IDLE -> log.debug {
|
||||||
"Idle timeout reached on channel ${ch.id().asShortText()}, closing the connection"
|
"Idle timeout reached on channel ${ch.id().asShortText()}, closing the connection"
|
||||||
}
|
}
|
||||||
|
|
||||||
null -> throw IllegalStateException("This should never happen")
|
null -> throw IllegalStateException("This should never happen")
|
||||||
}
|
}
|
||||||
ctx.close()
|
ctx.close()
|
||||||
@@ -370,26 +376,41 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
|||||||
pipeline.addLast(eventExecutorGroup, serverHandler)
|
pipeline.addLast(eventExecutorGroup, serverHandler)
|
||||||
pipeline.addLast(exceptionHandler)
|
pipeline.addLast(exceptionHandler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun close() {
|
||||||
|
cache.close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ServerHandle(
|
class ServerHandle(
|
||||||
httpChannelFuture: ChannelFuture,
|
httpChannelFuture: ChannelFuture,
|
||||||
private val executorGroups: Iterable<EventExecutorGroup>
|
private val executorGroups: Iterable<EventExecutorGroup>,
|
||||||
|
private val serverInitializer: AutoCloseable
|
||||||
) : AutoCloseable {
|
) : AutoCloseable {
|
||||||
private val httpChannel: Channel = httpChannelFuture.channel()
|
private val httpChannel: Channel = httpChannelFuture.channel()
|
||||||
private val closeFuture: ChannelFuture = httpChannel.closeFuture()
|
private val closeFuture: ChannelFuture = httpChannel.closeFuture()
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
|
|
||||||
fun shutdown(): ChannelFuture {
|
fun shutdown(): Future<Void> {
|
||||||
return httpChannel.close()
|
return httpChannel.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
try {
|
try {
|
||||||
closeFuture.sync()
|
closeFuture.sync()
|
||||||
} finally {
|
} catch (ex: Throwable) {
|
||||||
|
log.error(ex.message, ex)
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
serverInitializer.close()
|
||||||
|
} catch (ex: Throwable) {
|
||||||
|
log.error(ex.message, ex)
|
||||||
|
}
|
||||||
executorGroups.forEach {
|
executorGroups.forEach {
|
||||||
|
try {
|
||||||
it.shutdownGracefully().sync()
|
it.shutdownGracefully().sync()
|
||||||
|
} catch (ex: Throwable) {
|
||||||
|
log.error(ex.message, ex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info {
|
log.info {
|
||||||
@@ -411,11 +432,12 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
|||||||
}
|
}
|
||||||
DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), threadFactory)
|
DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), threadFactory)
|
||||||
}
|
}
|
||||||
|
val serverInitializer = ServerInitializer(cfg, eventExecutorGroup)
|
||||||
val bootstrap = ServerBootstrap().apply {
|
val bootstrap = ServerBootstrap().apply {
|
||||||
// Configure the server
|
// Configure the server
|
||||||
group(bossGroup, workerGroup)
|
group(bossGroup, workerGroup)
|
||||||
channel(serverSocketChannel)
|
channel(serverSocketChannel)
|
||||||
childHandler(ServerInitializer(cfg, eventExecutorGroup))
|
childHandler(serverInitializer)
|
||||||
option(ChannelOption.SO_BACKLOG, cfg.incomingConnectionsBacklogSize)
|
option(ChannelOption.SO_BACKLOG, cfg.incomingConnectionsBacklogSize)
|
||||||
childOption(ChannelOption.SO_KEEPALIVE, true)
|
childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||||
}
|
}
|
||||||
@@ -427,6 +449,6 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
|||||||
log.info {
|
log.info {
|
||||||
"RemoteBuildCacheServer is listening on ${cfg.host}:${cfg.port}"
|
"RemoteBuildCacheServer is listening on ${cfg.host}:${cfg.port}"
|
||||||
}
|
}
|
||||||
return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup))
|
return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup), serverInitializer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,12 +1,11 @@
|
|||||||
package net.woggioni.rbcs.server.cache
|
package net.woggioni.rbcs.server.cache
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf
|
import io.netty.buffer.ByteBuf
|
||||||
|
import net.woggioni.jwo.JWO
|
||||||
import net.woggioni.rbcs.api.Cache
|
import net.woggioni.rbcs.api.Cache
|
||||||
import net.woggioni.rbcs.common.ByteBufInputStream
|
import net.woggioni.rbcs.common.ByteBufInputStream
|
||||||
import net.woggioni.rbcs.common.RBCS.digestString
|
import net.woggioni.rbcs.common.RBCS.digestString
|
||||||
import net.woggioni.rbcs.common.contextLogger
|
import net.woggioni.rbcs.common.contextLogger
|
||||||
import net.woggioni.jwo.JWO
|
|
||||||
import net.woggioni.jwo.LockFile
|
|
||||||
import java.nio.channels.Channels
|
import java.nio.channels.Channels
|
||||||
import java.nio.channels.FileChannel
|
import java.nio.channels.FileChannel
|
||||||
import java.nio.file.Files
|
import java.nio.file.Files
|
||||||
@@ -18,7 +17,6 @@ import java.security.MessageDigest
|
|||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.concurrent.CompletableFuture
|
import java.util.concurrent.CompletableFuture
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
|
||||||
import java.util.zip.Deflater
|
import java.util.zip.Deflater
|
||||||
import java.util.zip.DeflaterOutputStream
|
import java.util.zip.DeflaterOutputStream
|
||||||
import java.util.zip.Inflater
|
import java.util.zip.Inflater
|
||||||
@@ -41,7 +39,10 @@ class FileSystemCache(
|
|||||||
Files.createDirectories(root)
|
Files.createDirectories(root)
|
||||||
}
|
}
|
||||||
|
|
||||||
private var nextGc = AtomicReference(Instant.now().plus(maxAge))
|
@Volatile
|
||||||
|
private var running = true
|
||||||
|
|
||||||
|
private var nextGc = Instant.now()
|
||||||
|
|
||||||
override fun get(key: String) = (digestAlgorithm
|
override fun get(key: String) = (digestAlgorithm
|
||||||
?.let(MessageDigest::getInstance)
|
?.let(MessageDigest::getInstance)
|
||||||
@@ -67,8 +68,6 @@ class FileSystemCache(
|
|||||||
FileChannel.open(file, StandardOpenOption.READ)
|
FileChannel.open(file, StandardOpenOption.READ)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}.also {
|
|
||||||
gc()
|
|
||||||
}.let {
|
}.let {
|
||||||
CompletableFuture.completedFuture(it)
|
CompletableFuture.completedFuture(it)
|
||||||
}
|
}
|
||||||
@@ -98,33 +97,51 @@ class FileSystemCache(
|
|||||||
Files.delete(tmpFile)
|
Files.delete(tmpFile)
|
||||||
throw t
|
throw t
|
||||||
}
|
}
|
||||||
}.also {
|
|
||||||
gc()
|
|
||||||
}
|
}
|
||||||
return CompletableFuture.completedFuture(null)
|
return CompletableFuture.completedFuture(null)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun gc() {
|
private val garbageCollector = Thread.ofVirtual().name("file-system-cache-gc").start {
|
||||||
val now = Instant.now()
|
while (running) {
|
||||||
val oldValue = nextGc.getAndSet(now.plus(maxAge))
|
gc()
|
||||||
if (oldValue < now) {
|
|
||||||
actualGc(now)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Synchronized
|
private fun gc() {
|
||||||
private fun actualGc(now: Instant) {
|
val now = Instant.now()
|
||||||
Files.list(root).filter {
|
if (nextGc < now) {
|
||||||
|
val oldestEntry = actualGc(now)
|
||||||
|
nextGc = (oldestEntry ?: now).plus(maxAge)
|
||||||
|
}
|
||||||
|
Thread.sleep(minOf(Duration.between(now, nextGc), Duration.ofSeconds(1)))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the creation timestamp of the oldest cache entry (if any)
|
||||||
|
*/
|
||||||
|
private fun actualGc(now: Instant) : Instant? {
|
||||||
|
var result :Instant? = null
|
||||||
|
Files.list(root)
|
||||||
|
.filter { path ->
|
||||||
|
JWO.splitExtension(path)
|
||||||
|
.map { it._2 }
|
||||||
|
.map { it != ".tmp" }
|
||||||
|
.orElse(true)
|
||||||
|
}
|
||||||
|
.filter {
|
||||||
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
|
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
|
||||||
.creationTime()
|
.creationTime()
|
||||||
.toInstant()
|
.toInstant()
|
||||||
|
if(result == null || creationTimeStamp < result) {
|
||||||
|
result = creationTimeStamp
|
||||||
|
}
|
||||||
now > creationTimeStamp.plus(maxAge)
|
now > creationTimeStamp.plus(maxAge)
|
||||||
}.forEach { file ->
|
}.forEach(Files::delete)
|
||||||
LockFile.acquire(file, false).use {
|
return result
|
||||||
Files.delete(file)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun close() {}
|
override fun close() {
|
||||||
|
running = false
|
||||||
|
garbageCollector.join()
|
||||||
|
}
|
||||||
}
|
}
|
@@ -1,12 +1,12 @@
|
|||||||
package net.woggioni.rbcs.server.cache
|
package net.woggioni.rbcs.server.cache
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf
|
import io.netty.buffer.ByteBuf
|
||||||
|
import net.woggioni.jwo.JWO
|
||||||
import net.woggioni.rbcs.api.Cache
|
import net.woggioni.rbcs.api.Cache
|
||||||
import net.woggioni.rbcs.common.ByteBufInputStream
|
import net.woggioni.rbcs.common.ByteBufInputStream
|
||||||
import net.woggioni.rbcs.common.ByteBufOutputStream
|
import net.woggioni.rbcs.common.ByteBufOutputStream
|
||||||
import net.woggioni.rbcs.common.RBCS.digestString
|
import net.woggioni.rbcs.common.RBCS.digestString
|
||||||
import net.woggioni.rbcs.common.contextLogger
|
import net.woggioni.rbcs.common.contextLogger
|
||||||
import net.woggioni.jwo.JWO
|
|
||||||
import java.nio.channels.Channels
|
import java.nio.channels.Channels
|
||||||
import java.security.MessageDigest
|
import java.security.MessageDigest
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
@@ -42,9 +42,11 @@ class InMemoryCache(
|
|||||||
|
|
||||||
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
|
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
|
||||||
|
|
||||||
|
@Volatile
|
||||||
private var running = true
|
private var running = true
|
||||||
private val garbageCollector = Thread {
|
|
||||||
while(true) {
|
private val garbageCollector = Thread.ofVirtual().name("in-memory-cache-gc").start {
|
||||||
|
while(running) {
|
||||||
val el = removalQueue.take()
|
val el = removalQueue.take()
|
||||||
val buf = el.value
|
val buf = el.value
|
||||||
val now = Instant.now()
|
val now = Instant.now()
|
||||||
@@ -62,8 +64,6 @@ class InMemoryCache(
|
|||||||
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
|
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}.apply {
|
|
||||||
start()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun removeEldest() : Long {
|
private fun removeEldest() : Long {
|
||||||
|
@@ -17,6 +17,8 @@ import net.woggioni.rbcs.api.exception.CacheException
|
|||||||
import net.woggioni.rbcs.api.exception.ContentTooLargeException
|
import net.woggioni.rbcs.api.exception.ContentTooLargeException
|
||||||
import net.woggioni.rbcs.common.contextLogger
|
import net.woggioni.rbcs.common.contextLogger
|
||||||
import net.woggioni.rbcs.common.debug
|
import net.woggioni.rbcs.common.debug
|
||||||
|
import java.net.SocketException
|
||||||
|
import javax.net.ssl.SSLException
|
||||||
import javax.net.ssl.SSLPeerUnverifiedException
|
import javax.net.ssl.SSLPeerUnverifiedException
|
||||||
|
|
||||||
@ChannelHandler.Sharable
|
@ChannelHandler.Sharable
|
||||||
@@ -50,7 +52,12 @@ class ExceptionHandler : ChannelDuplexHandler() {
|
|||||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||||
when (cause) {
|
when (cause) {
|
||||||
is DecoderException -> {
|
is DecoderException -> {
|
||||||
log.error(cause.message, cause)
|
log.debug(cause.message, cause)
|
||||||
|
ctx.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
is SocketException -> {
|
||||||
|
log.debug(cause.message, cause)
|
||||||
ctx.close()
|
ctx.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -59,10 +66,16 @@ class ExceptionHandler : ChannelDuplexHandler() {
|
|||||||
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
is SSLException -> {
|
||||||
|
log.debug(cause.message, cause)
|
||||||
|
ctx.close()
|
||||||
|
}
|
||||||
|
|
||||||
is ContentTooLargeException -> {
|
is ContentTooLargeException -> {
|
||||||
ctx.writeAndFlush(TOO_BIG.retainedDuplicate())
|
ctx.writeAndFlush(TOO_BIG.retainedDuplicate())
|
||||||
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
||||||
}
|
}
|
||||||
|
|
||||||
is ReadTimeoutException -> {
|
is ReadTimeoutException -> {
|
||||||
log.debug {
|
log.debug {
|
||||||
val channelId = ctx.channel().id().asShortText()
|
val channelId = ctx.channel().id().asShortText()
|
||||||
@@ -70,6 +83,7 @@ class ExceptionHandler : ChannelDuplexHandler() {
|
|||||||
}
|
}
|
||||||
ctx.close()
|
ctx.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
is WriteTimeoutException -> {
|
is WriteTimeoutException -> {
|
||||||
log.debug {
|
log.debug {
|
||||||
val channelId = ctx.channel().id().asShortText()
|
val channelId = ctx.channel().id().asShortText()
|
||||||
@@ -77,11 +91,13 @@ class ExceptionHandler : ChannelDuplexHandler() {
|
|||||||
}
|
}
|
||||||
ctx.close()
|
ctx.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
is CacheException -> {
|
is CacheException -> {
|
||||||
log.error(cause.message, cause)
|
log.error(cause.message, cause)
|
||||||
ctx.writeAndFlush(NOT_AVAILABLE.retainedDuplicate())
|
ctx.writeAndFlush(NOT_AVAILABLE.retainedDuplicate())
|
||||||
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
||||||
}
|
}
|
||||||
|
|
||||||
else -> {
|
else -> {
|
||||||
log.error(cause.message, cause)
|
log.error(cause.message, cause)
|
||||||
ctx.writeAndFlush(SERVER_ERROR.retainedDuplicate())
|
ctx.writeAndFlush(SERVER_ERROR.retainedDuplicate())
|
||||||
|
@@ -19,10 +19,13 @@ import java.util.concurrent.TimeUnit
|
|||||||
|
|
||||||
|
|
||||||
@Sharable
|
@Sharable
|
||||||
class ThrottlingHandler(cfg: Configuration) :
|
class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() {
|
||||||
ChannelInboundHandlerAdapter() {
|
|
||||||
|
|
||||||
|
private companion object {
|
||||||
|
@JvmStatic
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
|
}
|
||||||
|
|
||||||
private val bucketManager = BucketManager.from(cfg)
|
private val bucketManager = BucketManager.from(cfg)
|
||||||
|
|
||||||
private val connectionConfiguration = cfg.connection
|
private val connectionConfiguration = cfg.connection
|
||||||
@@ -60,7 +63,7 @@ class ThrottlingHandler(cfg: Configuration) :
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun handleBuckets(buckets : List<Bucket>, ctx : ChannelHandlerContext, msg : Any, delayResponse : Boolean) {
|
private fun handleBuckets(buckets: List<Bucket>, ctx: ChannelHandlerContext, msg: Any, delayResponse: Boolean) {
|
||||||
var nextAttempt = -1L
|
var nextAttempt = -1L
|
||||||
for (bucket in buckets) {
|
for (bucket in buckets) {
|
||||||
val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
|
val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
|
||||||
@@ -68,10 +71,9 @@ class ThrottlingHandler(cfg: Configuration) :
|
|||||||
nextAttempt = bucketNextAttempt
|
nextAttempt = bucketNextAttempt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(nextAttempt < 0) {
|
if (nextAttempt < 0) {
|
||||||
super.channelRead(ctx, msg)
|
super.channelRead(ctx, msg)
|
||||||
return
|
} else {
|
||||||
}
|
|
||||||
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
|
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
|
||||||
if (delayResponse && waitDuration < waitThreshold) {
|
if (delayResponse && waitDuration < waitThreshold) {
|
||||||
ctx.executor().schedule({
|
ctx.executor().schedule({
|
||||||
@@ -81,6 +83,7 @@ class ThrottlingHandler(cfg: Configuration) :
|
|||||||
sendThrottledResponse(ctx, waitDuration)
|
sendThrottledResponse(ctx, waitDuration)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private fun sendThrottledResponse(ctx: ChannelHandlerContext, retryAfter: Duration) {
|
private fun sendThrottledResponse(ctx: ChannelHandlerContext, retryAfter: Duration) {
|
||||||
val response = DefaultFullHttpResponse(
|
val response = DefaultFullHttpResponse(
|
||||||
|
Reference in New Issue
Block a user