Compare commits

...

5 Commits

Author SHA1 Message Date
e5fe8437a6 temporary commit 2025-02-04 13:59:44 +08:00
89153b60f8 fixed race condition in InMemoryCache 2025-02-01 10:14:13 +08:00
a2a40ab60f added semaphore to benchmark command 2025-01-28 00:00:07 +08:00
45458761f3 made TLS client certificate request from the server configurable
All checks were successful
CI / build (push) Successful in 4m2s
2025-01-27 13:32:04 +08:00
90a5834f5f added retry policy to gbcs-client 2025-01-27 13:12:12 +08:00
47 changed files with 1306 additions and 295 deletions

View File

@@ -46,6 +46,12 @@ allprojects { subproject ->
} }
} }
dependencies {
testImplementation catalog.junit.jupiter.api
testImplementation catalog.junit.jupiter.params
testRuntimeOnly catalog.junit.jupiter.engine
}
test { test {
useJUnitPlatform() useJUnitPlatform()
} }

View File

@@ -5,6 +5,7 @@ plugins {
} }
dependencies { dependencies {
api catalog.netty.buffer
} }
publishing { publishing {

View File

@@ -1,6 +1,8 @@
module net.woggioni.gbcs.api { module net.woggioni.gbcs.api {
requires static lombok; requires static lombok;
requires java.xml; requires java.xml;
requires io.netty.buffer;
exports net.woggioni.gbcs.api; exports net.woggioni.gbcs.api;
exports net.woggioni.gbcs.api.exception; exports net.woggioni.gbcs.api.exception;
exports net.woggioni.gbcs.api.event;
} }

View File

@@ -3,10 +3,10 @@ package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.exception.ContentTooLargeException; import net.woggioni.gbcs.api.exception.ContentTooLargeException;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.CompletableFuture;
public interface Cache extends AutoCloseable { public interface Cache extends AutoCloseable {
ReadableByteChannel get(String key); CompletableFuture<CallHandle<Void>> get(String key, ResponseEventListener responseEventListener);
CompletableFuture<CallHandle<Void>> put(String key) throws ContentTooLargeException;
void put(String key, byte[] content) throws ContentTooLargeException;
} }

View File

@@ -0,0 +1,10 @@
package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.event.RequestEvent;
import java.util.concurrent.CompletableFuture;
public interface CallHandle<T> {
void postEvent(RequestEvent evt);
CompletableFuture<T> call();
}

View File

@@ -91,11 +91,14 @@ public class Configuration {
boolean verifyClients; boolean verifyClients;
} }
public enum ClientCertificate {
REQUIRED, OPTIONAL
}
@Value @Value
public static class Tls { public static class Tls {
KeyStore keyStore; KeyStore keyStore;
TrustStore trustStore; TrustStore trustStore;
boolean verifyClients;
} }
@Value @Value
@@ -111,6 +114,7 @@ public class Configuration {
Path file; Path file;
String password; String password;
boolean checkCertificateStatus; boolean checkCertificateStatus;
boolean requireClientCertificate;
} }
@Value @Value

View File

@@ -0,0 +1,7 @@
package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.event.ResponseEvent;
public interface ResponseEventListener {
void listen(ResponseEvent evt);
}

View File

@@ -0,0 +1,20 @@
package net.woggioni.gbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import net.woggioni.gbcs.api.CallHandle;
sealed public abstract class RequestEvent {
@Getter
@RequiredArgsConstructor
public static final class ChunkSent extends RequestEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
public static final class LastChunkSent extends RequestEvent {
private final ByteBuf chunk;
}
}

View File

@@ -0,0 +1,28 @@
package net.woggioni.gbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
sealed public abstract class ResponseEvent {
@Getter
@RequiredArgsConstructor
public final static class ChunkReceived extends ResponseEvent {
private final ByteBuf chunk;
}
public final static class NoContent extends ResponseEvent {
}
@Getter
@RequiredArgsConstructor
public final static class LastChunkReceived extends ResponseEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
public final static class ExceptionCaught extends ResponseEvent {
private final Throwable cause;
}
}

View File

