Compare commits

..

3 Commits

Author SHA1 Message Date
ba961bd30d added optional even listener to client API 2025-01-27 23:55:59 +08:00
45458761f3 made TLS client certificate request from the server configurable
All checks were successful
CI / build (push) Successful in 4m2s
2025-01-27 13:32:04 +08:00
90a5834f5f added retry policy to gbcs-client 2025-01-27 13:12:12 +08:00
22 changed files with 556 additions and 95 deletions

View File

@@ -46,6 +46,12 @@ allprojects { subproject ->
} }
} }
dependencies {
testImplementation catalog.junit.jupiter.api
testImplementation catalog.junit.jupiter.params
testRuntimeOnly catalog.junit.jupiter.engine
}
test { test {
useJUnitPlatform() useJUnitPlatform()
} }

View File

@@ -91,11 +91,14 @@ public class Configuration {
boolean verifyClients; boolean verifyClients;
} }
public enum ClientCertificate {
REQUIRED, OPTIONAL
}
@Value @Value
public static class Tls { public static class Tls {
KeyStore keyStore; KeyStore keyStore;
TrustStore trustStore; TrustStore trustStore;
boolean verifyClients;
} }
@Value @Value
@@ -111,6 +114,7 @@ public class Configuration {
Path file; Path file;
String password; String password;
boolean checkCertificateStatus; boolean checkCertificateStatus;
boolean requireClientCertificate;
} }
@Value @Value

View File

