forked from woggioni/rbcs
added retry policy to gbcs-client
This commit is contained in:
@@ -66,11 +66,18 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
|
||||
data class BasicAuthenticationCredentials(val username: String, val password: String) : Authentication()
|
||||
}
|
||||
|
||||
class RetryPolicy(
|
||||
val maxAttempts: Int,
|
||||
val initialDelayMillis: Long,
|
||||
val exp: Double
|
||||
)
|
||||
|
||||
data class Profile(
|
||||
val serverURI: URI,
|
||||
val authentication: Authentication?,
|
||||
val connectionTimeout : Duration?,
|
||||
val maxConnections : Int
|
||||
val connectionTimeout: Duration?,
|
||||
val maxConnections: Int,
|
||||
val retryPolicy: RetryPolicy?,
|
||||
)
|
||||
|
||||
companion object {
|
||||
@@ -161,28 +168,76 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
|
||||
pool = FixedChannelPool(bootstrap, channelPoolHandler, profile.maxConnections)
|
||||
}
|
||||
|
||||
fun get(key: String): CompletableFuture<ByteArray?> {
|
||||
return sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
|
||||
.thenApply {
|
||||
val status = it.status()
|
||||
if (it.status() == HttpResponseStatus.NOT_FOUND) {
|
||||
null
|
||||
} else if (it.status() != HttpResponseStatus.OK) {
|
||||
throw HttpException(status)
|
||||
} else {
|
||||
it.content()
|
||||
}
|
||||
}.thenApply { maybeByteBuf ->
|
||||
maybeByteBuf?.let {
|
||||
val result = ByteArray(it.readableBytes())
|
||||
it.getBytes(0, result)
|
||||
result
|
||||
private fun executeWithRetry(operation: () -> CompletableFuture<FullHttpResponse>): CompletableFuture<FullHttpResponse> {
|
||||
val retryPolicy = profile.retryPolicy
|
||||
return if (retryPolicy != null) {
|
||||
val outcomeHandler = OutcomeHandler<FullHttpResponse> { outcome ->
|
||||
when (outcome) {
|
||||
is OperationOutcome.Success -> {
|
||||
val response = outcome.result
|
||||
val status = response.status()
|
||||
when (status) {
|
||||
HttpResponseStatus.TOO_MANY_REQUESTS -> {
|
||||
val retryAfter = response.headers()[HttpHeaderNames.RETRY_AFTER]?.let { headerValue ->
|
||||
try {
|
||||
headerValue.toLong() * 1000
|
||||
} catch (nfe: NumberFormatException) {
|
||||
null
|
||||
}
|
||||
}
|
||||
OutcomeHandlerResult.Retry(retryAfter)
|
||||
}
|
||||
|
||||
HttpResponseStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.SERVICE_UNAVAILABLE ->
|
||||
OutcomeHandlerResult.Retry()
|
||||
|
||||
else -> OutcomeHandlerResult.DoNotRetry()
|
||||
}
|
||||
}
|
||||
|
||||
is OperationOutcome.Failure -> {
|
||||
OutcomeHandlerResult.Retry()
|
||||
}
|
||||
}
|
||||
}
|
||||
executeWithRetry(
|
||||
group,
|
||||
retryPolicy.maxAttempts,
|
||||
retryPolicy.initialDelayMillis.toDouble(),
|
||||
retryPolicy.exp,
|
||||
outcomeHandler,
|
||||
operation
|
||||
)
|
||||
} else {
|
||||
operation()
|
||||
}
|
||||
}
|
||||
|
||||
fun get(key: String): CompletableFuture<ByteArray?> {
|
||||
return executeWithRetry {
|
||||
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
|
||||
}.thenApply {
|
||||
val status = it.status()
|
||||
if (it.status() == HttpResponseStatus.NOT_FOUND) {
|
||||
null
|
||||
} else if (it.status() != HttpResponseStatus.OK) {
|
||||
throw HttpException(status)
|
||||
} else {
|
||||
it.content()
|
||||
}
|
||||
}.thenApply { maybeByteBuf ->
|
||||
maybeByteBuf?.let {
|
||||
val result = ByteArray(it.readableBytes())
|
||||
it.getBytes(0, result)
|
||||
result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun put(key: String, content: ByteArray): CompletableFuture<Unit> {
|
||||
return sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content).thenApply {
|
||||
return executeWithRetry {
|
||||
sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content)
|
||||
}.thenApply {
|
||||
val status = it.status()
|
||||
if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) {
|
||||
throw HttpException(status)
|
||||
|
||||
@@ -17,16 +17,18 @@ object Parser {
|
||||
|
||||
fun parse(document: Document): GradleBuildCacheClient.Configuration {
|
||||
val root = document.documentElement
|
||||
|
||||
val profiles = mutableMapOf<String, GradleBuildCacheClient.Configuration.Profile>()
|
||||
|
||||
for (child in root.asIterable()) {
|
||||
val tagName = child.localName
|
||||
when (tagName) {
|
||||
"profile" -> {
|
||||
val name = child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required")
|
||||
val uri = child.renderAttribute("base-url")?.let(::URI) ?: throw ConfigurationException("base-url attribute is required")
|
||||
val name =
|
||||
child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required")
|
||||
val uri = child.renderAttribute("base-url")?.let(::URI)
|
||||
?: throw ConfigurationException("base-url attribute is required")
|
||||
var authentication: GradleBuildCacheClient.Configuration.Authentication? = null
|
||||
var retryPolicy: GradleBuildCacheClient.Configuration.RetryPolicy? = null
|
||||
for (gchild in child.asIterable()) {
|
||||
when (gchild.localName) {
|
||||
"tls-client-auth" -> {
|
||||
@@ -47,14 +49,42 @@ object Parser {
|
||||
.toList()
|
||||
.toTypedArray()
|
||||
authentication =
|
||||
GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(key, certChain)
|
||||
GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(
|
||||
key,
|
||||
certChain
|
||||
)
|
||||
}
|
||||
|
||||
"basic-auth" -> {
|
||||
val username = gchild.renderAttribute("user") ?: throw ConfigurationException("username attribute is required")
|
||||
val password = gchild.renderAttribute("password") ?: throw ConfigurationException("password attribute is required")
|
||||
val username = gchild.renderAttribute("user")
|
||||
?: throw ConfigurationException("username attribute is required")
|
||||
val password = gchild.renderAttribute("password")
|
||||
?: throw ConfigurationException("password attribute is required")
|
||||
authentication =
|
||||
GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(username, password)
|
||||
GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(
|
||||
username,
|
||||
password
|
||||
)
|
||||
}
|
||||
|
||||
"retry-policy" -> {
|
||||
val maxAttempts =
|
||||
gchild.renderAttribute("max-attempts")
|
||||
?.let(String::toInt)
|
||||
?: throw ConfigurationException("max-attempts attribute is required")
|
||||
val initialDelay =
|
||||
gchild.renderAttribute("initial-delay")
|
||||
?.let(Duration::parse)
|
||||
?: Duration.ofSeconds(1)
|
||||
val exp =
|
||||
gchild.renderAttribute("exp")
|
||||
?.let(String::toDouble)
|
||||
?: 2.0f
|
||||
retryPolicy = GradleBuildCacheClient.Configuration.RetryPolicy(
|
||||
maxAttempts,
|
||||
initialDelay.toMillis(),
|
||||
exp.toDouble()
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -63,7 +93,13 @@ object Parser {
|
||||
?: 50
|
||||
val connectionTimeout = child.renderAttribute("connection-timeout")
|
||||
?.let(Duration::parse)
|
||||
profiles[name] = GradleBuildCacheClient.Configuration.Profile(uri, authentication, connectionTimeout, maxConnections)
|
||||
profiles[name] = GradleBuildCacheClient.Configuration.Profile(
|
||||
uri,
|
||||
authentication,
|
||||
connectionTimeout,
|
||||
maxConnections,
|
||||
retryPolicy
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,75 @@
|
||||
package net.woggioni.gbcs.client
|
||||
|
||||
import io.netty.util.concurrent.EventExecutorGroup
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
sealed class OperationOutcome<T> {
|
||||
class Success<T>(val result: T) : OperationOutcome<T>()
|
||||
class Failure<T>(val ex: Throwable) : OperationOutcome<T>()
|
||||
}
|
||||
|
||||
sealed class OutcomeHandlerResult {
|
||||
class Retry(val suggestedDelayMillis: Long? = null) : OutcomeHandlerResult()
|
||||
class DoNotRetry : OutcomeHandlerResult()
|
||||
}
|
||||
|
||||
fun interface OutcomeHandler<T> {
|
||||
fun shouldRetry(result: OperationOutcome<T>): OutcomeHandlerResult
|
||||
}
|
||||
|
||||
fun <T> executeWithRetry(
|
||||
eventExecutorGroup: EventExecutorGroup,
|
||||
maxAttempts: Int,
|
||||
initialDelay: Double,
|
||||
exp: Double,
|
||||
outcomeHandler: OutcomeHandler<T>,
|
||||
cb: () -> CompletableFuture<T>
|
||||
): CompletableFuture<T> {
|
||||
val finalResult = cb()
|
||||
var future = finalResult
|
||||
var shortCircuit = false
|
||||
for (i in 1 until maxAttempts) {
|
||||
future = future.handle { result, ex ->
|
||||
val operationOutcome = if (ex == null) {
|
||||
OperationOutcome.Success(result)
|
||||
} else {
|
||||
OperationOutcome.Failure(ex.cause ?: ex)
|
||||
}
|
||||
if (shortCircuit) {
|
||||
when(operationOutcome) {
|
||||
is OperationOutcome.Failure -> throw operationOutcome.ex
|
||||
is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result)
|
||||
}
|
||||
} else {
|
||||
when(val outcomeHandlerResult = outcomeHandler.shouldRetry(operationOutcome)) {
|
||||
is OutcomeHandlerResult.Retry -> {
|
||||
val res = CompletableFuture<T>()
|
||||
val delay = run {
|
||||
val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong()
|
||||
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
|
||||
}
|
||||
eventExecutorGroup.schedule({
|
||||
cb().handle { result, ex ->
|
||||
if (ex == null) {
|
||||
res.complete(result)
|
||||
} else {
|
||||
res.completeExceptionally(ex)
|
||||
}
|
||||
}
|
||||
}, delay, TimeUnit.MILLISECONDS)
|
||||
res
|
||||
}
|
||||
is OutcomeHandlerResult.DoNotRetry -> {
|
||||
shortCircuit = true
|
||||
when(operationOutcome) {
|
||||
is OperationOutcome.Failure -> throw operationOutcome.ex
|
||||
is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}.thenCompose { it }
|
||||
}
|
||||
return future
|
||||
}
|
||||
Reference in New Issue
Block a user