@@ -1,19 +1,17 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.error import net.woggioni.gbcs.common.error
import net.woggioni.gbcs.common.info import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import picocli.CommandLine import picocli.CommandLine
import java.security.SecureRandom import java.security.SecureRandom
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.Base64
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random import kotlin.random.Random
@@ -53,53 +51,55 @@ class BenchmarkCommand : GbcsCommand() {
} }
} }
log.info {
"Starting insertion"
}
val entries = let { val entries = let {
val completionQueue = LinkedBlockingQueue<Future<Pair<String, ByteArray>>>(numberOfEntries) val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now() val start = Instant.now()
val totalElapsedTime = AtomicLong(0) val semaphore = Semaphore(profile.maxConnections * 3)
entryGenerator.take(numberOfEntries).forEach { entry -> val iterator = entryGenerator.take(numberOfEntries).iterator()
val requestStart = System.nanoTime() while(completionCounter.get() < numberOfEntries) {
if(iterator.hasNext()) {
val entry = iterator.next()
semaphore.acquire()
val future = client.put(entry.first, entry.second).thenApply { entry } val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { _, _ -> future.whenComplete { result, ex ->
totalElapsedTime.addAndGet((System.nanoTime() - requestStart)) if (ex != null) {
completionQueue.put(future) log.error(ex.message, ex)
} else {
completionQueue.put(result)
}
semaphore.release()
completionCounter.incrementAndGet()
}
} }
} }
val inserted = sequence<Pair<String, ByteArray>> { val inserted = completionQueue.toList()
var completionCounter = 0
while (completionCounter < numberOfEntries) {
val future = completionQueue.take()
try {
yield(future.get())
} catch (ee: ExecutionException) {
val cause = ee.cause ?: ee
log.error(cause.message, cause)
}
completionCounter += 1
}
}.toList()
val end = Instant.now() val end = Instant.now()
log.info { log.info {
val elapsed = Duration.between(start, end).toMillis() val elapsed = Duration.between(start, end).toMillis()
"Insertion rate: ${numberOfEntries.toDouble() / elapsed * 1000} ops/s" val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
} "Insertion rate: $opsPerSecond ops/s"
log.info {
"Average time per insertion: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1000} ms"
} }
inserted inserted
} }
log.info { log.info {
"Inserted ${entries.size} entries" "Inserted ${entries.size} entries"
} }
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) { if (entries.isNotEmpty()) {
val completionQueue = LinkedBlockingQueue<Future<Unit>>(entries.size) val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now() val start = Instant.now()
val totalElapsedTime = AtomicLong(0)
entries.forEach { entry -> entries.forEach { entry ->
val requestStart = System.nanoTime() semaphore.acquire()
val future = client.get(entry.first).thenApply { val future = client.get(entry.first).thenApply {
totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
if (it == null) { if (it == null) {
log.error { log.error {
"Missing entry for key '${entry.first}'" "Missing entry for key '${entry.first}'"
@@ -111,21 +111,15 @@ class BenchmarkCommand : GbcsCommand() {
} }
} }
future.whenComplete { _, _ -> future.whenComplete { _, _ ->
completionQueue.put(future) completionCounter.incrementAndGet()
semaphore.release()
} }
} }
var completionCounter = 0
while (completionCounter < entries.size) {
completionQueue.take()
completionCounter += 1
}
val end = Instant.now() val end = Instant.now()
log.info { log.info {
val elapsed = Duration.between(start, end).toMillis() val elapsed = Duration.between(start, end).toMillis()
"Retrieval rate: ${entries.size.toDouble() / elapsed * 1000} ops/s" val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
} "Retrieval rate: $opsPerSecond ops/s"
log.info {
"Average time per retrieval: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1e6} ms"
} }
} else { } else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache") log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")

View File

@@ -10,6 +10,8 @@ dependencies {
implementation catalog.slf4j.api implementation catalog.slf4j.api
implementation catalog.netty.buffer implementation catalog.netty.buffer
implementation catalog.netty.codec.http implementation catalog.netty.codec.http
testRuntimeOnly catalog.logback.classic
} }

View File

@@ -66,11 +66,18 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
data class BasicAuthenticationCredentials(val username: String, val password: String) : Authentication() data class BasicAuthenticationCredentials(val username: String, val password: String) : Authentication()
} }
class RetryPolicy(
val maxAttempts: Int,
val initialDelayMillis: Long,
val exp: Double
)
data class Profile( data class Profile(
val serverURI: URI, val serverURI: URI,
val authentication: Authentication?, val authentication: Authentication?,
val connectionTimeout: Duration?, val connectionTimeout: Duration?,
val maxConnections : Int val maxConnections: Int,
val retryPolicy: RetryPolicy?,
) )
companion object { companion object {
@@ -161,9 +168,55 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
pool = FixedChannelPool(bootstrap, channelPoolHandler, profile.maxConnections) pool = FixedChannelPool(bootstrap, channelPoolHandler, profile.maxConnections)
} }
private fun executeWithRetry(operation: () -> CompletableFuture<FullHttpResponse>): CompletableFuture<FullHttpResponse> {
val retryPolicy = profile.retryPolicy
return if (retryPolicy != null) {
val outcomeHandler = OutcomeHandler<FullHttpResponse> { outcome ->
when (outcome) {
is OperationOutcome.Success -> {
val response = outcome.result
val status = response.status()
when (status) {
HttpResponseStatus.TOO_MANY_REQUESTS -> {
val retryAfter = response.headers()[HttpHeaderNames.RETRY_AFTER]?.let { headerValue ->
try {
headerValue.toLong() * 1000
} catch (nfe: NumberFormatException) {
null
}
}
OutcomeHandlerResult.Retry(retryAfter)
}
HttpResponseStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.SERVICE_UNAVAILABLE ->
OutcomeHandlerResult.Retry()
else -> OutcomeHandlerResult.DoNotRetry()
}
}
is OperationOutcome.Failure -> {
OutcomeHandlerResult.Retry()
}
}
}
executeWithRetry(
group,
retryPolicy.maxAttempts,
retryPolicy.initialDelayMillis.toDouble(),
retryPolicy.exp,
outcomeHandler,
operation
)
} else {
operation()
}
}
fun get(key: String): CompletableFuture<ByteArray?> { fun get(key: String): CompletableFuture<ByteArray?> {
return sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null) return executeWithRetry {
.thenApply { sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
}.thenApply {
val status = it.status() val status = it.status()
if (it.status() == HttpResponseStatus.NOT_FOUND) { if (it.status() == HttpResponseStatus.NOT_FOUND) {
null null
@@ -182,7 +235,9 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
fun put(key: String, content: ByteArray): CompletableFuture<Unit> { fun put(key: String, content: ByteArray): CompletableFuture<Unit> {
return sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content).thenApply { return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content)
}.thenApply {
val status = it.status() val status = it.status()
if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) { if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) {
throw HttpException(status) throw HttpException(status)

View File

@@ -17,16 +17,18 @@ object Parser {
fun parse(document: Document): GradleBuildCacheClient.Configuration { fun parse(document: Document): GradleBuildCacheClient.Configuration {
val root = document.documentElement val root = document.documentElement
val profiles = mutableMapOf<String, GradleBuildCacheClient.Configuration.Profile>() val profiles = mutableMapOf<String, GradleBuildCacheClient.Configuration.Profile>()
for (child in root.asIterable()) { for (child in root.asIterable()) {
val tagName = child.localName val tagName = child.localName
when (tagName) { when (tagName) {
"profile" -> { "profile" -> {
val name = child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required") val name =
val uri = child.renderAttribute("base-url")?.let(::URI) ?: throw ConfigurationException("base-url attribute is required") child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required")
val uri = child.renderAttribute("base-url")?.let(::URI)
?: throw ConfigurationException("base-url attribute is required")
var authentication: GradleBuildCacheClient.Configuration.Authentication? = null var authentication: GradleBuildCacheClient.Configuration.Authentication? = null
var retryPolicy: GradleBuildCacheClient.Configuration.RetryPolicy? = null
for (gchild in child.asIterable()) { for (gchild in child.asIterable()) {
when (gchild.localName) { when (gchild.localName) {
"tls-client-auth" -> { "tls-client-auth" -> {
@@ -47,14 +49,42 @@ object Parser {
.toList() .toList()
.toTypedArray() .toTypedArray()
authentication = authentication =
GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(key, certChain) GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(
key,
certChain
)
} }
"basic-auth" -> { "basic-auth" -> {
val username = gchild.renderAttribute("user") ?: throw ConfigurationException("username attribute is required") val username = gchild.renderAttribute("user")
val password = gchild.renderAttribute("password") ?: throw ConfigurationException("password attribute is required") ?: throw ConfigurationException("username attribute is required")
val password = gchild.renderAttribute("password")
?: throw ConfigurationException("password attribute is required")
authentication = authentication =
GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(username, password) GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(
username,
password
)
}
"retry-policy" -> {
val maxAttempts =
gchild.renderAttribute("max-attempts")
?.let(String::toInt)
?: throw ConfigurationException("max-attempts attribute is required")
val initialDelay =
gchild.renderAttribute("initial-delay")
?.let(Duration::parse)
?: Duration.ofSeconds(1)
val exp =
gchild.renderAttribute("exp")
?.let(String::toDouble)
?: 2.0f
retryPolicy = GradleBuildCacheClient.Configuration.RetryPolicy(
maxAttempts,
initialDelay.toMillis(),
exp.toDouble()
)
} }
} }
} }
@@ -63,7 +93,13 @@ object Parser {
?: 50 ?: 50
val connectionTimeout = child.renderAttribute("connection-timeout") val connectionTimeout = child.renderAttribute("connection-timeout")
?.let(Duration::parse) ?.let(Duration::parse)
profiles[name] = GradleBuildCacheClient.Configuration.Profile(uri, authentication, connectionTimeout, maxConnections) profiles[name] = GradleBuildCacheClient.Configuration.Profile(
uri,
authentication,
connectionTimeout,
maxConnections,
retryPolicy
)
} }
} }
} }

View File

@@ -0,0 +1,75 @@
package net.woggioni.gbcs.client
import io.netty.util.concurrent.EventExecutorGroup
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
sealed class OperationOutcome<T> {
class Success<T>(val result: T) : OperationOutcome<T>()
class Failure<T>(val ex: Throwable) : OperationOutcome<T>()
}
sealed class OutcomeHandlerResult {
class Retry(val suggestedDelayMillis: Long? = null) : OutcomeHandlerResult()
class DoNotRetry : OutcomeHandlerResult()
}
fun interface OutcomeHandler<T> {
fun shouldRetry(result: OperationOutcome<T>): OutcomeHandlerResult
}
fun <T> executeWithRetry(
eventExecutorGroup: EventExecutorGroup,
maxAttempts: Int,
initialDelay: Double,
exp: Double,
outcomeHandler: OutcomeHandler<T>,
cb: () -> CompletableFuture<T>
): CompletableFuture<T> {
val finalResult = cb()
var future = finalResult
var shortCircuit = false
for (i in 1 until maxAttempts) {
future = future.handle { result, ex ->
val operationOutcome = if (ex == null) {
OperationOutcome.Success(result)
} else {
OperationOutcome.Failure(ex.cause ?: ex)
}
if (shortCircuit) {
when(operationOutcome) {
is OperationOutcome.Failure -> throw operationOutcome.ex
is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result)
}
} else {
when(val outcomeHandlerResult = outcomeHandler.shouldRetry(operationOutcome)) {
is OutcomeHandlerResult.Retry -> {
val res = CompletableFuture<T>()
val delay = run {
val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong()
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
}
eventExecutorGroup.schedule({
cb().handle { result, ex ->
if (ex == null) {
res.complete(result)
} else {
res.completeExceptionally(ex)
}
}
}, delay, TimeUnit.MILLISECONDS)
res
}
is OutcomeHandlerResult.DoNotRetry -> {
shortCircuit = true
when(operationOutcome) {
is OperationOutcome.Failure -> throw operationOutcome.ex
is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result)
}
}
}
}
}.thenCompose { it }
}
return future
}

View File

@@ -13,11 +13,14 @@
</xs:complexType> </xs:complexType>
<xs:complexType name="profileType"> <xs:complexType name="profileType">
<xs:sequence>
<xs:choice> <xs:choice>
<xs:element name="no-auth" type="gbcs-client:noAuthType"/> <xs:element name="no-auth" type="gbcs-client:noAuthType"/>
<xs:element name="basic-auth" type="gbcs-client:basicAuthType"/> <xs:element name="basic-auth" type="gbcs-client:basicAuthType"/>
<xs:element name="tls-client-auth" type="gbcs-client:tlsClientAuthType"/> <xs:element name="tls-client-auth" type="gbcs-client:tlsClientAuthType"/>
</xs:choice> </xs:choice>
<xs:element name="retry-policy" type="gbcs-client:retryType" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="name" type="xs:token" use="required"/> <xs:attribute name="name" type="xs:token" use="required"/>
<xs:attribute name="base-url" type="xs:anyURI" use="required"/> <xs:attribute name="base-url" type="xs:anyURI" use="required"/>
<xs:attribute name="max-connections" type="xs:positiveInteger" default="50"/> <xs:attribute name="max-connections" type="xs:positiveInteger" default="50"/>
@@ -38,4 +41,10 @@
<xs:attribute name="key-password" type="xs:string" use="optional"/> <xs:attribute name="key-password" type="xs:string" use="optional"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="retryType">
<xs:attribute name="max-attempts" type="xs:positiveInteger" use="required"/>
<xs:attribute name="initial-delay" type="xs:duration" default="PT1S"/>
<xs:attribute name="exp" type="xs:double" default="2.0"/>
</xs:complexType>
</xs:schema> </xs:schema>

View File

@@ -0,0 +1,149 @@
package net.woggioni.gbcs.client
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.gbcs.common.contextLogger
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.ArgumentsProvider
import org.junit.jupiter.params.provider.ArgumentsSource
import java.util.concurrent.CompletableFuture
import java.util.stream.Stream
import kotlin.random.Random
class RetryTest {
data class TestArgs(
val seed: Int,
val maxAttempt: Int,
val initialDelay: Double,
val exp: Double,
)
class TestArguments : ArgumentsProvider {
override fun provideArguments(context: ExtensionContext): Stream<out Arguments> {
return Stream.of(
TestArgs(
seed = 101325,
maxAttempt = 5,
initialDelay = 50.0,
exp = 2.0,
),
TestArgs(
seed = 101325,
maxAttempt = 20,
initialDelay = 100.0,
exp = 1.1,
),
TestArgs(
seed = 123487,
maxAttempt = 20,
initialDelay = 100.0,
exp = 2.0,
),
TestArgs(
seed = 20082024,
maxAttempt = 10,
initialDelay = 100.0,
exp = 2.0,
)
).map {
object: Arguments {
override fun get() = arrayOf(it)
}
}
}
}
@ArgumentsSource(TestArguments::class)
@ParameterizedTest
fun test(testArgs: TestArgs) {
val log = contextLogger()
log.debug("Start")
val executor: EventExecutorGroup = DefaultEventExecutorGroup(1)
val attempts = mutableListOf<Pair<Long, OperationOutcome<Int>>>()
val outcomeHandler = OutcomeHandler<Int> { outcome ->
when(outcome) {
is OperationOutcome.Success -> {
if(outcome.result % 10 == 0) {
OutcomeHandlerResult.DoNotRetry()
} else {
OutcomeHandlerResult.Retry(null)
}
}
is OperationOutcome.Failure -> {
when(outcome.ex) {
is IllegalStateException -> {
log.debug(outcome.ex.message, outcome.ex)
OutcomeHandlerResult.Retry(null)
}
else -> {
OutcomeHandlerResult.DoNotRetry()
}
}
}
}
}
val random = Random(testArgs.seed)
val future =
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler) {
val now = System.nanoTime()
val result = CompletableFuture<Int>()
executor.submit {
val n = random.nextInt(0, Integer.MAX_VALUE)
log.debug("Got new number: {}", n)
if(n % 3 == 0) {
val ex = IllegalStateException("Value $n can be divided by 3")
result.completeExceptionally(ex)
attempts += now to OperationOutcome.Failure(ex)
} else if(n % 7 == 0) {
val ex = RuntimeException("Value $n can be divided by 7")
result.completeExceptionally(ex)
attempts += now to OperationOutcome.Failure(ex)
} else {
result.complete(n)
attempts += now to OperationOutcome.Success(n)
}
}
result
}
Assertions.assertTrue(attempts.size <= testArgs.maxAttempt)
val result = future.handle { res, ex ->
if(ex != null) {
val err = ex.cause ?: ex
log.debug(err.message, err)
OperationOutcome.Failure(err)
} else {
OperationOutcome.Success(res)
}
}.get()
for ((index, attempt) in attempts.withIndex()) {
val (timestamp, value) = attempt
if (index > 0) {
/* Check the delay for subsequent attempts is correct */
val previousAttempt = attempts[index - 1]
val expectedTimestamp =
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
val actualTimestamp = timestamp
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
Assertions.assertTrue(err < 1e-3)
}
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
/*
* If the last attempt index is lower than the maximum number of attempts, then
* check the outcome handler returns DoNotRetry
*/
Assertions.assertTrue(outcomeHandler.shouldRetry(value) is OutcomeHandlerResult.DoNotRetry)
} else if (index < attempts.size - 1) {
/*
* If the attempt is not the last attempt check the outcome handler returns Retry
*/
Assertions.assertTrue(outcomeHandler.shouldRetry(value) is OutcomeHandlerResult.Retry)
}
}
}
}

View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration>
<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>
<appender name="console" class="ConsoleAppender">
<target>System.err</target>
<encoder class="PatternLayoutEncoder">
<pattern>%d [%highlight(%-5level)] \(%thread\) %logger{36} -%kvp- %msg %n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="io.netty" level="info"/>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -1,7 +1,9 @@
package net.woggioni.gbcs.common package net.woggioni.gbcs.common
import net.woggioni.jwo.JWO
import java.net.URI import java.net.URI
import java.net.URL import java.net.URL
import java.security.MessageDigest
object GBCS { object GBCS {
fun String.toUrl() : URL = URL.of(URI(this), null) fun String.toUrl() : URL = URL.of(URI(this), null)
@@ -9,4 +11,19 @@ object GBCS {
const val GBCS_NAMESPACE_URI: String = "urn:net.woggioni.gbcs.server" const val GBCS_NAMESPACE_URI: String = "urn:net.woggioni.gbcs.server"
const val GBCS_PREFIX: String = "gbcs" const val GBCS_PREFIX: String = "gbcs"
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance" const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
fun digest(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): ByteArray {
md.update(data)
return md.digest()
}
fun digestString(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): String {
return JWO.bytesToHex(digest(data, md))
}
} }

View File

@@ -26,13 +26,24 @@ configurations {
canBeResolved = true canBeResolved = true
visible = true visible = true
} }
testImplementation {
extendsFrom compileOnly
}
} }
dependencies { dependencies {
compileOnly project(':gbcs-common') compileOnly project(':gbcs-common')
compileOnly project(':gbcs-api') compileOnly project(':gbcs-api')
compileOnly catalog.jwo compileOnly catalog.jwo
compileOnly catalog.slf4j.api
implementation catalog.xmemcached implementation catalog.xmemcached
implementation catalog.netty.codec.memcache
implementation catalog.netty.common
implementation group: 'io.netty', name: 'netty-handler', version: catalog.versions.netty.get()
testRuntimeOnly catalog.logback.classic
} }
Provider<Tar> bundleTask = tasks.register("bundle", Tar) { Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
@@ -41,6 +52,11 @@ Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
group = BasePlugin.BUILD_GROUP group = BasePlugin.BUILD_GROUP
} }
tasks.named(JavaPlugin.TEST_TASK_NAME, Test) {
systemProperty("io.netty.leakDetectionLevel", "PARANOID")
}
tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) { tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) {
dependsOn(bundleTask) dependsOn(bundleTask)
} }

View File

@@ -7,6 +7,13 @@ module net.woggioni.gbcs.server.memcached {
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires java.xml; requires java.xml;
requires kotlin.stdlib; requires kotlin.stdlib;
requires io.netty.common;
requires io.netty.handler;
requires io.netty.codec.memcache;
requires io.netty.transport;
requires org.slf4j;
requires io.netty.buffer;
requires io.netty.codec;
provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider; provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider;

View File

@@ -0,0 +1,33 @@
package net.woggioni.gbcs.server.memcached
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.stream.ChunkedInput
import java.nio.channels.ReadableByteChannel
class CustomChunkedInput(private val readableByteChannel: ReadableByteChannel) : ChunkedInput<ByteBuf> {
override fun isEndOfInput(): Boolean {
TODO("Not yet implemented")
}
override fun close() {
TODO("Not yet implemented")
}
override fun readChunk(ctx: ChannelHandlerContext): ByteBuf {
TODO("Not yet implemented")
}
override fun readChunk(allocator: ByteBufAllocator): ByteBuf {
TODO("Not yet implemented")
}
override fun length(): Long {
TODO("Not yet implemented")
}
override fun progress(): Long {
TODO("Not yet implemented")
}
}

View File

@@ -0,0 +1,4 @@
package net.woggioni.gbcs.server.memcached
class MemcachedException(status : Short, msg : String? = null, cause : Throwable? = null)
: RuntimeException(msg ?: "Memcached status $status", cause)

View File

@@ -1,59 +1,85 @@
package net.woggioni.gbcs.server.memcached package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.XMemcachedClientBuilder import io.netty.buffer.Unpooled
import net.rubyeye.xmemcached.command.BinaryCommandFactory import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder
import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.ContentTooLargeException import net.woggioni.gbcs.api.CallHandle
import net.woggioni.gbcs.common.HostAndPort import net.woggioni.gbcs.api.ResponseEventListener
import net.woggioni.jwo.JWO import net.woggioni.gbcs.api.event.RequestEvent
import java.io.ByteArrayInputStream import net.woggioni.gbcs.api.event.ResponseEvent
import java.net.InetSocketAddress import net.woggioni.gbcs.server.memcached.client.MemcachedClient
import java.nio.channels.Channels import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ExceptionCaught
import java.nio.channels.ReadableByteChannel import net.woggioni.gbcs.server.memcached.client.ResponseEvent.LastResponseContentChunkReceived
import java.nio.charset.StandardCharsets import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseContentChunkReceived
import java.security.MessageDigest import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseReceived
import java.time.Duration import net.woggioni.gbcs.server.memcached.client.ResponseListener
import java.util.concurrent.CompletableFuture
class MemcachedCache( class MemcachedCache(
servers: List<HostAndPort>, private val cfg : MemcachedCacheConfiguration
private val maxAge: Duration,
maxSize : Int,
digestAlgorithm: String?,
compressionMode: CompressionMode,
) : Cache { ) : Cache {
private val memcachedClient = XMemcachedClientBuilder( private val client = MemcachedClient(cfg)
servers.stream().map { addr: HostAndPort -> InetSocketAddress(addr.host, addr.port) }.toList()
).apply {
commandFactory = BinaryCommandFactory()
digestAlgorithm?.let { dAlg ->
setKeyProvider { key ->
val md = MessageDigest.getInstance(dAlg)
md.update(key.toByteArray(StandardCharsets.UTF_8))
JWO.bytesToHex(md.digest())
}
}
transcoder = SerializingTranscoder(maxSize).apply {
setCompressionMode(compressionMode)
}
}.build()
override fun get(key: String): ReadableByteChannel? {
return memcachedClient.get<ByteArray>(key)
?.let(::ByteArrayInputStream)
?.let(Channels::newChannel)
}
override fun put(key: String, content: ByteArray) {
try {
memcachedClient[key, maxAge.toSeconds().toInt()] = content
} catch (e: IllegalArgumentException) {
throw ContentTooLargeException(e.message, e)
}
}
override fun close() { override fun close() {
memcachedClient.shutdown() client.close()
}
override fun get(key: String, responseEventListener: ResponseEventListener): CompletableFuture<CallHandle<Void>> {
val listener = ResponseListener { evt ->
when(evt) {
is ResponseContentChunkReceived -> {
responseEventListener.listen(ResponseEvent.ChunkReceived(Unpooled.wrappedBuffer(evt.chunk)))
}
is LastResponseContentChunkReceived -> {
responseEventListener.listen(ResponseEvent.LastChunkReceived(Unpooled.wrappedBuffer(evt.chunk)))
}
is ExceptionCaught -> {
responseEventListener.listen(ResponseEvent.ExceptionCaught(evt.cause))
}
is ResponseReceived -> {
when(val status = evt.response.status) {
BinaryMemcacheResponseStatus.SUCCESS -> {
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
responseEventListener.listen(ResponseEvent.NoContent())
}
else -> {
responseEventListener.listen(ResponseEvent.ExceptionCaught(MemcachedException(status)))
}
}
}
}
}
return client.get(key, listener).thenApply { clientCallHandle ->
object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when(evt) {
is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
}
}
override fun call(): CompletableFuture<Void> {
return clientCallHandle.waitForResponse().thenApply { null }
}
}
}
}
override fun put(key: String): CompletableFuture<CallHandle<Void>> {
return client.put(key, cfg.maxAge).thenApply { clientCallHandle ->
object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when(evt) {
is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
}
}
override fun call(): CompletableFuture<Void> {
return clientCallHandle.waitForResponse().thenApply { null }
}
}
}
} }
} }