@@ -7,6 +7,7 @@ module net.woggioni.gbcs.cli {
requires kotlin.stdlib; requires kotlin.stdlib;
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires net.woggioni.gbcs.api; requires net.woggioni.gbcs.api;
requires io.netty.codec.http;
exports net.woggioni.gbcs.cli.impl.converters to info.picocli; exports net.woggioni.gbcs.cli.impl.converters to info.picocli;
opens net.woggioni.gbcs.cli.impl.commands to info.picocli; opens net.woggioni.gbcs.cli.impl.commands to info.picocli;

View File

@@ -1,10 +1,13 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.error import net.woggioni.gbcs.common.error
import net.woggioni.gbcs.common.info import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.client.RequestEventListener
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import picocli.CommandLine import picocli.CommandLine
import java.security.SecureRandom import java.security.SecureRandom
@@ -14,6 +17,7 @@ import java.util.Base64
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.concurrent.Future import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random import kotlin.random.Random
@@ -53,53 +57,87 @@ class BenchmarkCommand : GbcsCommand() {
} }
} }
log.info {
"Starting insertion"
}
val entries = let { val entries = let {
val completionQueue = LinkedBlockingQueue<Future<Pair<String, ByteArray>>>(numberOfEntries) val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now() val start = Instant.now()
val semaphore = Semaphore(profile.maxConnections * 3)
val totalElapsedTime = AtomicLong(0) val totalElapsedTime = AtomicLong(0)
entryGenerator.take(numberOfEntries).forEach { entry -> val iterator = entryGenerator.take(numberOfEntries).iterator()
val requestStart = System.nanoTime() while(completionCounter.get() < numberOfEntries) {
val future = client.put(entry.first, entry.second).thenApply { entry } if(iterator.hasNext()) {
future.whenComplete { _, _ -> val entry = iterator.next()
semaphore.acquire()
val eventListener = object : RequestEventListener {
var start: Long? = null
override fun requestSent(req: FullHttpRequest) {
this.start = System.nanoTime()
}
override fun responseReceived(res: FullHttpResponse) {
this.start?.let { requestStart ->
totalElapsedTime.addAndGet((System.nanoTime() - requestStart)) totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
completionQueue.put(future) }
this.start = null
}
}
val future = client.put(entry.first, entry.second, eventListener).thenApply { entry }
future.whenComplete { result, ex ->
if (ex != null) {
log.error(ex.message, ex)
} else {
completionQueue.put(result)
}
semaphore.release()
completionCounter.incrementAndGet()
}
} }
} }
val inserted = sequence<Pair<String, ByteArray>> { val inserted = completionQueue.toList()
var completionCounter = 0
while (completionCounter < numberOfEntries) {
val future = completionQueue.take()
try {
yield(future.get())
} catch (ee: ExecutionException) {
val cause = ee.cause ?: ee
log.error(cause.message, cause)
}
completionCounter += 1
}
}.toList()
val end = Instant.now() val end = Instant.now()
log.info { log.info {
val elapsed = Duration.between(start, end).toMillis() val elapsed = Duration.between(start, end).toMillis()
"Insertion rate: ${numberOfEntries.toDouble() / elapsed * 1000} ops/s" val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
} }
log.info { log.info {
"Average time per insertion: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1000} ms" val avgTxTime = String.format("%.0f", totalElapsedTime.get() / numberOfEntries.toDouble() / 1e6)
"Average time per insertion: $avgTxTime ms"
} }
inserted inserted
} }
log.info { log.info {
"Inserted ${entries.size} entries" "Inserted ${entries.size} entries"
} }
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) { if (entries.isNotEmpty()) {
val completionQueue = LinkedBlockingQueue<Future<Unit>>(entries.size) val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now() val start = Instant.now()
val totalElapsedTime = AtomicLong(0) val totalElapsedTime = AtomicLong(0)
entries.forEach { entry -> entries.forEach { entry ->
val requestStart = System.nanoTime() semaphore.acquire()
val future = client.get(entry.first).thenApply { val eventListener = object : RequestEventListener {
var start : Long? = null
override fun requestSent(req: FullHttpRequest) {
this.start = System.nanoTime()
}
override fun responseReceived(res: FullHttpResponse) {
this.start?.let { requestStart ->
totalElapsedTime.addAndGet((System.nanoTime() - requestStart)) totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
}
this.start = null
}
}
val future = client.get(entry.first, eventListener).thenApply {
if (it == null) { if (it == null) {
log.error { log.error {
"Missing entry for key '${entry.first}'" "Missing entry for key '${entry.first}'"
@@ -111,21 +149,19 @@ class BenchmarkCommand : GbcsCommand() {
} }
} }
future.whenComplete { _, _ -> future.whenComplete { _, _ ->
completionQueue.put(future) completionCounter.incrementAndGet()
semaphore.release()
} }
} }
var completionCounter = 0
while (completionCounter < entries.size) {
completionQueue.take()
completionCounter += 1
}
val end = Instant.now() val end = Instant.now()
log.info { log.info {
val elapsed = Duration.between(start, end).toMillis() val elapsed = Duration.between(start, end).toMillis()
"Retrieval rate: ${entries.size.toDouble() / elapsed * 1000} ops/s" val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
"Retrieval rate: $opsPerSecond ops/s"
} }
log.info { log.info {
"Average time per retrieval: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1e6} ms" val avgTxTime = String.format("%.0f", totalElapsedTime.get() / completionCounter.toDouble() / 1e6)
"Average time per retrieval: $avgTxTime ms"
} }
} else { } else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache") log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")

View File

@@ -10,6 +10,8 @@ dependencies {
implementation catalog.slf4j.api implementation catalog.slf4j.api
implementation catalog.netty.buffer implementation catalog.netty.buffer
implementation catalog.netty.codec.http implementation catalog.netty.codec.http
testRuntimeOnly catalog.logback.classic
} }

View File

@@ -66,11 +66,18 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
data class BasicAuthenticationCredentials(val username: String, val password: String) : Authentication() data class BasicAuthenticationCredentials(val username: String, val password: String) : Authentication()
} }
class RetryPolicy(
val maxAttempts: Int,
val initialDelayMillis: Long,
val exp: Double
)
data class Profile( data class Profile(
val serverURI: URI, val serverURI: URI,
val authentication: Authentication?, val authentication: Authentication?,
val connectionTimeout : Duration?, val connectionTimeout: Duration?,
val maxConnections : Int val maxConnections: Int,
val retryPolicy: RetryPolicy?,
) )
companion object { companion object {
@@ -161,9 +168,55 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
pool = FixedChannelPool(bootstrap, channelPoolHandler, profile.maxConnections) pool = FixedChannelPool(bootstrap, channelPoolHandler, profile.maxConnections)
} }
fun get(key: String): CompletableFuture<ByteArray?> { private fun executeWithRetry(operation: () -> CompletableFuture<FullHttpResponse>): CompletableFuture<FullHttpResponse> {
return sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null) val retryPolicy = profile.retryPolicy
.thenApply { return if (retryPolicy != null) {
val outcomeHandler = OutcomeHandler<FullHttpResponse> { outcome ->
when (outcome) {
is OperationOutcome.Success -> {
val response = outcome.result
val status = response.status()
when (status) {
HttpResponseStatus.TOO_MANY_REQUESTS -> {
val retryAfter = response.headers()[HttpHeaderNames.RETRY_AFTER]?.let { headerValue ->
try {
headerValue.toLong() * 1000
} catch (nfe: NumberFormatException) {
null
}
}
OutcomeHandlerResult.Retry(retryAfter)
}
HttpResponseStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.SERVICE_UNAVAILABLE ->
OutcomeHandlerResult.Retry()
else -> OutcomeHandlerResult.DoNotRetry()
}
}
is OperationOutcome.Failure -> {
OutcomeHandlerResult.Retry()
}
}
}
executeWithRetry(
group,
retryPolicy.maxAttempts,
retryPolicy.initialDelayMillis.toDouble(),
retryPolicy.exp,
outcomeHandler,
operation
)
} else {
operation()
}
}
fun get(key: String, eventListener : RequestEventListener? = null): CompletableFuture<ByteArray?> {
return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null, eventListener)
}.thenApply {
val status = it.status() val status = it.status()
if (it.status() == HttpResponseStatus.NOT_FOUND) { if (it.status() == HttpResponseStatus.NOT_FOUND) {
null null
@@ -181,8 +234,10 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
} }
fun put(key: String, content: ByteArray): CompletableFuture<Unit> { fun put(key: String, content: ByteArray, eventListener : RequestEventListener? = null): CompletableFuture<Unit> {
return sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content).thenApply { return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content, eventListener)
}.thenApply {
val status = it.status() val status = it.status()
if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) { if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) {
throw HttpException(status) throw HttpException(status)
@@ -190,7 +245,7 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
} }
private fun sendRequest(uri: URI, method: HttpMethod, body: ByteArray?): CompletableFuture<FullHttpResponse> { private fun sendRequest(uri: URI, method: HttpMethod, body: ByteArray?, eventListener : RequestEventListener?): CompletableFuture<FullHttpResponse> {
val responseFuture = CompletableFuture<FullHttpResponse>() val responseFuture = CompletableFuture<FullHttpResponse>()
// Custom handler for processing responses // Custom handler for processing responses
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> { pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
@@ -206,6 +261,7 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
pipeline.removeLast() pipeline.removeLast()
pool.release(channel) pool.release(channel)
responseFuture.complete(response) responseFuture.complete(response)
eventListener?.responseReceived(response)
} }
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
@@ -251,7 +307,11 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
// Set headers // Set headers
// Send the request // Send the request
channel.writeAndFlush(request) channel.writeAndFlush(request).addListener {
if(it.isSuccess) {
eventListener?.requestSent(request)
}
}
} else { } else {
responseFuture.completeExceptionally(channelFuture.cause()) responseFuture.completeExceptionally(channelFuture.cause())
} }

