Compare commits

..

1 Commits

Author SHA1 Message Date
c3c4bbe5e2 fixed throttling retry-after estimation
All checks were successful
CI / build (push) Successful in 3m38s
2025-01-25 00:39:31 +08:00
24 changed files with 103 additions and 588 deletions

View File

@@ -46,12 +46,6 @@ allprojects { subproject ->
} }
} }
dependencies {
testImplementation catalog.junit.jupiter.api
testImplementation catalog.junit.jupiter.params
testRuntimeOnly catalog.junit.jupiter.engine
}
test { test {
useJUnitPlatform() useJUnitPlatform()
} }

View File

@@ -91,14 +91,11 @@ public class Configuration {
boolean verifyClients; boolean verifyClients;
} }
public enum ClientCertificate {
REQUIRED, OPTIONAL
}
@Value @Value
public static class Tls { public static class Tls {
KeyStore keyStore; KeyStore keyStore;
TrustStore trustStore; TrustStore trustStore;
boolean verifyClients;
} }
@Value @Value
@@ -114,7 +111,6 @@ public class Configuration {
Path file; Path file;
String password; String password;
boolean checkCertificateStatus; boolean checkCertificateStatus;
boolean requireClientCertificate;
} }
@Value @Value

View File

@@ -7,7 +7,6 @@ module net.woggioni.gbcs.cli {
requires kotlin.stdlib; requires kotlin.stdlib;
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires net.woggioni.gbcs.api; requires net.woggioni.gbcs.api;
requires io.netty.codec.http;
exports net.woggioni.gbcs.cli.impl.converters to info.picocli; exports net.woggioni.gbcs.cli.impl.converters to info.picocli;
opens net.woggioni.gbcs.cli.impl.commands to info.picocli; opens net.woggioni.gbcs.cli.impl.commands to info.picocli;

View File

@@ -1,13 +1,10 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.error import net.woggioni.gbcs.common.error
import net.woggioni.gbcs.common.info import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.client.RequestEventListener
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import picocli.CommandLine import picocli.CommandLine
import java.security.SecureRandom import java.security.SecureRandom
@@ -17,7 +14,6 @@ import java.util.Base64
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.concurrent.Future import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random import kotlin.random.Random
@@ -57,87 +53,53 @@ class BenchmarkCommand : GbcsCommand() {
} }
} }
log.info {
"Starting insertion"
}
val entries = let { val entries = let {
val completionCounter = AtomicLong(0) val completionQueue = LinkedBlockingQueue<Future<Pair<String, ByteArray>>>(numberOfEntries)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now() val start = Instant.now()
val semaphore = Semaphore(profile.maxConnections * 3)
val totalElapsedTime = AtomicLong(0) val totalElapsedTime = AtomicLong(0)
val iterator = entryGenerator.take(numberOfEntries).iterator() entryGenerator.take(numberOfEntries).forEach { entry ->
while(completionCounter.get() < numberOfEntries) { val requestStart = System.nanoTime()
if(iterator.hasNext()) { val future = client.put(entry.first, entry.second).thenApply { entry }
val entry = iterator.next() future.whenComplete { _, _ ->
semaphore.acquire() totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
val eventListener = object : RequestEventListener { completionQueue.put(future)
var start: Long? = null
override fun requestSent(req: FullHttpRequest) {
this.start = System.nanoTime()
}
override fun responseReceived(res: FullHttpResponse) {
this.start?.let { requestStart ->
totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
}
this.start = null
}
}
val future = client.put(entry.first, entry.second, eventListener).thenApply { entry }
future.whenComplete { result, ex ->
if (ex != null) {
log.error(ex.message, ex)
} else {
completionQueue.put(result)
}
semaphore.release()
completionCounter.incrementAndGet()
}
} }
} }
val inserted = completionQueue.toList() val inserted = sequence<Pair<String, ByteArray>> {
var completionCounter = 0
while (completionCounter < numberOfEntries) {
val future = completionQueue.take()
try {
yield(future.get())
} catch (ee: ExecutionException) {
val cause = ee.cause ?: ee
log.error(cause.message, cause)
}
completionCounter += 1
}
}.toList()
val end = Instant.now() val end = Instant.now()
log.info { log.info {
val elapsed = Duration.between(start, end).toMillis() val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000) "Insertion rate: ${numberOfEntries.toDouble() / elapsed * 1000} ops/s"
"Insertion rate: $opsPerSecond ops/s"
} }
log.info { log.info {
val avgTxTime = String.format("%.0f", totalElapsedTime.get() / numberOfEntries.toDouble() / 1e6) "Average time per insertion: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1000} ms"
"Average time per insertion: $avgTxTime ms"
} }
inserted inserted
} }
log.info { log.info {
"Inserted ${entries.size} entries" "Inserted ${entries.size} entries"
} }
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) { if (entries.isNotEmpty()) {
val completionCounter = AtomicLong(0) val completionQueue = LinkedBlockingQueue<Future<Unit>>(entries.size)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now() val start = Instant.now()
val totalElapsedTime = AtomicLong(0) val totalElapsedTime = AtomicLong(0)
entries.forEach { entry -> entries.forEach { entry ->
semaphore.acquire() val requestStart = System.nanoTime()
val eventListener = object : RequestEventListener { val future = client.get(entry.first).thenApply {
var start : Long? = null totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
override fun requestSent(req: FullHttpRequest) {
this.start = System.nanoTime()
}
override fun responseReceived(res: FullHttpResponse) {
this.start?.let { requestStart ->
totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
}
this.start = null
}
}
val future = client.get(entry.first, eventListener).thenApply {
if (it == null) { if (it == null) {
log.error { log.error {
"Missing entry for key '${entry.first}'" "Missing entry for key '${entry.first}'"
@@ -149,19 +111,21 @@ class BenchmarkCommand : GbcsCommand() {
} }
} }
future.whenComplete { _, _ -> future.whenComplete { _, _ ->
completionCounter.incrementAndGet() completionQueue.put(future)
semaphore.release()
} }
} }
var completionCounter = 0
while (completionCounter < entries.size) {
completionQueue.take()
completionCounter += 1
}
val end = Instant.now() val end = Instant.now()
log.info { log.info {
val elapsed = Duration.between(start, end).toMillis() val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000) "Retrieval rate: ${entries.size.toDouble() / elapsed * 1000} ops/s"
"Retrieval rate: $opsPerSecond ops/s"
} }
log.info { log.info {
val avgTxTime = String.format("%.0f", totalElapsedTime.get() / completionCounter.toDouble() / 1e6) "Average time per retrieval: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1e6} ms"
"Average time per retrieval: $avgTxTime ms"
} }
} else { } else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache") log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")

View File

@@ -10,8 +10,6 @@ dependencies {
implementation catalog.slf4j.api implementation catalog.slf4j.api
implementation catalog.netty.buffer implementation catalog.netty.buffer
implementation catalog.netty.codec.http implementation catalog.netty.codec.http
testRuntimeOnly catalog.logback.classic
} }

View File

@@ -66,18 +66,11 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
data class BasicAuthenticationCredentials(val username: String, val password: String) : Authentication() data class BasicAuthenticationCredentials(val username: String, val password: String) : Authentication()
} }
class RetryPolicy(
val maxAttempts: Int,
val initialDelayMillis: Long,
val exp: Double
)
data class Profile( data class Profile(
val serverURI: URI, val serverURI: URI,
val authentication: Authentication?, val authentication: Authentication?,
val connectionTimeout: Duration?, val connectionTimeout : Duration?,
val maxConnections: Int, val maxConnections : Int
val retryPolicy: RetryPolicy?,
) )
companion object { companion object {
@@ -168,76 +161,28 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
pool = FixedChannelPool(bootstrap, channelPoolHandler, profile.maxConnections) pool = FixedChannelPool(bootstrap, channelPoolHandler, profile.maxConnections)
} }
private fun executeWithRetry(operation: () -> CompletableFuture<FullHttpResponse>): CompletableFuture<FullHttpResponse> { fun get(key: String): CompletableFuture<ByteArray?> {
val retryPolicy = profile.retryPolicy return sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
return if (retryPolicy != null) { .thenApply {
val outcomeHandler = OutcomeHandler<FullHttpResponse> { outcome -> val status = it.status()
when (outcome) { if (it.status() == HttpResponseStatus.NOT_FOUND) {
is OperationOutcome.Success -> { null
val response = outcome.result } else if (it.status() != HttpResponseStatus.OK) {
val status = response.status() throw HttpException(status)
when (status) { } else {
HttpResponseStatus.TOO_MANY_REQUESTS -> { it.content()
val retryAfter = response.headers()[HttpHeaderNames.RETRY_AFTER]?.let { headerValue -> }
try { }.thenApply { maybeByteBuf ->
headerValue.toLong() * 1000 maybeByteBuf?.let {
} catch (nfe: NumberFormatException) { val result = ByteArray(it.readableBytes())
null it.getBytes(0, result)
} result
}
OutcomeHandlerResult.Retry(retryAfter)
}
HttpResponseStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.SERVICE_UNAVAILABLE ->
OutcomeHandlerResult.Retry()
else -> OutcomeHandlerResult.DoNotRetry()
}
}
is OperationOutcome.Failure -> {
OutcomeHandlerResult.Retry()
}
} }
} }
executeWithRetry(
group,
retryPolicy.maxAttempts,
retryPolicy.initialDelayMillis.toDouble(),
retryPolicy.exp,
outcomeHandler,
operation
)
} else {
operation()
}
} }
fun get(key: String, eventListener : RequestEventListener? = null): CompletableFuture<ByteArray?> { fun put(key: String, content: ByteArray): CompletableFuture<Unit> {
return executeWithRetry { return sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content).thenApply {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null, eventListener)
}.thenApply {
val status = it.status()
if (it.status() == HttpResponseStatus.NOT_FOUND) {
null
} else if (it.status() != HttpResponseStatus.OK) {
throw HttpException(status)
} else {
it.content()
}
}.thenApply { maybeByteBuf ->
maybeByteBuf?.let {
val result = ByteArray(it.readableBytes())
it.getBytes(0, result)
result
}
}
}
fun put(key: String, content: ByteArray, eventListener : RequestEventListener? = null): CompletableFuture<Unit> {
return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content, eventListener)
}.thenApply {
val status = it.status() val status = it.status()
if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) { if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) {
throw HttpException(status) throw HttpException(status)
@@ -245,7 +190,7 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
} }
private fun sendRequest(uri: URI, method: HttpMethod, body: ByteArray?, eventListener : RequestEventListener?): CompletableFuture<FullHttpResponse> { private fun sendRequest(uri: URI, method: HttpMethod, body: ByteArray?): CompletableFuture<FullHttpResponse> {
val responseFuture = CompletableFuture<FullHttpResponse>() val responseFuture = CompletableFuture<FullHttpResponse>()
// Custom handler for processing responses // Custom handler for processing responses
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> { pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
@@ -261,7 +206,6 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
pipeline.removeLast() pipeline.removeLast()
pool.release(channel) pool.release(channel)
responseFuture.complete(response) responseFuture.complete(response)
eventListener?.responseReceived(response)
} }
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
@@ -307,11 +251,7 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
// Set headers // Set headers
// Send the request // Send the request
channel.writeAndFlush(request).addListener { channel.writeAndFlush(request)
if(it.isSuccess) {
eventListener?.requestSent(request)
}
}
} else { } else {
responseFuture.completeExceptionally(channelFuture.cause()) responseFuture.completeExceptionally(channelFuture.cause())
} }