View File

@@ -1,25 +1,45 @@
package net.woggioni.gbcs.server.memcached package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.HostAndPort import net.woggioni.gbcs.common.HostAndPort
import java.time.Duration import java.time.Duration
data class MemcachedCacheConfiguration( data class MemcachedCacheConfiguration(
var servers: List<HostAndPort>, val servers: List<Server>,
var maxAge: Duration = Duration.ofDays(1), val maxAge: Duration = Duration.ofDays(1),
var maxSize: Int = 0x100000, val maxSize: Int = 0x100000,
var digestAlgorithm: String? = null, val digestAlgorithm: String? = null,
var compressionMode: CompressionMode = CompressionMode.ZIP, val compressionMode: CompressionMode? = CompressionMode.DEFLATE,
) : Configuration.Cache { ) : Configuration.Cache {
override fun materialize() = MemcachedCache(
servers, enum class CompressionMode {
maxAge, /**
maxSize, * Gzip mode
digestAlgorithm, */
compressionMode GZIP,
/**
* Deflate mode
*/
DEFLATE
}
class RetryPolicy(
val maxAttempts: Int,
val initialDelayMillis: Long,
val exp: Double
) )
data class Server(
val endpoint : HostAndPort,
val connectionTimeoutMillis : Int?,
val retryPolicy : RetryPolicy?,
val maxConnections : Int
)
override fun materialize() = MemcachedCache(this)
override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached" override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached"
override fun getTypeName() = "memcachedCacheType" override fun getTypeName() = "memcachedCacheType"

View File

@@ -51,7 +51,7 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
} }
return MemcachedCacheConfiguration( return MemcachedCacheConfiguration(
servers, servers.map { MemcachedCacheConfiguration.Server(it, null, null, 1) },
maxAge, maxAge,
maxSize, maxSize,
digestAlgorithm, digestAlgorithm,
@@ -67,8 +67,8 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI) attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI)
for (server in servers) { for (server in servers) {
node("server") { node("server") {
attr("host", server.host) attr("host", server.endpoint.host)
attr("port", server.port.toString()) attr("port", server.endpoint.port.toString())
} }
} }
attr("max-age", maxAge.toString()) attr("max-age", maxAge.toString())