View File

@@ -0,0 +1,10 @@
package net.woggioni.gbcs.client
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
interface RequestEventListener {
fun requestSent(req : FullHttpRequest) {}
fun responseReceived(res : FullHttpResponse) {}
fun exceptionCaught(ex : Throwable) {}
}

View File

@@ -17,16 +17,18 @@ object Parser {
fun parse(document: Document): GradleBuildCacheClient.Configuration { fun parse(document: Document): GradleBuildCacheClient.Configuration {
val root = document.documentElement val root = document.documentElement
val profiles = mutableMapOf<String, GradleBuildCacheClient.Configuration.Profile>() val profiles = mutableMapOf<String, GradleBuildCacheClient.Configuration.Profile>()
for (child in root.asIterable()) { for (child in root.asIterable()) {
val tagName = child.localName val tagName = child.localName
when (tagName) { when (tagName) {
"profile" -> { "profile" -> {
val name = child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required") val name =
val uri = child.renderAttribute("base-url")?.let(::URI) ?: throw ConfigurationException("base-url attribute is required") child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required")
val uri = child.renderAttribute("base-url")?.let(::URI)
?: throw ConfigurationException("base-url attribute is required")
var authentication: GradleBuildCacheClient.Configuration.Authentication? = null var authentication: GradleBuildCacheClient.Configuration.Authentication? = null
var retryPolicy: GradleBuildCacheClient.Configuration.RetryPolicy? = null
for (gchild in child.asIterable()) { for (gchild in child.asIterable()) {
when (gchild.localName) { when (gchild.localName) {
"tls-client-auth" -> { "tls-client-auth" -> {
@@ -47,14 +49,42 @@ object Parser {
.toList() .toList()
.toTypedArray() .toTypedArray()
authentication = authentication =
GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(key, certChain) GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(
key,
certChain
)
} }
"basic-auth" -> { "basic-auth" -> {
val username = gchild.renderAttribute("user") ?: throw ConfigurationException("username attribute is required") val username = gchild.renderAttribute("user")
val password = gchild.renderAttribute("password") ?: throw ConfigurationException("password attribute is required") ?: throw ConfigurationException("username attribute is required")
val password = gchild.renderAttribute("password")
?: throw ConfigurationException("password attribute is required")
authentication = authentication =
GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(username, password) GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(
username,
password
)
}
"retry-policy" -> {
val maxAttempts =
gchild.renderAttribute("max-attempts")
?.let(String::toInt)
?: throw ConfigurationException("max-attempts attribute is required")
val initialDelay =
gchild.renderAttribute("initial-delay")
?.let(Duration::parse)
?: Duration.ofSeconds(1)
val exp =
gchild.renderAttribute("exp")
?.let(String::toDouble)
?: 2.0f
retryPolicy = GradleBuildCacheClient.Configuration.RetryPolicy(
maxAttempts,
initialDelay.toMillis(),
exp.toDouble()
)
} }
} }
} }
@@ -63,7 +93,13 @@ object Parser {
?: 50 ?: 50
val connectionTimeout = child.renderAttribute("connection-timeout") val connectionTimeout = child.renderAttribute("connection-timeout")
?.let(Duration::parse) ?.let(Duration::parse)
profiles[name] = GradleBuildCacheClient.Configuration.Profile(uri, authentication, connectionTimeout, maxConnections) profiles[name] = GradleBuildCacheClient.Configuration.Profile(
uri,
authentication,
connectionTimeout,
maxConnections,
retryPolicy
)
} }
} }
} }

View File

@@ -0,0 +1,75 @@
package net.woggioni.gbcs.client
import io.netty.util.concurrent.EventExecutorGroup
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
sealed class OperationOutcome<T> {
class Success<T>(val result: T) : OperationOutcome<T>()
class Failure<T>(val ex: Throwable) : OperationOutcome<T>()
}
sealed class OutcomeHandlerResult {
class Retry(val suggestedDelayMillis: Long? = null) : OutcomeHandlerResult()
class DoNotRetry : OutcomeHandlerResult()
}
fun interface OutcomeHandler<T> {
fun shouldRetry(result: OperationOutcome<T>): OutcomeHandlerResult
}
fun <T> executeWithRetry(
eventExecutorGroup: EventExecutorGroup,
maxAttempts: Int,
initialDelay: Double,
exp: Double,
outcomeHandler: OutcomeHandler<T>,
cb: () -> CompletableFuture<T>
): CompletableFuture<T> {
val finalResult = cb()
var future = finalResult
var shortCircuit = false
for (i in 1 until maxAttempts) {
future = future.handle { result, ex ->
val operationOutcome = if (ex == null) {
OperationOutcome.Success(result)
} else {
OperationOutcome.Failure(ex.cause ?: ex)
}
if (shortCircuit) {
when(operationOutcome) {
is OperationOutcome.Failure -> throw operationOutcome.ex
is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result)
}
} else {
when(val outcomeHandlerResult = outcomeHandler.shouldRetry(operationOutcome)) {
is OutcomeHandlerResult.Retry -> {
val res = CompletableFuture<T>()
val delay = run {
val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong()
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
}
eventExecutorGroup.schedule({
cb().handle { result, ex ->
if (ex == null) {
res.complete(result)
} else {
res.completeExceptionally(ex)
}
}
}, delay, TimeUnit.MILLISECONDS)
res
}
is OutcomeHandlerResult.DoNotRetry -> {
shortCircuit = true
when(operationOutcome) {
is OperationOutcome.Failure -> throw operationOutcome.ex
is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result)
}
}
}
}
}.thenCompose { it }
}
return future
}

View File

@@ -13,11 +13,14 @@
</xs:complexType> </xs:complexType>
<xs:complexType name="profileType"> <xs:complexType name="profileType">
<xs:sequence>
<xs:choice> <xs:choice>
<xs:element name="no-auth" type="gbcs-client:noAuthType"/> <xs:element name="no-auth" type="gbcs-client:noAuthType"/>
<xs:element name="basic-auth" type="gbcs-client:basicAuthType"/> <xs:element name="basic-auth" type="gbcs-client:basicAuthType"/>
<xs:element name="tls-client-auth" type="gbcs-client:tlsClientAuthType"/> <xs:element name="tls-client-auth" type="gbcs-client:tlsClientAuthType"/>
</xs:choice> </xs:choice>
<xs:element name="retry-policy" type="gbcs-client:retryType" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="name" type="xs:token" use="required"/> <xs:attribute name="name" type="xs:token" use="required"/>
<xs:attribute name="base-url" type="xs:anyURI" use="required"/> <xs:attribute name="base-url" type="xs:anyURI" use="required"/>
<xs:attribute name="max-connections" type="xs:positiveInteger" default="50"/> <xs:attribute name="max-connections" type="xs:positiveInteger" default="50"/>
@@ -38,4 +41,10 @@
<xs:attribute name="key-password" type="xs:string" use="optional"/> <xs:attribute name="key-password" type="xs:string" use="optional"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="retryType">
<xs:attribute name="max-attempts" type="xs:positiveInteger" use="required"/>
<xs:attribute name="initial-delay" type="xs:duration" default="PT1S"/>
<xs:attribute name="exp" type="xs:double" default="2.0"/>
</xs:complexType>
</xs:schema> </xs:schema>

View File

@@ -0,0 +1,149 @@
package net.woggioni.gbcs.client
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.gbcs.common.contextLogger
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.ArgumentsProvider
import org.junit.jupiter.params.provider.ArgumentsSource
import java.util.concurrent.CompletableFuture
import java.util.stream.Stream
import kotlin.random.Random
class RetryTest {
data class TestArgs(
val seed: Int,
val maxAttempt: Int,
val initialDelay: Double,
val exp: Double,
)
class TestArguments : ArgumentsProvider {
override fun provideArguments(context: ExtensionContext): Stream<out Arguments> {
return Stream.of(
TestArgs(
seed = 101325,
maxAttempt = 5,
initialDelay = 50.0,
exp = 2.0,
),
TestArgs(
seed = 101325,
maxAttempt = 20,
initialDelay = 100.0,
exp = 1.1,
),
TestArgs(
seed = 123487,
maxAttempt = 20,
initialDelay = 100.0,
exp = 2.0,
),
TestArgs(
seed = 20082024,
maxAttempt = 10,
initialDelay = 100.0,
exp = 2.0,
)
).map {
object: Arguments {
override fun get() = arrayOf(it)
}
}
}
}
@ArgumentsSource(TestArguments::class)
@ParameterizedTest
fun test(testArgs: TestArgs) {
val log = contextLogger()
log.debug("Start")
val executor: EventExecutorGroup = DefaultEventExecutorGroup(1)
val attempts = mutableListOf<Pair<Long, OperationOutcome<Int>>>()
val outcomeHandler = OutcomeHandler<Int> { outcome ->
when(outcome) {
is OperationOutcome.Success -> {
if(outcome.result % 10 == 0) {
OutcomeHandlerResult.DoNotRetry()
} else {
OutcomeHandlerResult.Retry(null)
}
}
is OperationOutcome.Failure -> {
when(outcome.ex) {
is IllegalStateException -> {
log.debug(outcome.ex.message, outcome.ex)
OutcomeHandlerResult.Retry(null)
}
else -> {
OutcomeHandlerResult.DoNotRetry()
}
}
}
}
}
val random = Random(testArgs.seed)
val future =
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler) {
val now = System.nanoTime()
val result = CompletableFuture<Int>()
executor.submit {
val n = random.nextInt(0, Integer.MAX_VALUE)
log.debug("Got new number: {}", n)
if(n % 3 == 0) {
val ex = IllegalStateException("Value $n can be divided by 3")
result.completeExceptionally(ex)
attempts += now to OperationOutcome.Failure(ex)
} else if(n % 7 == 0) {
val ex = RuntimeException("Value $n can be divided by 7")
result.completeExceptionally(ex)
attempts += now to OperationOutcome.Failure(ex)
} else {
result.complete(n)
attempts += now to OperationOutcome.Success(n)
}
}
result
}
Assertions.assertTrue(attempts.size <= testArgs.maxAttempt)
val result = future.handle { res, ex ->
if(ex != null) {
val err = ex.cause ?: ex
log.debug(err.message, err)
OperationOutcome.Failure(err)
} else {
OperationOutcome.Success(res)
}
}.get()
for ((index, attempt) in attempts.withIndex()) {
val (timestamp, value) = attempt
if (index > 0) {
/* Check the delay for subsequent attempts is correct */
val previousAttempt = attempts[index - 1]
val expectedTimestamp =
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
val actualTimestamp = timestamp
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
Assertions.assertTrue(err < 1e-3)
}
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
/*
* If the last attempt index is lower than the maximum number of attempts, then
* check the outcome handler returns DoNotRetry
*/
Assertions.assertTrue(outcomeHandler.shouldRetry(value) is OutcomeHandlerResult.DoNotRetry)
} else if (index < attempts.size - 1) {
/*
* If the attempt is not the last attempt check the outcome handler returns Retry
*/
Assertions.assertTrue(outcomeHandler.shouldRetry(value) is OutcomeHandlerResult.Retry)
}
}
}
}

View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration>
<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>
<appender name="console" class="ConsoleAppender">
<target>System.err</target>
<encoder class="PatternLayoutEncoder">
<pattern>%d [%highlight(%-5level)] \(%thread\) %logger{36} -%kvp- %msg %n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="io.netty" level="info"/>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -18,9 +18,6 @@ dependencies {
testImplementation catalog.bcprov.jdk18on testImplementation catalog.bcprov.jdk18on
testImplementation catalog.bcpkix.jdk18on testImplementation catalog.bcpkix.jdk18on
testImplementation catalog.junit.jupiter.api
testImplementation catalog.junit.jupiter.params
testRuntimeOnly catalog.junit.jupiter.engine
testRuntimeOnly project(":gbcs-server-memcached") testRuntimeOnly project(":gbcs-server-memcached")
} }

