added randomizer to retries
This commit is contained in:
@@ -45,9 +45,9 @@ import java.time.Duration
|
||||
import java.util.Base64
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import kotlin.random.Random
|
||||
import io.netty.util.concurrent.Future as NettyFuture
|
||||
|
||||
|
||||
class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable {
|
||||
private val group: NioEventLoopGroup
|
||||
private var sslContext: SslContext
|
||||
@@ -206,6 +206,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
|
||||
retryPolicy.initialDelayMillis.toDouble(),
|
||||
retryPolicy.exp,
|
||||
outcomeHandler,
|
||||
Random.Default,
|
||||
operation
|
||||
)
|
||||
} else {
|
||||
|
@@ -3,6 +3,8 @@ package net.woggioni.rbcs.client
|
||||
import io.netty.util.concurrent.EventExecutorGroup
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.math.pow
|
||||
import kotlin.random.Random
|
||||
|
||||
sealed class OperationOutcome<T> {
|
||||
class Success<T>(val result: T) : OperationOutcome<T>()
|
||||
@@ -24,8 +26,10 @@ fun <T> executeWithRetry(
|
||||
initialDelay: Double,
|
||||
exp: Double,
|
||||
outcomeHandler: OutcomeHandler<T>,
|
||||
randomizer : Random?,
|
||||
cb: () -> CompletableFuture<T>
|
||||
): CompletableFuture<T> {
|
||||
|
||||
val finalResult = cb()
|
||||
var future = finalResult
|
||||
var shortCircuit = false
|
||||
@@ -46,7 +50,7 @@ fun <T> executeWithRetry(
|
||||
is OutcomeHandlerResult.Retry -> {
|
||||
val res = CompletableFuture<T>()
|
||||
val delay = run {
|
||||
val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong()
|
||||
val scheduledDelay = (initialDelay * exp.pow(i.toDouble()) * (1.0 + (randomizer?.nextDouble(-0.5, 0.5) ?: 0.0))).toLong()
|
||||
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
|
||||
}
|
||||
eventExecutorGroup.schedule({
|
||||
|
@@ -89,7 +89,7 @@ class RetryTest {
|
||||
val random = Random(testArgs.seed)
|
||||
|
||||
val future =
|
||||
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler) {
|
||||
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler, null) {
|
||||
val now = System.nanoTime()
|
||||
val result = CompletableFuture<Int>()
|
||||
executor.submit {
|
||||
@@ -129,7 +129,7 @@ class RetryTest {
|
||||
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
|
||||
val actualTimestamp = timestamp
|
||||
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
|
||||
Assertions.assertTrue(err < 1e-3)
|
||||
Assertions.assertTrue(err < 1e-2)
|
||||
}
|
||||
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
|
||||
/*
|
||||
|
@@ -12,6 +12,24 @@ object RBCS {
|
||||
const val RBCS_PREFIX: String = "rbcs"
|
||||
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
|
||||
|
||||
fun ByteArray.toInt(index : Int = 0) : Long {
|
||||
if(index + 4 > size) throw IllegalArgumentException("Not enough bytes to decode a 32 bits integer")
|
||||
var value : Long = 0
|
||||
for (b in index until index + 4) {
|
||||
value = (value shl 8) + (get(b).toInt() and 0xFF)
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
fun ByteArray.toLong(index : Int = 0) : Long {
|
||||
if(index + 8 > size) throw IllegalArgumentException("Not enough bytes to decode a 64 bits long integer")
|
||||
var value : Long = 0
|
||||
for (b in index until index + 8) {
|
||||
value = (value shl 8) + (get(b).toInt() and 0xFF)
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
fun digest(
|
||||
data: ByteArray,
|
||||
md: MessageDigest = MessageDigest.getInstance("MD5")
|
||||
|
@@ -17,6 +17,8 @@ import net.woggioni.rbcs.api.exception.CacheException
|
||||
import net.woggioni.rbcs.api.exception.ContentTooLargeException
|
||||
import net.woggioni.rbcs.common.contextLogger
|
||||
import net.woggioni.rbcs.common.debug
|
||||
import java.net.SocketException
|
||||
import javax.net.ssl.SSLException
|
||||
import javax.net.ssl.SSLPeerUnverifiedException
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
@@ -50,7 +52,12 @@ class ExceptionHandler : ChannelDuplexHandler() {
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
when (cause) {
|
||||
is DecoderException -> {
|
||||
log.error(cause.message, cause)
|
||||
log.debug(cause.message, cause)
|
||||
ctx.close()
|
||||
}
|
||||
|
||||
is SocketException -> {
|
||||
log.debug(cause.message, cause)
|
||||
ctx.close()
|
||||
}
|
||||
|
||||
@@ -59,10 +66,16 @@ class ExceptionHandler : ChannelDuplexHandler() {
|
||||
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
||||
}
|
||||
|
||||
is SSLException -> {
|
||||
log.debug(cause.message, cause)
|
||||
ctx.close()
|
||||
}
|
||||
|
||||
is ContentTooLargeException -> {
|
||||
ctx.writeAndFlush(TOO_BIG.retainedDuplicate())
|
||||
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
||||
}
|
||||
|
||||
is ReadTimeoutException -> {
|
||||
log.debug {
|
||||
val channelId = ctx.channel().id().asShortText()
|
||||
@@ -70,6 +83,7 @@ class ExceptionHandler : ChannelDuplexHandler() {
|
||||
}
|
||||
ctx.close()
|
||||
}
|
||||
|
||||
is WriteTimeoutException -> {
|
||||
log.debug {
|
||||
val channelId = ctx.channel().id().asShortText()
|
||||
@@ -77,11 +91,13 @@ class ExceptionHandler : ChannelDuplexHandler() {
|
||||
}
|
||||
ctx.close()
|
||||
}
|
||||
|
||||
is CacheException -> {
|
||||
log.error(cause.message, cause)
|
||||
ctx.writeAndFlush(NOT_AVAILABLE.retainedDuplicate())
|
||||
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
||||
}
|
||||
|
||||
else -> {
|
||||
log.error(cause.message, cause)
|
||||
ctx.writeAndFlush(SERVER_ERROR.retainedDuplicate())
|
||||
|
@@ -19,10 +19,13 @@ import java.util.concurrent.TimeUnit
|
||||
|
||||
|
||||
@Sharable
|
||||
class ThrottlingHandler(cfg: Configuration) :
|
||||
ChannelInboundHandlerAdapter() {
|
||||
class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() {
|
||||
|
||||
private companion object {
|
||||
@JvmStatic
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private val bucketManager = BucketManager.from(cfg)
|
||||
|
||||
private val connectionConfiguration = cfg.connection
|
||||
@@ -60,7 +63,7 @@ class ThrottlingHandler(cfg: Configuration) :
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleBuckets(buckets : List<Bucket>, ctx : ChannelHandlerContext, msg : Any, delayResponse : Boolean) {
|
||||
private fun handleBuckets(buckets: List<Bucket>, ctx: ChannelHandlerContext, msg: Any, delayResponse: Boolean) {
|
||||
var nextAttempt = -1L
|
||||
for (bucket in buckets) {
|
||||
val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
|
||||
@@ -68,10 +71,9 @@ class ThrottlingHandler(cfg: Configuration) :
|
||||
nextAttempt = bucketNextAttempt
|
||||
}
|
||||
}
|
||||
if(nextAttempt < 0) {
|
||||
if (nextAttempt < 0) {
|
||||
super.channelRead(ctx, msg)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
|
||||
if (delayResponse && waitDuration < waitThreshold) {
|
||||
ctx.executor().schedule({
|
||||
@@ -81,6 +83,7 @@ class ThrottlingHandler(cfg: Configuration) :
|
||||
sendThrottledResponse(ctx, waitDuration)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun sendThrottledResponse(ctx: ChannelHandlerContext, retryAfter: Duration) {
|
||||
val response = DefaultFullHttpResponse(
|
||||
|
Reference in New Issue
Block a user