View File

@@ -0,0 +1,9 @@
package net.woggioni.gbcs.server.memcached.client
import java.nio.ByteBuffer
import java.util.concurrent.CompletableFuture
interface CallHandle {
fun sendChunk(requestBodyChunk : ByteBuffer)
fun waitForResponse() : CompletableFuture<Short>
}

View File

@@ -0,0 +1,24 @@
package net.woggioni.gbcs.server.memcached.client
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import java.nio.ByteBuffer
data class MemcacheResponse(
val status: Short,
val opcode: Byte,
val cas: Long?,
val opaque: Int?,
val key: ByteBuffer?,
val extra: ByteBuffer?
) {
companion object {
fun of(response : BinaryMemcacheResponse) = MemcacheResponse(
response.status(),
response.opcode(),
response.cas(),
response.opaque(),
response.key()?.nioBuffer(),
response.extras()?.nioBuffer()
)
}
}

View File

@@ -0,0 +1,241 @@
package net.woggioni.gbcs.server.memcached.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.MemcacheObject
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.common.GBCS.digest
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration
import net.woggioni.gbcs.server.memcached.MemcachedException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import io.netty.util.concurrent.Future as NettyFuture
class MemcachedClient(private val cfg: MemcachedCacheConfiguration) : AutoCloseable {
private val log = contextLogger()
private val group: NioEventLoopGroup
private val connectionPool: MutableMap<HostAndPort, ChannelPool> = ConcurrentHashMap()
init {
group = NioEventLoopGroup()
}
private fun newConnectionPool(server : MemcachedCacheConfiguration.Server) : FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
channel(NioSocketChannel::class.java)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
server.connectionTimeoutMillis?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec())
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
private fun sendRequest(request: BinaryMemcacheRequest,
responseListener: ResponseListener?
): CompletableFuture<CallHandle> {
val server = cfg.servers.let { servers ->
if(servers.size > 1) {
val key = request.key().duplicate()
var checksum = 0
while(key.readableBytes() > 4) {
val byte = key.readInt()
checksum = checksum xor byte
}
while(key.readableBytes() > 0) {
val byte = key.readByte()
checksum = checksum xor byte.toInt()
}
servers[checksum % servers.size]
} else {
servers.first()
}
}
val callHandleFuture = CompletableFuture<CallHandle>()
val result = CompletableFuture<Short>()
// Custom handler for processing responses
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
}
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline().addLast("handler", object : SimpleChannelInboundHandler<MemcacheObject>() {
val response : MemcacheResponse? = null
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: MemcacheObject
) {
if(msg is BinaryMemcacheResponse) {
val resp = MemcacheResponse.of(msg)
responseListener?.listen(ResponseEvent.ResponseReceived(resp))
if(msg.totalBodyLength() == msg.keyLength() + msg.extrasLength()) {
result.complete(resp.status)
}
}
if(responseListener != null) {
when (msg) {
is LastMemcacheContent -> {
responseListener.listen(ResponseEvent.LastResponseContentChunkReceived(msg.content().nioBuffer()))
result.complete(response?.status)
pipeline.removeLast()
pool.release(channel)
}
is MemcacheContent -> {
responseListener.listen(ResponseEvent.ResponseContentChunkReceived(msg.content().nioBuffer()))
}
}
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val ex = when (cause) {
is DecoderException -> cause.cause!!
else -> cause
}
responseListener?.listen(ResponseEvent.ExceptionCaught(ex))
result.completeExceptionally(ex)
ctx.close()
pipeline.removeLast()
pool.release(channel)
}
})
val chunks = mutableListOf <ByteBuffer>()
fun sendRequest() {
val valueLen = chunks.fold(0) { acc : Int, c2 : ByteBuffer ->
acc + c2.remaining()
}
request.setTotalBodyLength(request.keyLength() + request.extrasLength() + valueLen)
channel.write(request)
for((i, chunk) in chunks.withIndex()) {
if(i + 1 < chunks.size) {
channel.write(DefaultMemcacheContent(Unpooled.wrappedBuffer(chunk)))
} else {
channel.write(DefaultLastMemcacheContent(Unpooled.wrappedBuffer(chunk)))
}
}
channel.flush()
}
callHandleFuture.complete(object : CallHandle {
override fun sendChunk(requestBodyChunk: ByteBuffer) {
chunks.addLast(requestBodyChunk)
}
override fun waitForResponse(): CompletableFuture<Short> {
sendRequest()
return result
}
})
} else {
callHandleFuture.completeExceptionally(channelFuture.cause())
}
}
})
return callHandleFuture
}
private fun encodeExpiry(expiry: Duration) : Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
fun get(key: String, responseListener: ResponseListener) : CompletableFuture<CallHandle> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
DefaultBinaryMemcacheRequest().apply {
setKey(Unpooled.wrappedBuffer(digest))
setOpcode(BinaryMemcacheOpcodes.GET)
}
}
return sendRequest(request, responseListener)
}
fun put(key: String, expiry : Duration, cas : Long? = null): CompletableFuture<CallHandle> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
val extras = Unpooled.buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(expiry))
DefaultBinaryMemcacheRequest().apply {
setExtras(extras)
setKey(Unpooled.wrappedBuffer(digest))
setOpcode(BinaryMemcacheOpcodes.SET)
cas?.let(this::setCas)
}
}
return sendRequest(request) { evt ->
when (evt) {
is ResponseEvent.ResponseReceived -> {
if (evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) {
throw MemcachedException(evt.response.status)
}
}
else -> {}
}
}
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}
override fun close() {
shutDown().sync()
}
}