View File

@@ -208,15 +208,15 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
.map { it as X509Certificate } .map { it as X509Certificate }
.toArray { size -> Array<X509Certificate?>(size) { null } } .toArray { size -> Array<X509Certificate?>(size) { null } }
SslContextBuilder.forServer(serverKey, *serverCert).apply { SslContextBuilder.forServer(serverKey, *serverCert).apply {
if (tls.isVerifyClients) { val clientAuth = tls.trustStore?.let { trustStore ->
clientAuth(ClientAuth.OPTIONAL)
tls.trustStore?.let { trustStore ->
val ts = loadKeystore(trustStore.file, trustStore.password) val ts = loadKeystore(trustStore.file, trustStore.password)
trustManager( trustManager(
ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus) ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus)
) )
} if(trustStore.isRequireClientCertificate) ClientAuth.REQUIRE
} else ClientAuth.OPTIONAL
} ?: ClientAuth.NONE
clientAuth(clientAuth)
}.build() }.build()
} }
} }

View File

@@ -142,10 +142,9 @@ object Parser {
} }
"tls" -> { "tls" -> {
val verifyClients = child.renderAttribute("verify-clients")
?.let(String::toBoolean) ?: false
var keyStore: KeyStore? = null var keyStore: KeyStore? = null
var trustStore: TrustStore? = null var trustStore: TrustStore? = null
for (granChild in child.asIterable()) { for (granChild in child.asIterable()) {
when (granChild.localName) { when (granChild.localName) {
"keystore" -> { "keystore" -> {
@@ -167,15 +166,19 @@ object Parser {
val checkCertificateStatus = granChild.renderAttribute("check-certificate-status") val checkCertificateStatus = granChild.renderAttribute("check-certificate-status")
?.let(String::toBoolean) ?.let(String::toBoolean)
?: false ?: false
val requireClientCertificate = child.renderAttribute("require-client-certificate")
?.let(String::toBoolean) ?: false
trustStore = TrustStore( trustStore = TrustStore(
trustStoreFile, trustStoreFile,
trustStorePassword, trustStorePassword,
checkCertificateStatus checkCertificateStatus,
requireClientCertificate
) )
} }
} }
} }
tls = Tls(keyStore, trustStore, verifyClients) tls = Tls(keyStore, trustStore)
} }
} }
} }