View File

@@ -1,10 +0,0 @@
package net.woggioni.gbcs.client
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
interface RequestEventListener {
fun requestSent(req : FullHttpRequest) {}
fun responseReceived(res : FullHttpResponse) {}
fun exceptionCaught(ex : Throwable) {}
}

View File

@@ -17,18 +17,16 @@ object Parser {
fun parse(document: Document): GradleBuildCacheClient.Configuration { fun parse(document: Document): GradleBuildCacheClient.Configuration {
val root = document.documentElement val root = document.documentElement
val profiles = mutableMapOf<String, GradleBuildCacheClient.Configuration.Profile>() val profiles = mutableMapOf<String, GradleBuildCacheClient.Configuration.Profile>()
for (child in root.asIterable()) { for (child in root.asIterable()) {
val tagName = child.localName val tagName = child.localName
when (tagName) { when (tagName) {
"profile" -> { "profile" -> {
val name = val name = child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required")
child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required") val uri = child.renderAttribute("base-url")?.let(::URI) ?: throw ConfigurationException("base-url attribute is required")
val uri = child.renderAttribute("base-url")?.let(::URI)
?: throw ConfigurationException("base-url attribute is required")
var authentication: GradleBuildCacheClient.Configuration.Authentication? = null var authentication: GradleBuildCacheClient.Configuration.Authentication? = null
var retryPolicy: GradleBuildCacheClient.Configuration.RetryPolicy? = null
for (gchild in child.asIterable()) { for (gchild in child.asIterable()) {
when (gchild.localName) { when (gchild.localName) {
"tls-client-auth" -> { "tls-client-auth" -> {
@@ -49,42 +47,14 @@ object Parser {
.toList() .toList()
.toTypedArray() .toTypedArray()
authentication = authentication =
GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials( GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(key, certChain)
key,
certChain
)
} }
"basic-auth" -> { "basic-auth" -> {
val username = gchild.renderAttribute("user") val username = gchild.renderAttribute("user") ?: throw ConfigurationException("username attribute is required")
?: throw ConfigurationException("username attribute is required") val password = gchild.renderAttribute("password") ?: throw ConfigurationException("password attribute is required")
val password = gchild.renderAttribute("password")
?: throw ConfigurationException("password attribute is required")
authentication = authentication =
GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials( GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(username, password)
username,
password
)
}
"retry-policy" -> {
val maxAttempts =
gchild.renderAttribute("max-attempts")
?.let(String::toInt)
?: throw ConfigurationException("max-attempts attribute is required")
val initialDelay =
gchild.renderAttribute("initial-delay")
?.let(Duration::parse)
?: Duration.ofSeconds(1)
val exp =
gchild.renderAttribute("exp")
?.let(String::toDouble)
?: 2.0f
retryPolicy = GradleBuildCacheClient.Configuration.RetryPolicy(
maxAttempts,
initialDelay.toMillis(),
exp.toDouble()
)
} }
} }
} }
@@ -93,13 +63,7 @@ object Parser {
?: 50 ?: 50
val connectionTimeout = child.renderAttribute("connection-timeout") val connectionTimeout = child.renderAttribute("connection-timeout")
?.let(Duration::parse) ?.let(Duration::parse)
profiles[name] = GradleBuildCacheClient.Configuration.Profile( profiles[name] = GradleBuildCacheClient.Configuration.Profile(uri, authentication, connectionTimeout, maxConnections)
uri,
authentication,
connectionTimeout,
maxConnections,
retryPolicy
)
} }
} }
} }

View File

@@ -1,75 +0,0 @@
package net.woggioni.gbcs.client
import io.netty.util.concurrent.EventExecutorGroup
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
sealed class OperationOutcome<T> {
class Success<T>(val result: T) : OperationOutcome<T>()
class Failure<T>(val ex: Throwable) : OperationOutcome<T>()
}
sealed class OutcomeHandlerResult {
class Retry(val suggestedDelayMillis: Long? = null) : OutcomeHandlerResult()
class DoNotRetry : OutcomeHandlerResult()
}
fun interface OutcomeHandler<T> {
fun shouldRetry(result: OperationOutcome<T>): OutcomeHandlerResult
}
fun <T> executeWithRetry(
eventExecutorGroup: EventExecutorGroup,
maxAttempts: Int,
initialDelay: Double,
exp: Double,
outcomeHandler: OutcomeHandler<T>,
cb: () -> CompletableFuture<T>
): CompletableFuture<T> {
val finalResult = cb()
var future = finalResult
var shortCircuit = false
for (i in 1 until maxAttempts) {
future = future.handle { result, ex ->
val operationOutcome = if (ex == null) {
OperationOutcome.Success(result)
} else {
OperationOutcome.Failure(ex.cause ?: ex)
}
if (shortCircuit) {
when(operationOutcome) {
is OperationOutcome.Failure -> throw operationOutcome.ex
is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result)
}
} else {
when(val outcomeHandlerResult = outcomeHandler.shouldRetry(operationOutcome)) {
is OutcomeHandlerResult.Retry -> {
val res = CompletableFuture<T>()
val delay = run {
val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong()
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
}
eventExecutorGroup.schedule({
cb().handle { result, ex ->
if (ex == null) {
res.complete(result)
} else {
res.completeExceptionally(ex)
}
}
}, delay, TimeUnit.MILLISECONDS)
res
}
is OutcomeHandlerResult.DoNotRetry -> {
shortCircuit = true
when(operationOutcome) {
is OperationOutcome.Failure -> throw operationOutcome.ex
is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result)
}
}
}
}
}.thenCompose { it }
}
return future
}

View File

@@ -13,14 +13,11 @@
</xs:complexType> </xs:complexType>
<xs:complexType name="profileType"> <xs:complexType name="profileType">
<xs:sequence> <xs:choice>
<xs:choice> <xs:element name="no-auth" type="gbcs-client:noAuthType"/>
<xs:element name="no-auth" type="gbcs-client:noAuthType"/> <xs:element name="basic-auth" type="gbcs-client:basicAuthType"/>
<xs:element name="basic-auth" type="gbcs-client:basicAuthType"/> <xs:element name="tls-client-auth" type="gbcs-client:tlsClientAuthType"/>
<xs:element name="tls-client-auth" type="gbcs-client:tlsClientAuthType"/> </xs:choice>
</xs:choice>
<xs:element name="retry-policy" type="gbcs-client:retryType" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="name" type="xs:token" use="required"/> <xs:attribute name="name" type="xs:token" use="required"/>
<xs:attribute name="base-url" type="xs:anyURI" use="required"/> <xs:attribute name="base-url" type="xs:anyURI" use="required"/>
<xs:attribute name="max-connections" type="xs:positiveInteger" default="50"/> <xs:attribute name="max-connections" type="xs:positiveInteger" default="50"/>
@@ -41,10 +38,4 @@
<xs:attribute name="key-password" type="xs:string" use="optional"/> <xs:attribute name="key-password" type="xs:string" use="optional"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="retryType">
<xs:attribute name="max-attempts" type="xs:positiveInteger" use="required"/>
<xs:attribute name="initial-delay" type="xs:duration" default="PT1S"/>
<xs:attribute name="exp" type="xs:double" default="2.0"/>
</xs:complexType>
</xs:schema> </xs:schema>

View File

@@ -1,149 +0,0 @@
package net.woggioni.gbcs.client
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.gbcs.common.contextLogger
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.ArgumentsProvider
import org.junit.jupiter.params.provider.ArgumentsSource
import java.util.concurrent.CompletableFuture
import java.util.stream.Stream
import kotlin.random.Random
class RetryTest {
data class TestArgs(
val seed: Int,
val maxAttempt: Int,
val initialDelay: Double,
val exp: Double,
)
class TestArguments : ArgumentsProvider {
override fun provideArguments(context: ExtensionContext): Stream<out Arguments> {
return Stream.of(
TestArgs(
seed = 101325,
maxAttempt = 5,
initialDelay = 50.0,
exp = 2.0,
),
TestArgs(
seed = 101325,
maxAttempt = 20,
initialDelay = 100.0,
exp = 1.1,
),
TestArgs(
seed = 123487,
maxAttempt = 20,
initialDelay = 100.0,
exp = 2.0,
),
TestArgs(
seed = 20082024,
maxAttempt = 10,
initialDelay = 100.0,
exp = 2.0,
)
).map {
object: Arguments {
override fun get() = arrayOf(it)
}
}
}
}
@ArgumentsSource(TestArguments::class)
@ParameterizedTest
fun test(testArgs: TestArgs) {
val log = contextLogger()
log.debug("Start")
val executor: EventExecutorGroup = DefaultEventExecutorGroup(1)
val attempts = mutableListOf<Pair<Long, OperationOutcome<Int>>>()
val outcomeHandler = OutcomeHandler<Int> { outcome ->
when(outcome) {
is OperationOutcome.Success -> {
if(outcome.result % 10 == 0) {
OutcomeHandlerResult.DoNotRetry()
} else {
OutcomeHandlerResult.Retry(null)
}
}
is OperationOutcome.Failure -> {
when(outcome.ex) {
is IllegalStateException -> {
log.debug(outcome.ex.message, outcome.ex)
OutcomeHandlerResult.Retry(null)
}
else -> {
OutcomeHandlerResult.DoNotRetry()
}
}
}
}
}
val random = Random(testArgs.seed)
val future =
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler) {
val now = System.nanoTime()
val result = CompletableFuture<Int>()
executor.submit {
val n = random.nextInt(0, Integer.MAX_VALUE)
log.debug("Got new number: {}", n)
if(n % 3 == 0) {
val ex = IllegalStateException("Value $n can be divided by 3")
result.completeExceptionally(ex)
attempts += now to OperationOutcome.Failure(ex)
} else if(n % 7 == 0) {
val ex = RuntimeException("Value $n can be divided by 7")
result.completeExceptionally(ex)
attempts += now to OperationOutcome.Failure(ex)
} else {
result.complete(n)
attempts += now to OperationOutcome.Success(n)
}
}
result
}
Assertions.assertTrue(attempts.size <= testArgs.maxAttempt)
val result = future.handle { res, ex ->
if(ex != null) {
val err = ex.cause ?: ex
log.debug(err.message, err)
OperationOutcome.Failure(err)
} else {
OperationOutcome.Success(res)
}
}.get()
for ((index, attempt) in attempts.withIndex()) {
val (timestamp, value) = attempt
if (index > 0) {
/* Check the delay for subsequent attempts is correct */
val previousAttempt = attempts[index - 1]
val expectedTimestamp =
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
val actualTimestamp = timestamp
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
Assertions.assertTrue(err < 1e-3)
}
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
/*
* If the last attempt index is lower than the maximum number of attempts, then
* check the outcome handler returns DoNotRetry
*/
Assertions.assertTrue(outcomeHandler.shouldRetry(value) is OutcomeHandlerResult.DoNotRetry)
} else if (index < attempts.size - 1) {
/*
* If the attempt is not the last attempt check the outcome handler returns Retry
*/
Assertions.assertTrue(outcomeHandler.shouldRetry(value) is OutcomeHandlerResult.Retry)
}
}
}
}

View File

@@ -1,21 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration>
<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>
<appender name="console" class="ConsoleAppender">
<target>System.err</target>
<encoder class="PatternLayoutEncoder">
<pattern>%d [%highlight(%-5level)] \(%thread\) %logger{36} -%kvp- %msg %n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="io.netty" level="info"/>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -18,6 +18,9 @@ dependencies {
testImplementation catalog.bcprov.jdk18on testImplementation catalog.bcprov.jdk18on
testImplementation catalog.bcpkix.jdk18on testImplementation catalog.bcpkix.jdk18on
testImplementation catalog.junit.jupiter.api
testImplementation catalog.junit.jupiter.params
testRuntimeOnly catalog.junit.jupiter.engine
testRuntimeOnly project(":gbcs-server-memcached") testRuntimeOnly project(":gbcs-server-memcached")
} }