View File

@@ -0,0 +1,10 @@
package net.woggioni.gbcs.server.memcached.client
import java.nio.ByteBuffer
sealed interface ResponseEvent {
class ResponseReceived(val response : MemcacheResponse) : ResponseEvent
class ResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent
class LastResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent
class ExceptionCaught(val cause : Throwable) : ResponseEvent
}

View File

@@ -0,0 +1,5 @@
package net.woggioni.gbcs.server.memcached.client
fun interface ResponseListener {
fun listen(evt : ResponseEvent)
}

View File

@@ -0,0 +1,80 @@
package net.woggioni.gbcs.server.memcached.test
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import net.woggioni.gbcs.api.event.ChunkReceived
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration
import net.woggioni.gbcs.server.memcached.client.MemcacheResponse
import net.woggioni.gbcs.server.memcached.client.MemcachedClient
import net.woggioni.gbcs.server.memcached.client.ResponseEvent
import net.woggioni.gbcs.server.memcached.client.ResponseListener
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import java.nio.ByteBuffer
import java.security.SecureRandom
import java.time.Duration
import java.util.Objects
import java.util.concurrent.TimeUnit
import kotlin.random.Random
class MemcachedClientTest {
@Test
fun test() {
val client = MemcachedClient(MemcachedCacheConfiguration(
servers = listOf(
MemcachedCacheConfiguration.Server(
endpoint = HostAndPort("127.0.0.1", 11211),
connectionTimeoutMillis = null,
retryPolicy = null,
maxConnections = 1
)
)
))
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
val key = "101325"
val value = random.nextBytes(0x1000)
val requestListener = client.put(key, Duration.ofDays(2), null)
val response = Unpooled.buffer(value.size)
requestListener.thenCompose { listener ->
listener.sendChunk(ByteBuffer.wrap(value))
listener.waitForResponse()
}.get(10, TimeUnit.SECONDS)
client.get(key, object: ResponseListener {
override fun listen(evt: ResponseEvent) {
when(evt) {
is ResponseEvent.ResponseReceived -> {
if(evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) {
Assertions.fail<String> {
"Memcache status ${evt.response.status}"
}
}
}
is ResponseEvent.ResponseContentChunkReceived -> response.writeBytes(evt.chunk)
else -> {}
}
}
}).thenCompose { it.waitForResponse() }.get(1, TimeUnit.SECONDS)
val retrievedResponse = response.array()
Assertions.assertArrayEquals(value, retrievedResponse)
}
@Test
fun test2() {
val a1 = ByteArray(10) {
it.toByte()
}
val a2 = ByteArray(10) {
it.toByte()
}
Assertions.assertTrue(Objects.equals(a1, a1))
}
}