View File

@@ -154,9 +154,6 @@ object Serializer {
conf.tls?.let { tlsConfiguration -> conf.tls?.let { tlsConfiguration ->
node("tls") { node("tls") {
if(tlsConfiguration.isVerifyClients) {
attr("verify-clients", "true")
}
tlsConfiguration.keyStore?.let { keyStore -> tlsConfiguration.keyStore?.let { keyStore ->
node("keystore") { node("keystore") {
attr("file", keyStore.file.toString()) attr("file", keyStore.file.toString())
@@ -177,6 +174,7 @@ object Serializer {
attr("password", password) attr("password", password)
} }
attr("check-certificate-status", trustStore.isCheckCertificateStatus.toString()) attr("check-certificate-status", trustStore.isCheckCertificateStatus.toString())
attr("require-client-certificate", trustStore.isRequireClientCertificate.toString())
} }
} }
} }

View File

@@ -183,7 +183,6 @@
<xs:element name="keystore" type="gbcs:keyStoreType" /> <xs:element name="keystore" type="gbcs:keyStoreType" />
<xs:element name="truststore" type="gbcs:trustStoreType" minOccurs="0"/> <xs:element name="truststore" type="gbcs:trustStoreType" minOccurs="0"/>
</xs:all> </xs:all>
<xs:attribute name="verify-clients" type="xs:boolean" use="optional" default="false"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="keyStoreType"> <xs:complexType name="keyStoreType">
@@ -197,6 +196,7 @@
<xs:attribute name="file" type="xs:string" use="required"/> <xs:attribute name="file" type="xs:string" use="required"/>
<xs:attribute name="password" type="xs:string"/> <xs:attribute name="password" type="xs:string"/>
<xs:attribute name="check-certificate-status" type="xs:boolean"/> <xs:attribute name="check-certificate-status" type="xs:boolean"/>
<xs:attribute name="require-client-certificate" type="xs:boolean" use="optional" default="false"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="propertiesType"> <xs:complexType name="propertiesType">

View File

@@ -171,9 +171,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
), ),
Configuration.Tls( Configuration.Tls(
Configuration.KeyStore(this.serverKeyStoreFile, null, SERVER_CERTIFICATE_ENTRY, PASSWORD), Configuration.KeyStore(this.serverKeyStoreFile, null, SERVER_CERTIFICATE_ENTRY, PASSWORD),
Configuration.TrustStore(this.trustStoreFile, null, false), Configuration.TrustStore(this.trustStoreFile, null, false, false),
true )
),
) )
Xml.write(Serializer.serialize(cfg), System.out) Xml.write(Serializer.serialize(cfg), System.out)
} }