View File

@@ -208,15 +208,15 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
.map { it as X509Certificate } .map { it as X509Certificate }
.toArray { size -> Array<X509Certificate?>(size) { null } } .toArray { size -> Array<X509Certificate?>(size) { null } }
SslContextBuilder.forServer(serverKey, *serverCert).apply { SslContextBuilder.forServer(serverKey, *serverCert).apply {
val clientAuth = tls.trustStore?.let { trustStore -> if (tls.isVerifyClients) {
val ts = loadKeystore(trustStore.file, trustStore.password) clientAuth(ClientAuth.OPTIONAL)
trustManager( tls.trustStore?.let { trustStore ->
ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus) val ts = loadKeystore(trustStore.file, trustStore.password)
) trustManager(
if(trustStore.isRequireClientCertificate) ClientAuth.REQUIRE ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus)
else ClientAuth.OPTIONAL )
} ?: ClientAuth.NONE }
clientAuth(clientAuth) }
}.build() }.build()
} }
} }
@@ -309,33 +309,8 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
} }
val pipeline = ch.pipeline() val pipeline = ch.pipeline()
cfg.connection.also { conn -> cfg.connection.also { conn ->
val readTimeout = conn.readTimeout.toMillis() pipeline.addLast(IdleStateHandler(false, conn.readTimeout.toMillis(), conn.writeTimeout.toMillis(), 0, TimeUnit.MILLISECONDS))
val writeTimeout = conn.writeTimeout.toMillis() pipeline.addLast(IdleStateHandler(true, conn.readIdleTimeout.toMillis(), conn.writeIdleTimeout.toMillis(), conn.idleTimeout.toMillis(), TimeUnit.MILLISECONDS))
if(readTimeout > 0 || writeTimeout > 0) {
pipeline.addLast(
IdleStateHandler(
false,
readTimeout,
writeTimeout,
0,
TimeUnit.MILLISECONDS
)
)
}
val readIdleTimeout = conn.readIdleTimeout.toMillis()
val writeIdleTimeout = conn.writeIdleTimeout.toMillis()
val idleTimeout = conn.idleTimeout.toMillis()
if(readIdleTimeout > 0 || writeIdleTimeout > 0 || idleTimeout > 0) {
pipeline.addLast(
IdleStateHandler(
true,
readIdleTimeout,
writeIdleTimeout,
idleTimeout,
TimeUnit.MILLISECONDS
)
)
}
} }
pipeline.addLast(object : ChannelInboundHandlerAdapter() { pipeline.addLast(object : ChannelInboundHandlerAdapter() {
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {

View File

@@ -114,9 +114,9 @@ object Parser {
"connection" -> { "connection" -> {
val writeTimeout = child.renderAttribute("write-timeout") val writeTimeout = child.renderAttribute("write-timeout")
?.let(Duration::parse) ?: Duration.of(0, ChronoUnit.SECONDS) ?.let(Duration::parse) ?: Duration.of(10, ChronoUnit.SECONDS)
val readTimeout = child.renderAttribute("read-timeout") val readTimeout = child.renderAttribute("read-timeout")
?.let(Duration::parse) ?: Duration.of(0, ChronoUnit.SECONDS) ?.let(Duration::parse) ?: Duration.of(10, ChronoUnit.SECONDS)
val idleTimeout = child.renderAttribute("idle-timeout") val idleTimeout = child.renderAttribute("idle-timeout")
?.let(Duration::parse) ?: Duration.of(30, ChronoUnit.SECONDS) ?.let(Duration::parse) ?: Duration.of(30, ChronoUnit.SECONDS)
val readIdleTimeout = child.renderAttribute("read-idle-timeout") val readIdleTimeout = child.renderAttribute("read-idle-timeout")
@@ -142,9 +142,10 @@ object Parser {
} }
"tls" -> { "tls" -> {
val verifyClients = child.renderAttribute("verify-clients")
?.let(String::toBoolean) ?: false
var keyStore: KeyStore? = null var keyStore: KeyStore? = null
var trustStore: TrustStore? = null var trustStore: TrustStore? = null
for (granChild in child.asIterable()) { for (granChild in child.asIterable()) {
when (granChild.localName) { when (granChild.localName) {
"keystore" -> { "keystore" -> {
@@ -166,19 +167,15 @@ object Parser {
val checkCertificateStatus = granChild.renderAttribute("check-certificate-status") val checkCertificateStatus = granChild.renderAttribute("check-certificate-status")
?.let(String::toBoolean) ?.let(String::toBoolean)
?: false ?: false
val requireClientCertificate = child.renderAttribute("require-client-certificate")
?.let(String::toBoolean) ?: false
trustStore = TrustStore( trustStore = TrustStore(
trustStoreFile, trustStoreFile,
trustStorePassword, trustStorePassword,
checkCertificateStatus, checkCertificateStatus
requireClientCertificate
) )
} }
} }
} }
tls = Tls(keyStore, trustStore) tls = Tls(keyStore, trustStore, verifyClients)
} }
} }
} }

View File

@@ -154,6 +154,9 @@ object Serializer {
conf.tls?.let { tlsConfiguration -> conf.tls?.let { tlsConfiguration ->
node("tls") { node("tls") {
if(tlsConfiguration.isVerifyClients) {
attr("verify-clients", "true")
}
tlsConfiguration.keyStore?.let { keyStore -> tlsConfiguration.keyStore?.let { keyStore ->
node("keystore") { node("keystore") {
attr("file", keyStore.file.toString()) attr("file", keyStore.file.toString())
@@ -174,7 +177,6 @@ object Serializer {
attr("password", password) attr("password", password)
} }
attr("check-certificate-status", trustStore.isCheckCertificateStatus.toString()) attr("check-certificate-status", trustStore.isCheckCertificateStatus.toString())
attr("require-client-certificate", trustStore.isRequireClientCertificate.toString())
} }
} }
} }

View File

@@ -34,8 +34,8 @@
</xs:complexType> </xs:complexType>
<xs:complexType name="connectionType"> <xs:complexType name="connectionType">
<xs:attribute name="read-timeout" type="xs:duration" use="optional" default="PT0S"/> <xs:attribute name="read-timeout" type="xs:duration" use="optional" default="PT10S"/>
<xs:attribute name="write-timeout" type="xs:duration" use="optional" default="PT0S"/> <xs:attribute name="write-timeout" type="xs:duration" use="optional" default="PT10S"/>
<xs:attribute name="idle-timeout" type="xs:duration" use="optional" default="PT30S"/> <xs:attribute name="idle-timeout" type="xs:duration" use="optional" default="PT30S"/>
<xs:attribute name="read-idle-timeout" type="xs:duration" use="optional" default="PT60S"/> <xs:attribute name="read-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
<xs:attribute name="write-idle-timeout" type="xs:duration" use="optional" default="PT60S"/> <xs:attribute name="write-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
@@ -183,6 +183,7 @@
<xs:element name="keystore" type="gbcs:keyStoreType" /> <xs:element name="keystore" type="gbcs:keyStoreType" />
<xs:element name="truststore" type="gbcs:trustStoreType" minOccurs="0"/> <xs:element name="truststore" type="gbcs:trustStoreType" minOccurs="0"/>
</xs:all> </xs:all>
<xs:attribute name="verify-clients" type="xs:boolean" use="optional" default="false"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="keyStoreType"> <xs:complexType name="keyStoreType">
@@ -196,7 +197,6 @@
<xs:attribute name="file" type="xs:string" use="required"/> <xs:attribute name="file" type="xs:string" use="required"/>
<xs:attribute name="password" type="xs:string"/> <xs:attribute name="password" type="xs:string"/>
<xs:attribute name="check-certificate-status" type="xs:boolean"/> <xs:attribute name="check-certificate-status" type="xs:boolean"/>
<xs:attribute name="require-client-certificate" type="xs:boolean" use="optional" default="false"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="propertiesType"> <xs:complexType name="propertiesType">

View File

@@ -171,8 +171,9 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
), ),
Configuration.Tls( Configuration.Tls(
Configuration.KeyStore(this.serverKeyStoreFile, null, SERVER_CERTIFICATE_ENTRY, PASSWORD), Configuration.KeyStore(this.serverKeyStoreFile, null, SERVER_CERTIFICATE_ENTRY, PASSWORD),
Configuration.TrustStore(this.trustStoreFile, null, false, false), Configuration.TrustStore(this.trustStoreFile, null, false),
) true
),
) )
Xml.write(Serializer.serialize(cfg), System.out) Xml.write(Serializer.serialize(cfg), System.out)
} }

