From 90a5834f5f530334b93c854e38c3e0f66343387c Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Mon, 27 Jan 2025 13:12:12 +0800 Subject: [PATCH] added retry policy to gbcs-client --- build.gradle | 6 + gbcs-client/build.gradle | 2 + .../kotlin/net/woggioni/gbcs/client/Client.kt | 93 ++++++++--- .../net/woggioni/gbcs/client/impl/Parser.kt | 52 +++++- .../kotlin/net/woggioni/gbcs/client/retry.kt | 75 +++++++++ .../gbcs/client/schema/gbcs-client.xsd | 19 ++- .../net/woggioni/gbcs/client/RetryTest.kt | 149 ++++++++++++++++++ gbcs-client/src/test/resources/logback.xml | 21 +++ gbcs-server/build.gradle | 3 - 9 files changed, 385 insertions(+), 35 deletions(-) create mode 100644 gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/retry.kt create mode 100644 gbcs-client/src/test/kotlin/net/woggioni/gbcs/client/RetryTest.kt create mode 100644 gbcs-client/src/test/resources/logback.xml diff --git a/build.gradle b/build.gradle index 4875563..5c4748e 100644 --- a/build.gradle +++ b/build.gradle @@ -46,6 +46,12 @@ allprojects { subproject -> } } + dependencies { + testImplementation catalog.junit.jupiter.api + testImplementation catalog.junit.jupiter.params + testRuntimeOnly catalog.junit.jupiter.engine + } + test { useJUnitPlatform() } diff --git a/gbcs-client/build.gradle b/gbcs-client/build.gradle index 957ee4d..70e7696 100644 --- a/gbcs-client/build.gradle +++ b/gbcs-client/build.gradle @@ -10,6 +10,8 @@ dependencies { implementation catalog.slf4j.api implementation catalog.netty.buffer implementation catalog.netty.codec.http + + testRuntimeOnly catalog.logback.classic } diff --git a/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/Client.kt b/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/Client.kt index 8dfe523..05eb5a4 100644 --- a/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/Client.kt +++ b/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/Client.kt @@ -66,11 +66,18 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC data class BasicAuthenticationCredentials(val username: String, val password: String) : Authentication() } + class RetryPolicy( + val maxAttempts: Int, + val initialDelayMillis: Long, + val exp: Double + ) + data class Profile( val serverURI: URI, val authentication: Authentication?, - val connectionTimeout : Duration?, - val maxConnections : Int + val connectionTimeout: Duration?, + val maxConnections: Int, + val retryPolicy: RetryPolicy?, ) companion object { @@ -161,28 +168,76 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC pool = FixedChannelPool(bootstrap, channelPoolHandler, profile.maxConnections) } - fun get(key: String): CompletableFuture { - return sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null) - .thenApply { - val status = it.status() - if (it.status() == HttpResponseStatus.NOT_FOUND) { - null - } else if (it.status() != HttpResponseStatus.OK) { - throw HttpException(status) - } else { - it.content() - } - }.thenApply { maybeByteBuf -> - maybeByteBuf?.let { - val result = ByteArray(it.readableBytes()) - it.getBytes(0, result) - result + private fun executeWithRetry(operation: () -> CompletableFuture): CompletableFuture { + val retryPolicy = profile.retryPolicy + return if (retryPolicy != null) { + val outcomeHandler = OutcomeHandler { outcome -> + when (outcome) { + is OperationOutcome.Success -> { + val response = outcome.result + val status = response.status() + when (status) { + HttpResponseStatus.TOO_MANY_REQUESTS -> { + val retryAfter = response.headers()[HttpHeaderNames.RETRY_AFTER]?.let { headerValue -> + try { + headerValue.toLong() * 1000 + } catch (nfe: NumberFormatException) { + null + } + } + OutcomeHandlerResult.Retry(retryAfter) + } + + HttpResponseStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.SERVICE_UNAVAILABLE -> + OutcomeHandlerResult.Retry() + + else -> OutcomeHandlerResult.DoNotRetry() + } + } + + is OperationOutcome.Failure -> { + OutcomeHandlerResult.Retry() + } } } + executeWithRetry( + group, + retryPolicy.maxAttempts, + retryPolicy.initialDelayMillis.toDouble(), + retryPolicy.exp, + outcomeHandler, + operation + ) + } else { + operation() + } + } + + fun get(key: String): CompletableFuture { + return executeWithRetry { + sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null) + }.thenApply { + val status = it.status() + if (it.status() == HttpResponseStatus.NOT_FOUND) { + null + } else if (it.status() != HttpResponseStatus.OK) { + throw HttpException(status) + } else { + it.content() + } + }.thenApply { maybeByteBuf -> + maybeByteBuf?.let { + val result = ByteArray(it.readableBytes()) + it.getBytes(0, result) + result + } + } } fun put(key: String, content: ByteArray): CompletableFuture { - return sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content).thenApply { + return executeWithRetry { + sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content) + }.thenApply { val status = it.status() if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) { throw HttpException(status) diff --git a/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/impl/Parser.kt b/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/impl/Parser.kt index 63c10e3..88c3bfd 100644 --- a/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/impl/Parser.kt +++ b/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/impl/Parser.kt @@ -17,16 +17,18 @@ object Parser { fun parse(document: Document): GradleBuildCacheClient.Configuration { val root = document.documentElement - val profiles = mutableMapOf() for (child in root.asIterable()) { val tagName = child.localName when (tagName) { "profile" -> { - val name = child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required") - val uri = child.renderAttribute("base-url")?.let(::URI) ?: throw ConfigurationException("base-url attribute is required") + val name = + child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required") + val uri = child.renderAttribute("base-url")?.let(::URI) + ?: throw ConfigurationException("base-url attribute is required") var authentication: GradleBuildCacheClient.Configuration.Authentication? = null + var retryPolicy: GradleBuildCacheClient.Configuration.RetryPolicy? = null for (gchild in child.asIterable()) { when (gchild.localName) { "tls-client-auth" -> { @@ -47,14 +49,42 @@ object Parser { .toList() .toTypedArray() authentication = - GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(key, certChain) + GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials( + key, + certChain + ) } "basic-auth" -> { - val username = gchild.renderAttribute("user") ?: throw ConfigurationException("username attribute is required") - val password = gchild.renderAttribute("password") ?: throw ConfigurationException("password attribute is required") + val username = gchild.renderAttribute("user") + ?: throw ConfigurationException("username attribute is required") + val password = gchild.renderAttribute("password") + ?: throw ConfigurationException("password attribute is required") authentication = - GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(username, password) + GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials( + username, + password + ) + } + + "retry-policy" -> { + val maxAttempts = + gchild.renderAttribute("max-attempts") + ?.let(String::toInt) + ?: throw ConfigurationException("max-attempts attribute is required") + val initialDelay = + gchild.renderAttribute("initial-delay") + ?.let(Duration::parse) + ?: Duration.ofSeconds(1) + val exp = + gchild.renderAttribute("exp") + ?.let(String::toDouble) + ?: 2.0f + retryPolicy = GradleBuildCacheClient.Configuration.RetryPolicy( + maxAttempts, + initialDelay.toMillis(), + exp.toDouble() + ) } } } @@ -63,7 +93,13 @@ object Parser { ?: 50 val connectionTimeout = child.renderAttribute("connection-timeout") ?.let(Duration::parse) - profiles[name] = GradleBuildCacheClient.Configuration.Profile(uri, authentication, connectionTimeout, maxConnections) + profiles[name] = GradleBuildCacheClient.Configuration.Profile( + uri, + authentication, + connectionTimeout, + maxConnections, + retryPolicy + ) } } } diff --git a/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/retry.kt b/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/retry.kt new file mode 100644 index 0000000..6003dda --- /dev/null +++ b/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/retry.kt @@ -0,0 +1,75 @@ +package net.woggioni.gbcs.client + +import io.netty.util.concurrent.EventExecutorGroup +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit + +sealed class OperationOutcome { + class Success(val result: T) : OperationOutcome() + class Failure(val ex: Throwable) : OperationOutcome() +} + +sealed class OutcomeHandlerResult { + class Retry(val suggestedDelayMillis: Long? = null) : OutcomeHandlerResult() + class DoNotRetry : OutcomeHandlerResult() +} + +fun interface OutcomeHandler { + fun shouldRetry(result: OperationOutcome): OutcomeHandlerResult +} + +fun executeWithRetry( + eventExecutorGroup: EventExecutorGroup, + maxAttempts: Int, + initialDelay: Double, + exp: Double, + outcomeHandler: OutcomeHandler, + cb: () -> CompletableFuture +): CompletableFuture { + val finalResult = cb() + var future = finalResult + var shortCircuit = false + for (i in 1 until maxAttempts) { + future = future.handle { result, ex -> + val operationOutcome = if (ex == null) { + OperationOutcome.Success(result) + } else { + OperationOutcome.Failure(ex.cause ?: ex) + } + if (shortCircuit) { + when(operationOutcome) { + is OperationOutcome.Failure -> throw operationOutcome.ex + is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result) + } + } else { + when(val outcomeHandlerResult = outcomeHandler.shouldRetry(operationOutcome)) { + is OutcomeHandlerResult.Retry -> { + val res = CompletableFuture() + val delay = run { + val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong() + outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay + } + eventExecutorGroup.schedule({ + cb().handle { result, ex -> + if (ex == null) { + res.complete(result) + } else { + res.completeExceptionally(ex) + } + } + }, delay, TimeUnit.MILLISECONDS) + res + } + is OutcomeHandlerResult.DoNotRetry -> { + shortCircuit = true + when(operationOutcome) { + is OperationOutcome.Failure -> throw operationOutcome.ex + is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result) + } + } + } + } + }.thenCompose { it } + } + return future +} \ No newline at end of file diff --git a/gbcs-client/src/main/resources/net/woggioni/gbcs/client/schema/gbcs-client.xsd b/gbcs-client/src/main/resources/net/woggioni/gbcs/client/schema/gbcs-client.xsd index 14cfa8f..d424a6b 100644 --- a/gbcs-client/src/main/resources/net/woggioni/gbcs/client/schema/gbcs-client.xsd +++ b/gbcs-client/src/main/resources/net/woggioni/gbcs/client/schema/gbcs-client.xsd @@ -13,11 +13,14 @@ - - - - - + + + + + + + + @@ -38,4 +41,10 @@ + + + + + + diff --git a/gbcs-client/src/test/kotlin/net/woggioni/gbcs/client/RetryTest.kt b/gbcs-client/src/test/kotlin/net/woggioni/gbcs/client/RetryTest.kt new file mode 100644 index 0000000..6c2f76f --- /dev/null +++ b/gbcs-client/src/test/kotlin/net/woggioni/gbcs/client/RetryTest.kt @@ -0,0 +1,149 @@ +package net.woggioni.gbcs.client + +import io.netty.util.concurrent.DefaultEventExecutorGroup +import io.netty.util.concurrent.EventExecutorGroup +import net.woggioni.gbcs.common.contextLogger +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtensionContext +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.ArgumentsProvider +import org.junit.jupiter.params.provider.ArgumentsSource +import java.util.concurrent.CompletableFuture +import java.util.stream.Stream +import kotlin.random.Random + +class RetryTest { + + data class TestArgs( + val seed: Int, + val maxAttempt: Int, + val initialDelay: Double, + val exp: Double, + ) + + class TestArguments : ArgumentsProvider { + override fun provideArguments(context: ExtensionContext): Stream { + return Stream.of( + TestArgs( + seed = 101325, + maxAttempt = 5, + initialDelay = 50.0, + exp = 2.0, + ), + TestArgs( + seed = 101325, + maxAttempt = 20, + initialDelay = 100.0, + exp = 1.1, + ), + TestArgs( + seed = 123487, + maxAttempt = 20, + initialDelay = 100.0, + exp = 2.0, + ), + TestArgs( + seed = 20082024, + maxAttempt = 10, + initialDelay = 100.0, + exp = 2.0, + ) + ).map { + object: Arguments { + override fun get() = arrayOf(it) + } + } + } + } + + @ArgumentsSource(TestArguments::class) + @ParameterizedTest + fun test(testArgs: TestArgs) { + val log = contextLogger() + log.debug("Start") + val executor: EventExecutorGroup = DefaultEventExecutorGroup(1) + val attempts = mutableListOf>>() + val outcomeHandler = OutcomeHandler { outcome -> + when(outcome) { + is OperationOutcome.Success -> { + if(outcome.result % 10 == 0) { + OutcomeHandlerResult.DoNotRetry() + } else { + OutcomeHandlerResult.Retry(null) + } + } + is OperationOutcome.Failure -> { + when(outcome.ex) { + is IllegalStateException -> { + log.debug(outcome.ex.message, outcome.ex) + OutcomeHandlerResult.Retry(null) + } + else -> { + OutcomeHandlerResult.DoNotRetry() + } + } + } + } + } + val random = Random(testArgs.seed) + + val future = + executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler) { + val now = System.nanoTime() + val result = CompletableFuture() + executor.submit { + val n = random.nextInt(0, Integer.MAX_VALUE) + log.debug("Got new number: {}", n) + if(n % 3 == 0) { + val ex = IllegalStateException("Value $n can be divided by 3") + result.completeExceptionally(ex) + attempts += now to OperationOutcome.Failure(ex) + } else if(n % 7 == 0) { + val ex = RuntimeException("Value $n can be divided by 7") + result.completeExceptionally(ex) + attempts += now to OperationOutcome.Failure(ex) + } else { + result.complete(n) + attempts += now to OperationOutcome.Success(n) + } + } + result + } + Assertions.assertTrue(attempts.size <= testArgs.maxAttempt) + val result = future.handle { res, ex -> + if(ex != null) { + val err = ex.cause ?: ex + log.debug(err.message, err) + OperationOutcome.Failure(err) + } else { + OperationOutcome.Success(res) + } + }.get() + for ((index, attempt) in attempts.withIndex()) { + val (timestamp, value) = attempt + if (index > 0) { + /* Check the delay for subsequent attempts is correct */ + val previousAttempt = attempts[index - 1] + val expectedTimestamp = + previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6 + val actualTimestamp = timestamp + val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp + Assertions.assertTrue(err < 1e-3) + } + if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) { + /* + * If the last attempt index is lower than the maximum number of attempts, then + * check the outcome handler returns DoNotRetry + */ + Assertions.assertTrue(outcomeHandler.shouldRetry(value) is OutcomeHandlerResult.DoNotRetry) + } else if (index < attempts.size - 1) { + /* + * If the attempt is not the last attempt check the outcome handler returns Retry + */ + Assertions.assertTrue(outcomeHandler.shouldRetry(value) is OutcomeHandlerResult.Retry) + } + } + } +} \ No newline at end of file diff --git a/gbcs-client/src/test/resources/logback.xml b/gbcs-client/src/test/resources/logback.xml new file mode 100644 index 0000000..400bcd5 --- /dev/null +++ b/gbcs-client/src/test/resources/logback.xml @@ -0,0 +1,21 @@ + + + + + + + + + System.err + + %d [%highlight(%-5level)] \(%thread\) %logger{36} -%kvp- %msg %n + + + + + + + + + + \ No newline at end of file diff --git a/gbcs-server/build.gradle b/gbcs-server/build.gradle index 80c84a9..6ee91d5 100644 --- a/gbcs-server/build.gradle +++ b/gbcs-server/build.gradle @@ -18,9 +18,6 @@ dependencies { testImplementation catalog.bcprov.jdk18on testImplementation catalog.bcpkix.jdk18on - testImplementation catalog.junit.jupiter.api - testImplementation catalog.junit.jupiter.params - testRuntimeOnly catalog.junit.jupiter.engine testRuntimeOnly project(":gbcs-server-memcached") }