View File

@@ -20,6 +20,7 @@ class ConfigurationTest {
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-default.xml", "classpath:net/woggioni/gbcs/server/test/valid/gbcs-default.xml",
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-memcached.xml", "classpath:net/woggioni/gbcs/server/test/valid/gbcs-memcached.xml",
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-tls.xml", "classpath:net/woggioni/gbcs/server/test/valid/gbcs-tls.xml",
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-memcached-tls.xml",
] ]
) )
@ParameterizedTest @ParameterizedTest

View File

@@ -7,6 +7,7 @@ import org.bouncycastle.asn1.x500.X500Name
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.params.provider.ArgumentsSource
import java.net.http.HttpClient import java.net.http.HttpClient
import java.net.http.HttpRequest import java.net.http.HttpRequest
import java.net.http.HttpResponse import java.net.http.HttpResponse

View File

@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcached jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd"
>
<bind host="0.0.0.0" port="8443" incoming-connections-backlog-size="4096"/>
<connection
max-request-size="67108864"
idle-timeout="PT30S"
read-idle-timeout="PT60S"
write-idle-timeout="PT60S"
read-timeout="PT5M"
write-timeout="PT5M"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="16777216" compression-mode="zip">
<server host="memcached" port="11211"/>
</cache>
<authorization>
<users>
<user name="woggioni">
<quota calls="1000" period="PT1S"/>
</user>
<user name="gitea">
<quota calls="10" period="PT1S" initial-available-calls="100" max-available-calls="100"/>
</user>
<anonymous>
<quota calls="2" period="PT5S"/>
</anonymous>
</users>
<groups>
<group name="writers">
<users>
<user ref="woggioni"/>
<user ref="gitea"/>
</users>
<roles>
<reader/>
<writer/>
</roles>
</group>
</groups>
</authorization>
<authentication>
<client-certificate>
<user-extractor attribute-name="CN" pattern="(.*)"/>
</client-certificate>
</authentication>
<tls>
<keystore file="/home/luser/ssl/gbcs.woggioni.net.pfx" key-alias="gbcs.woggioni.net" password="KEYSTORE_PASSWOR" key-password="KEY_PASSWORD"/>
<truststore file="/home/luser/ssl/woggioni.net.pfx" check-certificate-status="false" password="TRUSTSTORE_PASSWORD"/>
</tls>
</gbcs:server>

View File

@@ -60,8 +60,8 @@
<user-extractor pattern="user-pattern" attribute-name="CN"/> <user-extractor pattern="user-pattern" attribute-name="CN"/>
</client-certificate> </client-certificate>
</authentication> </authentication>
<tls verify-clients="true"> <tls>
<keystore file="keystore.pfx" key-alias="key1" password="password" key-password="key-password"/> <keystore file="keystore.pfx" key-alias="key1" password="password" key-password="key-password"/>
<truststore file="truststore.pfx" password="password" check-certificate-status="true" /> <truststore file="truststore.pfx" password="password" check-certificate-status="true" require-client-certificate="true"/>
</tls> </tls>
</gbcs:server> </gbcs:server>