View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration>
<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>
<appender name="console" class="ConsoleAppender">
<target>System.err</target>
<encoder class="PatternLayoutEncoder">
<pattern>%d [%highlight(%-5level)] \(%thread\) %logger{36} -%kvp- %msg %n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="io.netty" level="debug"/>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -18,9 +18,6 @@ dependencies {
testImplementation catalog.bcprov.jdk18on testImplementation catalog.bcprov.jdk18on
testImplementation catalog.bcpkix.jdk18on testImplementation catalog.bcpkix.jdk18on
testImplementation catalog.junit.jupiter.api
testImplementation catalog.junit.jupiter.params
testRuntimeOnly catalog.junit.jupiter.engine
testRuntimeOnly project(":gbcs-server-memcached") testRuntimeOnly project(":gbcs-server-memcached")
} }

View File

@@ -208,15 +208,15 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
.map { it as X509Certificate } .map { it as X509Certificate }
.toArray { size -> Array<X509Certificate?>(size) { null } } .toArray { size -> Array<X509Certificate?>(size) { null } }
SslContextBuilder.forServer(serverKey, *serverCert).apply { SslContextBuilder.forServer(serverKey, *serverCert).apply {
if (tls.isVerifyClients) { val clientAuth = tls.trustStore?.let { trustStore ->
clientAuth(ClientAuth.OPTIONAL)
tls.trustStore?.let { trustStore ->
val ts = loadKeystore(trustStore.file, trustStore.password) val ts = loadKeystore(trustStore.file, trustStore.password)
trustManager( trustManager(
ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus) ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus)
) )
} if(trustStore.isRequireClientCertificate) ClientAuth.REQUIRE
} else ClientAuth.OPTIONAL
} ?: ClientAuth.NONE
clientAuth(clientAuth)
}.build() }.build()
} }
} }

View File

@@ -1,21 +0,0 @@
package net.woggioni.gbcs.server.cache
import net.woggioni.jwo.JWO
import java.security.MessageDigest
object CacheUtils {
fun digest(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): ByteArray {
md.update(data)
return md.digest()
}
fun digestString(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): String {
return JWO.bytesToHex(digest(data, md))
}
}

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.server.cache package net.woggioni.gbcs.server.cache
import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.GBCS.digestString
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.cache.CacheUtils.digestString
import net.woggioni.jwo.LockFile import net.woggioni.jwo.LockFile
import java.nio.channels.Channels import java.nio.channels.Channels
import java.nio.channels.FileChannel import java.nio.channels.FileChannel

View File

@@ -1,18 +1,15 @@
package net.woggioni.gbcs.server.cache package net.woggioni.gbcs.server.cache
import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.server.cache.CacheUtils.digestString import net.woggioni.gbcs.common.GBCS.digestString
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.Channels import java.nio.channels.Channels
import java.security.MessageDigest import java.security.MessageDigest
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater import java.util.zip.Inflater
@@ -25,33 +22,27 @@ class InMemoryCache(
val compressionLevel: Int val compressionLevel: Int
) : Cache { ) : Cache {
private val map = ConcurrentHashMap<String, MapValue>() private val map = ConcurrentHashMap<String, ByteArray>()
private class MapValue(val rc: AtomicInteger, val payload : AtomicReference<ByteArray>) private class RemovalQueueElement(val key: String, val value : ByteArray, val expiry : Instant) : Comparable<RemovalQueueElement> {
private class RemovalQueueElement(val key: String, val expiry : Instant) : Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry) override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
} }
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>() private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
private var running = true private var running = true
private val garbageCollector = Thread({ private val garbageCollector = Thread {
while(true) { while(true) {
val el = removalQueue.take() val el = removalQueue.take()
val now = Instant.now() val now = Instant.now()
if(now > el.expiry) { if(now > el.expiry) {
val value = map[el.key] ?: continue map.remove(el.key, el.value)
val rc = value.rc.decrementAndGet()
if(rc == 0) {
map.remove(el.key)
}
} else { } else {
removalQueue.put(el) removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))) Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
} }
} }
}).apply { }.apply {
start() start()
} }
@@ -68,8 +59,6 @@ class InMemoryCache(
} ?: key } ?: key
).let { digest -> ).let { digest ->
map[digest] map[digest]
?.let(MapValue::payload)
?.let(AtomicReference<ByteArray>::get)
?.let { value -> ?.let { value ->
if (compressionEnabled) { if (compressionEnabled) {
val inflater = Inflater() val inflater = Inflater()
@@ -96,11 +85,8 @@ class InMemoryCache(
} else { } else {
content content
} }
val mapValue = map.computeIfAbsent(digest) { map[digest] = value
MapValue(AtomicInteger(0), AtomicReference()) removalQueue.put(RemovalQueueElement(digest, value, Instant.now().plus(maxAge)))
}
mapValue.payload.set(value)
removalQueue.put(RemovalQueueElement(digest, Instant.now().plus(maxAge)))
} }
} }
} }