View File

@@ -20,7 +20,6 @@ class ConfigurationTest {
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-default.xml", "classpath:net/woggioni/gbcs/server/test/valid/gbcs-default.xml",
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-memcached.xml", "classpath:net/woggioni/gbcs/server/test/valid/gbcs-memcached.xml",
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-tls.xml", "classpath:net/woggioni/gbcs/server/test/valid/gbcs-tls.xml",
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-memcached-tls.xml",
] ]
) )
@ParameterizedTest @ParameterizedTest

View File

@@ -7,7 +7,6 @@ import org.bouncycastle.asn1.x500.X500Name
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.params.provider.ArgumentsSource
import java.net.http.HttpClient import java.net.http.HttpClient
import java.net.http.HttpRequest import java.net.http.HttpRequest
import java.net.http.HttpResponse import java.net.http.HttpResponse

View File

@@ -1,53 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcached jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd"
>
<bind host="0.0.0.0" port="8443" incoming-connections-backlog-size="4096"/>
<connection
max-request-size="67108864"
idle-timeout="PT30S"
read-idle-timeout="PT60S"
write-idle-timeout="PT60S"
read-timeout="PT5M"
write-timeout="PT5M"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="16777216" compression-mode="zip">
<server host="memcached" port="11211"/>
</cache>
<authorization>
<users>
<user name="woggioni">
<quota calls="1000" period="PT1S"/>
</user>
<user name="gitea">
<quota calls="10" period="PT1S" initial-available-calls="100" max-available-calls="100"/>
</user>
<anonymous>
<quota calls="2" period="PT5S"/>
</anonymous>
</users>
<groups>
<group name="writers">
<users>
<user ref="woggioni"/>
<user ref="gitea"/>
</users>
<roles>
<reader/>
<writer/>
</roles>
</group>
</groups>
</authorization>
<authentication>
<client-certificate>
<user-extractor attribute-name="CN" pattern="(.*)"/>
</client-certificate>
</authentication>
<tls>
<keystore file="/home/luser/ssl/gbcs.woggioni.net.pfx" key-alias="gbcs.woggioni.net" password="KEYSTORE_PASSWOR" key-password="KEY_PASSWORD"/>
<truststore file="/home/luser/ssl/woggioni.net.pfx" check-certificate-status="false" password="TRUSTSTORE_PASSWORD"/>
</tls>
</gbcs:server>

View File

@@ -60,8 +60,8 @@
<user-extractor pattern="user-pattern" attribute-name="CN"/> <user-extractor pattern="user-pattern" attribute-name="CN"/>
</client-certificate> </client-certificate>
</authentication> </authentication>
<tls> <tls verify-clients="true">
<keystore file="keystore.pfx" key-alias="key1" password="password" key-password="key-password"/> <keystore file="keystore.pfx" key-alias="key1" password="password" key-password="key-password"/>
<truststore file="truststore.pfx" password="password" check-certificate-status="true" require-client-certificate="true"/> <truststore file="truststore.pfx" password="password" check-certificate-status="true" />
</tls> </tls>
</gbcs:server> </gbcs:server>

View File

@@ -4,7 +4,7 @@ org.gradle.caching=true
gbcs.version = 0.0.11 gbcs.version = 0.0.11
lys.version = 2025.01.25 lys.version = 2025.01.24
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
docker.registry.url=gitea.woggioni.net docker.registry.url=gitea.woggioni.net

View File

@@ -1,5 +1,6 @@
pluginManagement { pluginManagement {
repositories { repositories {
mavenLocal()
maven { maven {
url = getProperty('gitea.maven.url') url = getProperty('gitea.maven.url')
} }