From 84d7c977f9423b9c0bbdbd3e59044bcfa5e43dc8 Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Fri, 7 Feb 2025 23:17:07 +0800 Subject: [PATCH] added randomizer to retries --- .../kotlin/net/woggioni/rbcs/client/Client.kt | 3 +- .../kotlin/net/woggioni/rbcs/client/retry.kt | 6 +++- .../net/woggioni/rbcs/client/RetryTest.kt | 4 +-- .../kotlin/net/woggioni/rbcs/common/RBCS.kt | 18 ++++++++++++ .../rbcs/server/exception/ExceptionHandler.kt | 18 +++++++++++- .../server/throttling/ThrottlingHandler.kt | 29 ++++++++++--------- 6 files changed, 60 insertions(+), 18 deletions(-) diff --git a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/Client.kt b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/Client.kt index fe3836f..e3e119f 100644 --- a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/Client.kt +++ b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/Client.kt @@ -45,9 +45,9 @@ import java.time.Duration import java.util.Base64 import java.util.concurrent.CompletableFuture import java.util.concurrent.atomic.AtomicInteger +import kotlin.random.Random import io.netty.util.concurrent.Future as NettyFuture - class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable { private val group: NioEventLoopGroup private var sslContext: SslContext @@ -206,6 +206,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC retryPolicy.initialDelayMillis.toDouble(), retryPolicy.exp, outcomeHandler, + Random.Default, operation ) } else { diff --git a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/retry.kt b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/retry.kt index b02ec21..f90b4ab 100644 --- a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/retry.kt +++ b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/retry.kt @@ -3,6 +3,8 @@ package net.woggioni.rbcs.client import io.netty.util.concurrent.EventExecutorGroup import java.util.concurrent.CompletableFuture import java.util.concurrent.TimeUnit +import kotlin.math.pow +import kotlin.random.Random sealed class OperationOutcome { class Success(val result: T) : OperationOutcome() @@ -24,8 +26,10 @@ fun executeWithRetry( initialDelay: Double, exp: Double, outcomeHandler: OutcomeHandler, + randomizer : Random?, cb: () -> CompletableFuture ): CompletableFuture { + val finalResult = cb() var future = finalResult var shortCircuit = false @@ -46,7 +50,7 @@ fun executeWithRetry( is OutcomeHandlerResult.Retry -> { val res = CompletableFuture() val delay = run { - val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong() + val scheduledDelay = (initialDelay * exp.pow(i.toDouble()) * (1.0 + (randomizer?.nextDouble(-0.5, 0.5) ?: 0.0))).toLong() outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay } eventExecutorGroup.schedule({ diff --git a/rbcs-client/src/test/kotlin/net/woggioni/rbcs/client/RetryTest.kt b/rbcs-client/src/test/kotlin/net/woggioni/rbcs/client/RetryTest.kt index 760bc6d..c71cda9 100644 --- a/rbcs-client/src/test/kotlin/net/woggioni/rbcs/client/RetryTest.kt +++ b/rbcs-client/src/test/kotlin/net/woggioni/rbcs/client/RetryTest.kt @@ -89,7 +89,7 @@ class RetryTest { val random = Random(testArgs.seed) val future = - executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler) { + executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler, null) { val now = System.nanoTime() val result = CompletableFuture() executor.submit { @@ -129,7 +129,7 @@ class RetryTest { previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6 val actualTimestamp = timestamp val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp - Assertions.assertTrue(err < 1e-3) + Assertions.assertTrue(err < 1e-2) } if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) { /* diff --git a/rbcs-common/src/main/kotlin/net/woggioni/rbcs/common/RBCS.kt b/rbcs-common/src/main/kotlin/net/woggioni/rbcs/common/RBCS.kt index 8d50375..599f091 100644 --- a/rbcs-common/src/main/kotlin/net/woggioni/rbcs/common/RBCS.kt +++ b/rbcs-common/src/main/kotlin/net/woggioni/rbcs/common/RBCS.kt @@ -12,6 +12,24 @@ object RBCS { const val RBCS_PREFIX: String = "rbcs" const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance" + fun ByteArray.toInt(index : Int = 0) : Long { + if(index + 4 > size) throw IllegalArgumentException("Not enough bytes to decode a 32 bits integer") + var value : Long = 0 + for (b in index until index + 4) { + value = (value shl 8) + (get(b).toInt() and 0xFF) + } + return value + } + + fun ByteArray.toLong(index : Int = 0) : Long { + if(index + 8 > size) throw IllegalArgumentException("Not enough bytes to decode a 64 bits long integer") + var value : Long = 0 + for (b in index until index + 8) { + value = (value shl 8) + (get(b).toInt() and 0xFF) + } + return value + } + fun digest( data: ByteArray, md: MessageDigest = MessageDigest.getInstance("MD5") diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/exception/ExceptionHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/exception/ExceptionHandler.kt index b9f1e2e..05e5719 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/exception/ExceptionHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/exception/ExceptionHandler.kt @@ -17,6 +17,8 @@ import net.woggioni.rbcs.api.exception.CacheException import net.woggioni.rbcs.api.exception.ContentTooLargeException import net.woggioni.rbcs.common.contextLogger import net.woggioni.rbcs.common.debug +import java.net.SocketException +import javax.net.ssl.SSLException import javax.net.ssl.SSLPeerUnverifiedException @ChannelHandler.Sharable @@ -50,7 +52,12 @@ class ExceptionHandler : ChannelDuplexHandler() { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { when (cause) { is DecoderException -> { - log.error(cause.message, cause) + log.debug(cause.message, cause) + ctx.close() + } + + is SocketException -> { + log.debug(cause.message, cause) ctx.close() } @@ -59,10 +66,16 @@ class ExceptionHandler : ChannelDuplexHandler() { .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) } + is SSLException -> { + log.debug(cause.message, cause) + ctx.close() + } + is ContentTooLargeException -> { ctx.writeAndFlush(TOO_BIG.retainedDuplicate()) .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) } + is ReadTimeoutException -> { log.debug { val channelId = ctx.channel().id().asShortText() @@ -70,6 +83,7 @@ class ExceptionHandler : ChannelDuplexHandler() { } ctx.close() } + is WriteTimeoutException -> { log.debug { val channelId = ctx.channel().id().asShortText() @@ -77,11 +91,13 @@ class ExceptionHandler : ChannelDuplexHandler() { } ctx.close() } + is CacheException -> { log.error(cause.message, cause) ctx.writeAndFlush(NOT_AVAILABLE.retainedDuplicate()) .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) } + else -> { log.error(cause.message, cause) ctx.writeAndFlush(SERVER_ERROR.retainedDuplicate()) diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/throttling/ThrottlingHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/throttling/ThrottlingHandler.kt index 09f6b85..8420a49 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/throttling/ThrottlingHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/throttling/ThrottlingHandler.kt @@ -19,10 +19,13 @@ import java.util.concurrent.TimeUnit @Sharable -class ThrottlingHandler(cfg: Configuration) : - ChannelInboundHandlerAdapter() { +class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() { + + private companion object { + @JvmStatic + private val log = contextLogger() + } - private val log = contextLogger() private val bucketManager = BucketManager.from(cfg) private val connectionConfiguration = cfg.connection @@ -60,7 +63,7 @@ class ThrottlingHandler(cfg: Configuration) : } } - private fun handleBuckets(buckets : List, ctx : ChannelHandlerContext, msg : Any, delayResponse : Boolean) { + private fun handleBuckets(buckets: List, ctx: ChannelHandlerContext, msg: Any, delayResponse: Boolean) { var nextAttempt = -1L for (bucket in buckets) { val bucketNextAttempt = bucket.removeTokensWithEstimate(1) @@ -68,17 +71,17 @@ class ThrottlingHandler(cfg: Configuration) : nextAttempt = bucketNextAttempt } } - if(nextAttempt < 0) { + if (nextAttempt < 0) { super.channelRead(ctx, msg) - return - } - val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS) - if (delayResponse && waitDuration < waitThreshold) { - ctx.executor().schedule({ - handleBuckets(buckets, ctx, msg, false) - }, waitDuration.toMillis(), TimeUnit.MILLISECONDS) } else { - sendThrottledResponse(ctx, waitDuration) + val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS) + if (delayResponse && waitDuration < waitThreshold) { + ctx.executor().schedule({ + handleBuckets(buckets, ctx, msg, false) + }, waitDuration.toMillis(), TimeUnit.MILLISECONDS) + } else { + sendThrottledResponse(ctx, waitDuration) + } } }