View File

@@ -142,10 +142,9 @@ object Parser {
} }
"tls" -> { "tls" -> {
val verifyClients = child.renderAttribute("verify-clients")
?.let(String::toBoolean) ?: false
var keyStore: KeyStore? = null var keyStore: KeyStore? = null
var trustStore: TrustStore? = null var trustStore: TrustStore? = null
for (granChild in child.asIterable()) { for (granChild in child.asIterable()) {
when (granChild.localName) { when (granChild.localName) {
"keystore" -> { "keystore" -> {
@@ -167,15 +166,19 @@ object Parser {
val checkCertificateStatus = granChild.renderAttribute("check-certificate-status") val checkCertificateStatus = granChild.renderAttribute("check-certificate-status")
?.let(String::toBoolean) ?.let(String::toBoolean)
?: false ?: false
val requireClientCertificate = child.renderAttribute("require-client-certificate")
?.let(String::toBoolean) ?: false
trustStore = TrustStore( trustStore = TrustStore(
trustStoreFile, trustStoreFile,
trustStorePassword, trustStorePassword,
checkCertificateStatus checkCertificateStatus,
requireClientCertificate
) )
} }
} }
} }
tls = Tls(keyStore, trustStore, verifyClients) tls = Tls(keyStore, trustStore)
} }
} }
} }

View File

@@ -154,9 +154,6 @@ object Serializer {
conf.tls?.let { tlsConfiguration -> conf.tls?.let { tlsConfiguration ->
node("tls") { node("tls") {
if(tlsConfiguration.isVerifyClients) {
attr("verify-clients", "true")
}
tlsConfiguration.keyStore?.let { keyStore -> tlsConfiguration.keyStore?.let { keyStore ->
node("keystore") { node("keystore") {
attr("file", keyStore.file.toString()) attr("file", keyStore.file.toString())
@@ -177,6 +174,7 @@ object Serializer {
attr("password", password) attr("password", password)
} }
attr("check-certificate-status", trustStore.isCheckCertificateStatus.toString()) attr("check-certificate-status", trustStore.isCheckCertificateStatus.toString())
attr("require-client-certificate", trustStore.isRequireClientCertificate.toString())
} }
} }
} }

View File

@@ -1,36 +1,83 @@
package net.woggioni.gbcs.server.handler package net.woggioni.gbcs.server.handler
import io.netty.buffer.Unpooled import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.DefaultFileRegion
import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.DefaultFullHttpResponse import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.DefaultHttpResponse import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.FullHttpRequest import io.netty.handler.codec.http.DefaultLastHttpContent
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMessage
import io.netty.handler.codec.http.HttpMethod import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioStream
import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.CacheException import net.woggioni.gbcs.api.CallHandle
import net.woggioni.gbcs.api.ResponseEventListener
import net.woggioni.gbcs.api.event.RequestEvent
import net.woggioni.gbcs.api.event.ResponseEvent
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.debug import net.woggioni.gbcs.server.debug
import net.woggioni.gbcs.server.warn import net.woggioni.gbcs.server.warn
import java.nio.channels.FileChannel
import java.nio.file.Path import java.nio.file.Path
import java.util.concurrent.CompletableFuture
@ChannelHandler.Sharable
class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
SimpleChannelInboundHandler<FullHttpRequest>() { SimpleChannelInboundHandler<HttpMessage>() {
companion object {
@JvmStatic
private val log = contextLogger() private val log = contextLogger()
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) { private data class TransientContext(
var key: String?,
var callHandle: CompletableFuture<CallHandle<Void>>
)
private var transientContext: TransientContext? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpMessage) {
when (msg) {
is HttpRequest -> {
handleRequest(ctx, msg)
}
is LastHttpContent -> {
transientContext?.run {
callHandle.thenCompose { callHandle ->
callHandle.postEvent(RequestEvent.LastChunkSent(msg.content()))
callHandle.call()
}.thenApply {
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
key?.let(String::toByteArray)
?.let(Unpooled::copiedBuffer)
)
// response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
}
}
}
is HttpContent -> {
transientContext?.run {
callHandle = callHandle.thenApply { it ->
it.postEvent(RequestEvent.ChunkSent(msg.content()))
it
}
}
}
}
}
private fun handleRequest(ctx: ChannelHandlerContext, msg: HttpRequest) {
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg) val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
val method = msg.method() val method = msg.method()
if (method === HttpMethod.GET) { if (method === HttpMethod.GET) {
@@ -43,49 +90,61 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
return return
} }
if (serverPrefix == prefix) { if (serverPrefix == prefix) {
try { cache.get(key, object : ResponseEventListener {
cache.get(key) var first = false
} catch(ex : Throwable) { override fun listen(evt: ResponseEvent) {
throw CacheException("Error accessing the cache backend", ex) when (evt) {
}?.let { channel -> is ResponseEvent.NoContent -> {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response =
DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
is ResponseEvent.ChunkReceived, is ResponseEvent.LastChunkReceived -> {
if (first) {
first = false
log.debug(ctx) { log.debug(ctx) {
"Cache hit for key '$key'" "Cache hit for key '$key'"
} }
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM response.headers()[HttpHeaderNames.CONTENT_TYPE] =
HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) { if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY) response.headers()
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else { } else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) response.headers()
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED) .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers()
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
} }
ctx.write(response) ctx.write(response)
when (channel) { }
is FileChannel -> { if (evt is ResponseEvent.LastChunkReceived)
if (keepAlive) { ctx.write(DefaultLastHttpContent(evt.chunk))
ctx.write(DefaultFileRegion(channel, 0, channel.size())) else if (evt is ResponseEvent.ChunkReceived)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate()) ctx.write(DefaultHttpContent(evt.chunk))
} else { ctx.flush()
ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size())) }
.addListener(ChannelFutureListener.CLOSE)
is ResponseEvent.ExceptionCaught -> {
log.error(evt.cause.message, evt.cause)
val errorResponse = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR,
evt.cause.message
?.let(String::toByteArray)
?.let(Unpooled::copiedBuffer)
)
ctx.write(errorResponse)
} }
} }
else -> {
ctx.write(ChunkedNioStream(channel)).addListener { evt ->
channel.close()
}
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
}
}
} ?: let {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
} }
}).thenCompose(CallHandle<Void>::call)
} else { } else {
log.warn(ctx) { log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'" "Got request for unhandled path '${msg.uri()}'"
@@ -93,35 +152,22 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
ctx.channel().read()
} }
} else if (method === HttpMethod.PUT) { } else if (method === HttpMethod.PUT) {
val path = Path.of(msg.uri()) val path = Path.of(msg.uri())
val prefix = path.parent val prefix = path.parent
val key = path.fileName.toString() val key = path.fileName.toString()
if (serverPrefix == prefix) { if (serverPrefix == prefix) {
log.debug(ctx) { log.debug(ctx) {
"Added value for key '$key' to build cache" "Added value for key '$key' to build cache"
} }
val bodyBytes = msg.content().run { transientContext = TransientContext(key, cache.put(key))
if (isDirect) {
ByteArray(readableBytes()).also {
readBytes(it)
}
} else {
array()
}
}
try {
cache.put(key, bodyBytes)
} catch(ex : Throwable) {
throw CacheException("Error accessing the cache backend", ex)
}
val response = DefaultFullHttpResponse( val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED, msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray()) Unpooled.copiedBuffer(key.toByteArray())
) )
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() // response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
} else { } else {
log.warn(ctx) { log.warn(ctx) {
@@ -133,7 +179,10 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
} }
} else if (method == HttpMethod.TRACE) { } else if (method == HttpMethod.TRACE) {
val replayedRequestHead = ctx.alloc().buffer() val replayedRequestHead = ctx.alloc().buffer()
replayedRequestHead.writeCharSequence("TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", Charsets.US_ASCII) replayedRequestHead.writeCharSequence(
"TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n",
Charsets.US_ASCII
)
msg.headers().forEach { (key, value) -> msg.headers().forEach { (key, value) ->
replayedRequestHead.apply { replayedRequestHead.apply {
writeCharSequence(key, Charsets.US_ASCII) writeCharSequence(key, Charsets.US_ASCII)
@@ -143,18 +192,30 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
} }
} }
replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII) replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII)
val requestBody = msg.content() val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
requestBody.retain()
val responseBody = ctx.alloc().compositeBuffer(2).apply {
addComponents(true, replayedRequestHead)
addComponents(true, requestBody)
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK, responseBody)
response.headers().apply { response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, "message/http") set(HttpHeaderNames.CONTENT_TYPE, "message/http")
set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes())
} }
ctx.writeAndFlush(response) ctx.write(response)
ctx.writeAndFlush(DefaultHttpContent(replayedRequestHead))
val callHandle = object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when (evt) {
is RequestEvent.ChunkSent -> {
ctx.writeAndFlush(DefaultHttpContent(evt.chunk))
}
is RequestEvent.LastChunkSent -> {
ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk))
}
}
}
override fun call(): CompletableFuture<Void> {
return CompletableFuture.completedFuture(null)
}
}
transientContext = TransientContext(null, CompletableFuture.completedFuture(callHandle))
} else { } else {
log.warn(ctx) { log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'" "Got request with unhandled method '${msg.method().name()}'"
@@ -163,5 +224,6 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
} }
} }
} }

View File

@@ -183,7 +183,6 @@
<xs:element name="keystore" type="gbcs:keyStoreType" /> <xs:element name="keystore" type="gbcs:keyStoreType" />
<xs:element name="truststore" type="gbcs:trustStoreType" minOccurs="0"/> <xs:element name="truststore" type="gbcs:trustStoreType" minOccurs="0"/>
</xs:all> </xs:all>
<xs:attribute name="verify-clients" type="xs:boolean" use="optional" default="false"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="keyStoreType"> <xs:complexType name="keyStoreType">
@@ -197,6 +196,7 @@
<xs:attribute name="file" type="xs:string" use="required"/> <xs:attribute name="file" type="xs:string" use="required"/>
<xs:attribute name="password" type="xs:string"/> <xs:attribute name="password" type="xs:string"/>
<xs:attribute name="check-certificate-status" type="xs:boolean"/> <xs:attribute name="check-certificate-status" type="xs:boolean"/>
<xs:attribute name="require-client-certificate" type="xs:boolean" use="optional" default="false"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="propertiesType"> <xs:complexType name="propertiesType">

View File

@@ -171,9 +171,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
), ),
Configuration.Tls( Configuration.Tls(
Configuration.KeyStore(this.serverKeyStoreFile, null, SERVER_CERTIFICATE_ENTRY, PASSWORD), Configuration.KeyStore(this.serverKeyStoreFile, null, SERVER_CERTIFICATE_ENTRY, PASSWORD),
Configuration.TrustStore(this.trustStoreFile, null, false), Configuration.TrustStore(this.trustStoreFile, null, false, false),
true )
),
) )
Xml.write(Serializer.serialize(cfg), System.out) Xml.write(Serializer.serialize(cfg), System.out)
} }

View File

@@ -20,6 +20,7 @@ class ConfigurationTest {
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-default.xml", "classpath:net/woggioni/gbcs/server/test/valid/gbcs-default.xml",
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-memcached.xml", "classpath:net/woggioni/gbcs/server/test/valid/gbcs-memcached.xml",
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-tls.xml", "classpath:net/woggioni/gbcs/server/test/valid/gbcs-tls.xml",
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-memcached-tls.xml",
] ]
) )
@ParameterizedTest @ParameterizedTest

View File

@@ -7,6 +7,7 @@ import org.bouncycastle.asn1.x500.X500Name
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.params.provider.ArgumentsSource
import java.net.http.HttpClient import java.net.http.HttpClient
import java.net.http.HttpRequest import java.net.http.HttpRequest
import java.net.http.HttpResponse import java.net.http.HttpResponse

View File

@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcached jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd"
>
<bind host="0.0.0.0" port="8443" incoming-connections-backlog-size="4096"/>
<connection
max-request-size="67108864"
idle-timeout="PT30S"
read-idle-timeout="PT60S"
write-idle-timeout="PT60S"
read-timeout="PT5M"
write-timeout="PT5M"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="16777216" compression-mode="zip">
<server host="memcached" port="11211"/>
</cache>
<authorization>
<users>
<user name="woggioni">
<quota calls="1000" period="PT1S"/>
</user>
<user name="gitea">
<quota calls="10" period="PT1S" initial-available-calls="100" max-available-calls="100"/>
</user>
<anonymous>
<quota calls="2" period="PT5S"/>
</anonymous>
</users>
<groups>
<group name="writers">
<users>
<user ref="woggioni"/>
<user ref="gitea"/>
</users>
<roles>
<reader/>
<writer/>
</roles>
</group>
</groups>
</authorization>
<authentication>
<client-certificate>
<user-extractor attribute-name="CN" pattern="(.*)"/>
</client-certificate>
</authentication>
<tls>
<keystore file="/home/luser/ssl/gbcs.woggioni.net.pfx" key-alias="gbcs.woggioni.net" password="KEYSTORE_PASSWOR" key-password="KEY_PASSWORD"/>
<truststore file="/home/luser/ssl/woggioni.net.pfx" check-certificate-status="false" password="TRUSTSTORE_PASSWORD"/>
</tls>
</gbcs:server>

View File

@@ -60,8 +60,8 @@
<user-extractor pattern="user-pattern" attribute-name="CN"/> <user-extractor pattern="user-pattern" attribute-name="CN"/>
</client-certificate> </client-certificate>
</authentication> </authentication>
<tls verify-clients="true"> <tls>
<keystore file="keystore.pfx" key-alias="key1" password="password" key-password="key-password"/> <keystore file="keystore.pfx" key-alias="key1" password="password" key-password="key-password"/>
<truststore file="truststore.pfx" password="password" check-certificate-status="true" /> <truststore file="truststore.pfx" password="password" check-certificate-status="true" require-client-certificate="true"/>
</tls> </tls>
</gbcs:server> </gbcs:server>

View File

@@ -2,9 +2,9 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true org.gradle.parallel=true
org.gradle.caching=true org.gradle.caching=true
gbcs.version = 0.0.11 gbcs.version = 0.1.1
lys.version = 2025.01.25 lys.version = 2025.01.31
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
docker.registry.url=gitea.woggioni.net docker.registry.url=gitea.woggioni.net