Compare commits

...

5 Commits

Author SHA1 Message Date
e5fe8437a6 temporary commit 2025-02-04 13:59:44 +08:00
89153b60f8 fixed race condition in InMemoryCache 2025-02-01 10:14:13 +08:00
a2a40ab60f added semaphore to benchmark command 2025-01-28 00:00:07 +08:00
45458761f3 made TLS client certificate request from the server configurable
All checks were successful
CI / build (push) Successful in 4m2s
2025-01-27 13:32:04 +08:00
90a5834f5f added retry policy to gbcs-client 2025-01-27 13:12:12 +08:00
47 changed files with 1306 additions and 295 deletions

View File

@@ -46,6 +46,12 @@ allprojects { subproject ->
}
}
dependencies {
testImplementation catalog.junit.jupiter.api
testImplementation catalog.junit.jupiter.params
testRuntimeOnly catalog.junit.jupiter.engine
}
test {
useJUnitPlatform()
}

View File

@@ -5,6 +5,7 @@ plugins {
}
dependencies {
api catalog.netty.buffer
}
publishing {

View File

@@ -1,6 +1,8 @@
module net.woggioni.gbcs.api {
requires static lombok;
requires java.xml;
requires io.netty.buffer;
exports net.woggioni.gbcs.api;
exports net.woggioni.gbcs.api.exception;
exports net.woggioni.gbcs.api.event;
}

View File

@@ -3,10 +3,10 @@ package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.exception.ContentTooLargeException;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.CompletableFuture;
public interface Cache extends AutoCloseable {
ReadableByteChannel get(String key);
void put(String key, byte[] content) throws ContentTooLargeException;
CompletableFuture<CallHandle<Void>> get(String key, ResponseEventListener responseEventListener);
CompletableFuture<CallHandle<Void>> put(String key) throws ContentTooLargeException;
}

View File

@@ -0,0 +1,10 @@
package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.event.RequestEvent;
import java.util.concurrent.CompletableFuture;
public interface CallHandle<T> {
void postEvent(RequestEvent evt);
CompletableFuture<T> call();
}

View File

@@ -91,11 +91,14 @@ public class Configuration {
boolean verifyClients;
}
public enum ClientCertificate {
REQUIRED, OPTIONAL
}
@Value
public static class Tls {
KeyStore keyStore;
TrustStore trustStore;
boolean verifyClients;
}
@Value
@@ -111,6 +114,7 @@ public class Configuration {
Path file;
String password;
boolean checkCertificateStatus;
boolean requireClientCertificate;
}
@Value

View File

@@ -0,0 +1,7 @@
package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.event.ResponseEvent;
public interface ResponseEventListener {
void listen(ResponseEvent evt);
}

View File

@@ -0,0 +1,20 @@
package net.woggioni.gbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import net.woggioni.gbcs.api.CallHandle;
sealed public abstract class RequestEvent {
@Getter
@RequiredArgsConstructor
public static final class ChunkSent extends RequestEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
public static final class LastChunkSent extends RequestEvent {
private final ByteBuf chunk;
}
}

View File

@@ -0,0 +1,28 @@
package net.woggioni.gbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
sealed public abstract class ResponseEvent {
@Getter
@RequiredArgsConstructor
public final static class ChunkReceived extends ResponseEvent {
private final ByteBuf chunk;
}
public final static class NoContent extends ResponseEvent {
}
@Getter
@RequiredArgsConstructor
public final static class LastChunkReceived extends ResponseEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
public final static class ExceptionCaught extends ResponseEvent {
private final Throwable cause;
}
}

View File

@@ -1,19 +1,17 @@
package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.error
import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.jwo.JWO
import picocli.CommandLine
import java.security.SecureRandom
import java.time.Duration
import java.time.Instant
import java.util.Base64
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random
@@ -53,53 +51,55 @@ class BenchmarkCommand : GbcsCommand() {
}
}
log.info {
"Starting insertion"
}
val entries = let {
val completionQueue = LinkedBlockingQueue<Future<Pair<String, ByteArray>>>(numberOfEntries)
val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now()
val totalElapsedTime = AtomicLong(0)
entryGenerator.take(numberOfEntries).forEach { entry ->
val requestStart = System.nanoTime()
val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { _, _ ->
totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
completionQueue.put(future)
val semaphore = Semaphore(profile.maxConnections * 3)
val iterator = entryGenerator.take(numberOfEntries).iterator()
while(completionCounter.get() < numberOfEntries) {
if(iterator.hasNext()) {
val entry = iterator.next()
semaphore.acquire()
val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { result, ex ->
if (ex != null) {
log.error(ex.message, ex)
} else {
completionQueue.put(result)
}
semaphore.release()
completionCounter.incrementAndGet()
}
}
}
val inserted = sequence<Pair<String, ByteArray>> {
var completionCounter = 0
while (completionCounter < numberOfEntries) {
val future = completionQueue.take()
try {
yield(future.get())
} catch (ee: ExecutionException) {
val cause = ee.cause ?: ee
log.error(cause.message, cause)
}
completionCounter += 1
}
}.toList()
val inserted = completionQueue.toList()
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
"Insertion rate: ${numberOfEntries.toDouble() / elapsed * 1000} ops/s"
}
log.info {
"Average time per insertion: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1000} ms"
val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
}
inserted
}
log.info {
"Inserted ${entries.size} entries"
}
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) {
val completionQueue = LinkedBlockingQueue<Future<Unit>>(entries.size)
val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now()
val totalElapsedTime = AtomicLong(0)
entries.forEach { entry ->
val requestStart = System.nanoTime()
semaphore.acquire()
val future = client.get(entry.first).thenApply {
totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
if (it == null) {
log.error {
"Missing entry for key '${entry.first}'"
@@ -111,21 +111,15 @@ class BenchmarkCommand : GbcsCommand() {
}
}
future.whenComplete { _, _ ->
completionQueue.put(future)
completionCounter.incrementAndGet()
semaphore.release()
}
}
var completionCounter = 0
while (completionCounter < entries.size) {
completionQueue.take()
completionCounter += 1
}
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
"Retrieval rate: ${entries.size.toDouble() / elapsed * 1000} ops/s"
}
log.info {
"Average time per retrieval: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1e6} ms"
val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
"Retrieval rate: $opsPerSecond ops/s"
}
} else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")

View File

@@ -10,6 +10,8 @@ dependencies {
implementation catalog.slf4j.api
implementation catalog.netty.buffer
implementation catalog.netty.codec.http
testRuntimeOnly catalog.logback.classic
}

View File

@@ -66,11 +66,18 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
data class BasicAuthenticationCredentials(val username: String, val password: String) : Authentication()
}
class RetryPolicy(
val maxAttempts: Int,
val initialDelayMillis: Long,
val exp: Double
)
data class Profile(
val serverURI: URI,
val authentication: Authentication?,
val connectionTimeout : Duration?,
val maxConnections : Int
val connectionTimeout: Duration?,
val maxConnections: Int,
val retryPolicy: RetryPolicy?,
)
companion object {
@@ -161,28 +168,76 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
pool = FixedChannelPool(bootstrap, channelPoolHandler, profile.maxConnections)
}
fun get(key: String): CompletableFuture<ByteArray?> {
return sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
.thenApply {
val status = it.status()
if (it.status() == HttpResponseStatus.NOT_FOUND) {
null
} else if (it.status() != HttpResponseStatus.OK) {
throw HttpException(status)
} else {
it.content()
}
}.thenApply { maybeByteBuf ->
maybeByteBuf?.let {
val result = ByteArray(it.readableBytes())
it.getBytes(0, result)
result
private fun executeWithRetry(operation: () -> CompletableFuture<FullHttpResponse>): CompletableFuture<FullHttpResponse> {
val retryPolicy = profile.retryPolicy
return if (retryPolicy != null) {
val outcomeHandler = OutcomeHandler<FullHttpResponse> { outcome ->
when (outcome) {
is OperationOutcome.Success -> {
val response = outcome.result
val status = response.status()
when (status) {
HttpResponseStatus.TOO_MANY_REQUESTS -> {
val retryAfter = response.headers()[HttpHeaderNames.RETRY_AFTER]?.let { headerValue ->
try {
headerValue.toLong() * 1000
} catch (nfe: NumberFormatException) {
null
}
}
OutcomeHandlerResult.Retry(retryAfter)
}
HttpResponseStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.SERVICE_UNAVAILABLE ->
OutcomeHandlerResult.Retry()
else -> OutcomeHandlerResult.DoNotRetry()
}
}
is OperationOutcome.Failure -> {
OutcomeHandlerResult.Retry()
}
}
}
executeWithRetry(
group,
retryPolicy.maxAttempts,
retryPolicy.initialDelayMillis.toDouble(),
retryPolicy.exp,
outcomeHandler,
operation
)
} else {
operation()
}
}
fun get(key: String): CompletableFuture<ByteArray?> {
return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
}.thenApply {
val status = it.status()
if (it.status() == HttpResponseStatus.NOT_FOUND) {
null
} else if (it.status() != HttpResponseStatus.OK) {
throw HttpException(status)
} else {
it.content()
}
}.thenApply { maybeByteBuf ->
maybeByteBuf?.let {
val result = ByteArray(it.readableBytes())
it.getBytes(0, result)
result
}
}
}
fun put(key: String, content: ByteArray): CompletableFuture<Unit> {
return sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content).thenApply {
return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content)
}.thenApply {
val status = it.status()
if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) {
throw HttpException(status)

View File

@@ -17,16 +17,18 @@ object Parser {
fun parse(document: Document): GradleBuildCacheClient.Configuration {
val root = document.documentElement
val profiles = mutableMapOf<String, GradleBuildCacheClient.Configuration.Profile>()
for (child in root.asIterable()) {
val tagName = child.localName
when (tagName) {
"profile" -> {
val name = child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required")
val uri = child.renderAttribute("base-url")?.let(::URI) ?: throw ConfigurationException("base-url attribute is required")
val name =
child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required")
val uri = child.renderAttribute("base-url")?.let(::URI)
?: throw ConfigurationException("base-url attribute is required")
var authentication: GradleBuildCacheClient.Configuration.Authentication? = null
var retryPolicy: GradleBuildCacheClient.Configuration.RetryPolicy? = null
for (gchild in child.asIterable()) {
when (gchild.localName) {
"tls-client-auth" -> {
@@ -47,14 +49,42 @@ object Parser {
.toList()
.toTypedArray()
authentication =
GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(key, certChain)
GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(
key,
certChain
)
}
"basic-auth" -> {
val username = gchild.renderAttribute("user") ?: throw ConfigurationException("username attribute is required")
val password = gchild.renderAttribute("password") ?: throw ConfigurationException("password attribute is required")
val username = gchild.renderAttribute("user")
?: throw ConfigurationException("username attribute is required")
val password = gchild.renderAttribute("password")
?: throw ConfigurationException("password attribute is required")
authentication =
GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(username, password)
GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(
username,
password
)
}
"retry-policy" -> {
val maxAttempts =
gchild.renderAttribute("max-attempts")
?.let(String::toInt)
?: throw ConfigurationException("max-attempts attribute is required")
val initialDelay =
gchild.renderAttribute("initial-delay")
?.let(Duration::parse)
?: Duration.ofSeconds(1)
val exp =
gchild.renderAttribute("exp")
?.let(String::toDouble)
?: 2.0f
retryPolicy = GradleBuildCacheClient.Configuration.RetryPolicy(
maxAttempts,
initialDelay.toMillis(),
exp.toDouble()
)
}
}
}
@@ -63,7 +93,13 @@ object Parser {
?: 50
val connectionTimeout = child.renderAttribute("connection-timeout")
?.let(Duration::parse)
profiles[name] = GradleBuildCacheClient.Configuration.Profile(uri, authentication, connectionTimeout, maxConnections)
profiles[name] = GradleBuildCacheClient.Configuration.Profile(
uri,
authentication,
connectionTimeout,
maxConnections,
retryPolicy
)
}
}
}

View File

@@ -0,0 +1,75 @@
package net.woggioni.gbcs.client
import io.netty.util.concurrent.EventExecutorGroup
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
sealed class OperationOutcome<T> {
class Success<T>(val result: T) : OperationOutcome<T>()
class Failure<T>(val ex: Throwable) : OperationOutcome<T>()
}
sealed class OutcomeHandlerResult {
class Retry(val suggestedDelayMillis: Long? = null) : OutcomeHandlerResult()
class DoNotRetry : OutcomeHandlerResult()
}
fun interface OutcomeHandler<T> {
fun shouldRetry(result: OperationOutcome<T>): OutcomeHandlerResult
}
fun <T> executeWithRetry(
eventExecutorGroup: EventExecutorGroup,
maxAttempts: Int,
initialDelay: Double,
exp: Double,
outcomeHandler: OutcomeHandler<T>,
cb: () -> CompletableFuture<T>
): CompletableFuture<T> {
val finalResult = cb()
var future = finalResult
var shortCircuit = false
for (i in 1 until maxAttempts) {
future = future.handle { result, ex ->
val operationOutcome = if (ex == null) {
OperationOutcome.Success(result)
} else {
OperationOutcome.Failure(ex.cause ?: ex)
}
if (shortCircuit) {
when(operationOutcome) {
is OperationOutcome.Failure -> throw operationOutcome.ex
is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result)
}
} else {
when(val outcomeHandlerResult = outcomeHandler.shouldRetry(operationOutcome)) {
is OutcomeHandlerResult.Retry -> {
val res = CompletableFuture<T>()
val delay = run {
val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong()
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
}
eventExecutorGroup.schedule({
cb().handle { result, ex ->
if (ex == null) {
res.complete(result)
} else {
res.completeExceptionally(ex)
}
}
}, delay, TimeUnit.MILLISECONDS)
res
}
is OutcomeHandlerResult.DoNotRetry -> {
shortCircuit = true
when(operationOutcome) {
is OperationOutcome.Failure -> throw operationOutcome.ex
is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result)
}
}
}
}
}.thenCompose { it }
}
return future
}

View File

@@ -13,11 +13,14 @@
</xs:complexType>
<xs:complexType name="profileType">
<xs:choice>
<xs:element name="no-auth" type="gbcs-client:noAuthType"/>
<xs:element name="basic-auth" type="gbcs-client:basicAuthType"/>
<xs:element name="tls-client-auth" type="gbcs-client:tlsClientAuthType"/>
</xs:choice>
<xs:sequence>
<xs:choice>
<xs:element name="no-auth" type="gbcs-client:noAuthType"/>
<xs:element name="basic-auth" type="gbcs-client:basicAuthType"/>
<xs:element name="tls-client-auth" type="gbcs-client:tlsClientAuthType"/>
</xs:choice>
<xs:element name="retry-policy" type="gbcs-client:retryType" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="name" type="xs:token" use="required"/>
<xs:attribute name="base-url" type="xs:anyURI" use="required"/>
<xs:attribute name="max-connections" type="xs:positiveInteger" default="50"/>
@@ -38,4 +41,10 @@
<xs:attribute name="key-password" type="xs:string" use="optional"/>
</xs:complexType>
<xs:complexType name="retryType">
<xs:attribute name="max-attempts" type="xs:positiveInteger" use="required"/>
<xs:attribute name="initial-delay" type="xs:duration" default="PT1S"/>
<xs:attribute name="exp" type="xs:double" default="2.0"/>
</xs:complexType>
</xs:schema>

View File

@@ -0,0 +1,149 @@
package net.woggioni.gbcs.client
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.gbcs.common.contextLogger
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.ArgumentsProvider
import org.junit.jupiter.params.provider.ArgumentsSource
import java.util.concurrent.CompletableFuture
import java.util.stream.Stream
import kotlin.random.Random
class RetryTest {
data class TestArgs(
val seed: Int,
val maxAttempt: Int,
val initialDelay: Double,
val exp: Double,
)
class TestArguments : ArgumentsProvider {
override fun provideArguments(context: ExtensionContext): Stream<out Arguments> {
return Stream.of(
TestArgs(
seed = 101325,
maxAttempt = 5,
initialDelay = 50.0,
exp = 2.0,
),
TestArgs(
seed = 101325,
maxAttempt = 20,
initialDelay = 100.0,
exp = 1.1,
),
TestArgs(
seed = 123487,
maxAttempt = 20,
initialDelay = 100.0,
exp = 2.0,
),
TestArgs(
seed = 20082024,
maxAttempt = 10,
initialDelay = 100.0,
exp = 2.0,
)
).map {
object: Arguments {
override fun get() = arrayOf(it)
}
}
}
}
@ArgumentsSource(TestArguments::class)
@ParameterizedTest
fun test(testArgs: TestArgs) {
val log = contextLogger()
log.debug("Start")
val executor: EventExecutorGroup = DefaultEventExecutorGroup(1)
val attempts = mutableListOf<Pair<Long, OperationOutcome<Int>>>()
val outcomeHandler = OutcomeHandler<Int> { outcome ->
when(outcome) {
is OperationOutcome.Success -> {
if(outcome.result % 10 == 0) {
OutcomeHandlerResult.DoNotRetry()
} else {
OutcomeHandlerResult.Retry(null)
}
}
is OperationOutcome.Failure -> {
when(outcome.ex) {
is IllegalStateException -> {
log.debug(outcome.ex.message, outcome.ex)
OutcomeHandlerResult.Retry(null)
}
else -> {
OutcomeHandlerResult.DoNotRetry()
}
}
}
}
}
val random = Random(testArgs.seed)
val future =
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler) {
val now = System.nanoTime()
val result = CompletableFuture<Int>()
executor.submit {
val n = random.nextInt(0, Integer.MAX_VALUE)
log.debug("Got new number: {}", n)
if(n % 3 == 0) {
val ex = IllegalStateException("Value $n can be divided by 3")
result.completeExceptionally(ex)
attempts += now to OperationOutcome.Failure(ex)
} else if(n % 7 == 0) {
val ex = RuntimeException("Value $n can be divided by 7")
result.completeExceptionally(ex)
attempts += now to OperationOutcome.Failure(ex)
} else {
result.complete(n)
attempts += now to OperationOutcome.Success(n)
}
}
result
}
Assertions.assertTrue(attempts.size <= testArgs.maxAttempt)
val result = future.handle { res, ex ->
if(ex != null) {
val err = ex.cause ?: ex
log.debug(err.message, err)
OperationOutcome.Failure(err)
} else {
OperationOutcome.Success(res)
}
}.get()
for ((index, attempt) in attempts.withIndex()) {
val (timestamp, value) = attempt
if (index > 0) {
/* Check the delay for subsequent attempts is correct */
val previousAttempt = attempts[index - 1]
val expectedTimestamp =
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
val actualTimestamp = timestamp
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
Assertions.assertTrue(err < 1e-3)
}
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
/*
* If the last attempt index is lower than the maximum number of attempts, then
* check the outcome handler returns DoNotRetry
*/
Assertions.assertTrue(outcomeHandler.shouldRetry(value) is OutcomeHandlerResult.DoNotRetry)
} else if (index < attempts.size - 1) {
/*
* If the attempt is not the last attempt check the outcome handler returns Retry
*/
Assertions.assertTrue(outcomeHandler.shouldRetry(value) is OutcomeHandlerResult.Retry)
}
}
}
}

View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration>
<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>
<appender name="console" class="ConsoleAppender">
<target>System.err</target>
<encoder class="PatternLayoutEncoder">
<pattern>%d [%highlight(%-5level)] \(%thread\) %logger{36} -%kvp- %msg %n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="io.netty" level="info"/>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -1,7 +1,9 @@
package net.woggioni.gbcs.common
import net.woggioni.jwo.JWO
import java.net.URI
import java.net.URL
import java.security.MessageDigest
object GBCS {
fun String.toUrl() : URL = URL.of(URI(this), null)
@@ -9,4 +11,19 @@ object GBCS {
const val GBCS_NAMESPACE_URI: String = "urn:net.woggioni.gbcs.server"
const val GBCS_PREFIX: String = "gbcs"
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
fun digest(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): ByteArray {
md.update(data)
return md.digest()
}
fun digestString(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): String {
return JWO.bytesToHex(digest(data, md))
}
}

View File

@@ -26,13 +26,24 @@ configurations {
canBeResolved = true
visible = true
}
testImplementation {
extendsFrom compileOnly
}
}
dependencies {
compileOnly project(':gbcs-common')
compileOnly project(':gbcs-api')
compileOnly catalog.jwo
compileOnly catalog.slf4j.api
implementation catalog.xmemcached
implementation catalog.netty.codec.memcache
implementation catalog.netty.common
implementation group: 'io.netty', name: 'netty-handler', version: catalog.versions.netty.get()
testRuntimeOnly catalog.logback.classic
}
Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
@@ -41,6 +52,11 @@ Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
group = BasePlugin.BUILD_GROUP
}
tasks.named(JavaPlugin.TEST_TASK_NAME, Test) {
systemProperty("io.netty.leakDetectionLevel", "PARANOID")
}
tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) {
dependsOn(bundleTask)
}

View File

@@ -7,6 +7,13 @@ module net.woggioni.gbcs.server.memcached {
requires net.woggioni.jwo;
requires java.xml;
requires kotlin.stdlib;
requires io.netty.common;
requires io.netty.handler;
requires io.netty.codec.memcache;
requires io.netty.transport;
requires org.slf4j;
requires io.netty.buffer;
requires io.netty.codec;
provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider;

View File

@@ -0,0 +1,33 @@
package net.woggioni.gbcs.server.memcached
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.stream.ChunkedInput
import java.nio.channels.ReadableByteChannel
class CustomChunkedInput(private val readableByteChannel: ReadableByteChannel) : ChunkedInput<ByteBuf> {
override fun isEndOfInput(): Boolean {
TODO("Not yet implemented")
}
override fun close() {
TODO("Not yet implemented")
}
override fun readChunk(ctx: ChannelHandlerContext): ByteBuf {
TODO("Not yet implemented")
}
override fun readChunk(allocator: ByteBufAllocator): ByteBuf {
TODO("Not yet implemented")
}
override fun length(): Long {
TODO("Not yet implemented")
}
override fun progress(): Long {
TODO("Not yet implemented")
}
}

View File

@@ -0,0 +1,4 @@
package net.woggioni.gbcs.server.memcached
class MemcachedException(status : Short, msg : String? = null, cause : Throwable? = null)
: RuntimeException(msg ?: "Memcached status $status", cause)

View File

@@ -1,59 +1,85 @@
package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.XMemcachedClientBuilder
import net.rubyeye.xmemcached.command.BinaryCommandFactory
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder
import io.netty.buffer.Unpooled
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.ContentTooLargeException
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.jwo.JWO
import java.io.ByteArrayInputStream
import java.net.InetSocketAddress
import java.nio.channels.Channels
import java.nio.channels.ReadableByteChannel
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.time.Duration
import net.woggioni.gbcs.api.CallHandle
import net.woggioni.gbcs.api.ResponseEventListener
import net.woggioni.gbcs.api.event.RequestEvent
import net.woggioni.gbcs.api.event.ResponseEvent
import net.woggioni.gbcs.server.memcached.client.MemcachedClient
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ExceptionCaught
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.LastResponseContentChunkReceived
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseContentChunkReceived
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseReceived
import net.woggioni.gbcs.server.memcached.client.ResponseListener
import java.util.concurrent.CompletableFuture
class MemcachedCache(
servers: List<HostAndPort>,
private val maxAge: Duration,
maxSize : Int,
digestAlgorithm: String?,
compressionMode: CompressionMode,
private val cfg : MemcachedCacheConfiguration
) : Cache {
private val memcachedClient = XMemcachedClientBuilder(
servers.stream().map { addr: HostAndPort -> InetSocketAddress(addr.host, addr.port) }.toList()
).apply {
commandFactory = BinaryCommandFactory()
digestAlgorithm?.let { dAlg ->
setKeyProvider { key ->
val md = MessageDigest.getInstance(dAlg)
md.update(key.toByteArray(StandardCharsets.UTF_8))
JWO.bytesToHex(md.digest())
}
}
transcoder = SerializingTranscoder(maxSize).apply {
setCompressionMode(compressionMode)
}
}.build()
override fun get(key: String): ReadableByteChannel? {
return memcachedClient.get<ByteArray>(key)
?.let(::ByteArrayInputStream)
?.let(Channels::newChannel)
}
override fun put(key: String, content: ByteArray) {
try {
memcachedClient[key, maxAge.toSeconds().toInt()] = content
} catch (e: IllegalArgumentException) {
throw ContentTooLargeException(e.message, e)
}
}
private val client = MemcachedClient(cfg)
override fun close() {
memcachedClient.shutdown()
client.close()
}
override fun get(key: String, responseEventListener: ResponseEventListener): CompletableFuture<CallHandle<Void>> {
val listener = ResponseListener { evt ->
when(evt) {
is ResponseContentChunkReceived -> {
responseEventListener.listen(ResponseEvent.ChunkReceived(Unpooled.wrappedBuffer(evt.chunk)))
}
is LastResponseContentChunkReceived -> {
responseEventListener.listen(ResponseEvent.LastChunkReceived(Unpooled.wrappedBuffer(evt.chunk)))
}
is ExceptionCaught -> {
responseEventListener.listen(ResponseEvent.ExceptionCaught(evt.cause))
}
is ResponseReceived -> {
when(val status = evt.response.status) {
BinaryMemcacheResponseStatus.SUCCESS -> {
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
responseEventListener.listen(ResponseEvent.NoContent())
}
else -> {
responseEventListener.listen(ResponseEvent.ExceptionCaught(MemcachedException(status)))
}
}
}
}
}
return client.get(key, listener).thenApply { clientCallHandle ->
object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when(evt) {
is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
}
}
override fun call(): CompletableFuture<Void> {
return clientCallHandle.waitForResponse().thenApply { null }
}
}
}
}
override fun put(key: String): CompletableFuture<CallHandle<Void>> {
return client.put(key, cfg.maxAge).thenApply { clientCallHandle ->
object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when(evt) {
is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
}
}
override fun call(): CompletableFuture<Void> {
return clientCallHandle.waitForResponse().thenApply { null }
}
}
}
}
}

View File

@@ -1,25 +1,45 @@
package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.HostAndPort
import java.time.Duration
data class MemcachedCacheConfiguration(
var servers: List<HostAndPort>,
var maxAge: Duration = Duration.ofDays(1),
var maxSize: Int = 0x100000,
var digestAlgorithm: String? = null,
var compressionMode: CompressionMode = CompressionMode.ZIP,
val servers: List<Server>,
val maxAge: Duration = Duration.ofDays(1),
val maxSize: Int = 0x100000,
val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = CompressionMode.DEFLATE,
) : Configuration.Cache {
override fun materialize() = MemcachedCache(
servers,
maxAge,
maxSize,
digestAlgorithm,
compressionMode
enum class CompressionMode {
/**
* Gzip mode
*/
GZIP,
/**
* Deflate mode
*/
DEFLATE
}
class RetryPolicy(
val maxAttempts: Int,
val initialDelayMillis: Long,
val exp: Double
)
data class Server(
val endpoint : HostAndPort,
val connectionTimeoutMillis : Int?,
val retryPolicy : RetryPolicy?,
val maxConnections : Int
)
override fun materialize() = MemcachedCache(this)
override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached"
override fun getTypeName() = "memcachedCacheType"

View File

@@ -51,7 +51,7 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
}
return MemcachedCacheConfiguration(
servers,
servers.map { MemcachedCacheConfiguration.Server(it, null, null, 1) },
maxAge,
maxSize,
digestAlgorithm,
@@ -67,8 +67,8 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI)
for (server in servers) {
node("server") {
attr("host", server.host)
attr("port", server.port.toString())
attr("host", server.endpoint.host)
attr("port", server.endpoint.port.toString())
}
}
attr("max-age", maxAge.toString())

View File

@@ -0,0 +1,9 @@
package net.woggioni.gbcs.server.memcached.client
import java.nio.ByteBuffer
import java.util.concurrent.CompletableFuture
interface CallHandle {
fun sendChunk(requestBodyChunk : ByteBuffer)
fun waitForResponse() : CompletableFuture<Short>
}

View File

@@ -0,0 +1,24 @@
package net.woggioni.gbcs.server.memcached.client
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import java.nio.ByteBuffer
data class MemcacheResponse(
val status: Short,
val opcode: Byte,
val cas: Long?,
val opaque: Int?,
val key: ByteBuffer?,
val extra: ByteBuffer?
) {
companion object {
fun of(response : BinaryMemcacheResponse) = MemcacheResponse(
response.status(),
response.opcode(),
response.cas(),
response.opaque(),
response.key()?.nioBuffer(),
response.extras()?.nioBuffer()
)
}
}

View File

@@ -0,0 +1,241 @@
package net.woggioni.gbcs.server.memcached.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.MemcacheObject
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.common.GBCS.digest
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration
import net.woggioni.gbcs.server.memcached.MemcachedException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import io.netty.util.concurrent.Future as NettyFuture
class MemcachedClient(private val cfg: MemcachedCacheConfiguration) : AutoCloseable {
private val log = contextLogger()
private val group: NioEventLoopGroup
private val connectionPool: MutableMap<HostAndPort, ChannelPool> = ConcurrentHashMap()
init {
group = NioEventLoopGroup()
}
private fun newConnectionPool(server : MemcachedCacheConfiguration.Server) : FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
channel(NioSocketChannel::class.java)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
server.connectionTimeoutMillis?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec())
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
private fun sendRequest(request: BinaryMemcacheRequest,
responseListener: ResponseListener?
): CompletableFuture<CallHandle> {
val server = cfg.servers.let { servers ->
if(servers.size > 1) {
val key = request.key().duplicate()
var checksum = 0
while(key.readableBytes() > 4) {
val byte = key.readInt()
checksum = checksum xor byte
}
while(key.readableBytes() > 0) {
val byte = key.readByte()
checksum = checksum xor byte.toInt()
}
servers[checksum % servers.size]
} else {
servers.first()
}
}
val callHandleFuture = CompletableFuture<CallHandle>()
val result = CompletableFuture<Short>()
// Custom handler for processing responses
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
}
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline().addLast("handler", object : SimpleChannelInboundHandler<MemcacheObject>() {
val response : MemcacheResponse? = null
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: MemcacheObject
) {
if(msg is BinaryMemcacheResponse) {
val resp = MemcacheResponse.of(msg)
responseListener?.listen(ResponseEvent.ResponseReceived(resp))
if(msg.totalBodyLength() == msg.keyLength() + msg.extrasLength()) {
result.complete(resp.status)
}
}
if(responseListener != null) {
when (msg) {
is LastMemcacheContent -> {
responseListener.listen(ResponseEvent.LastResponseContentChunkReceived(msg.content().nioBuffer()))
result.complete(response?.status)
pipeline.removeLast()
pool.release(channel)
}
is MemcacheContent -> {
responseListener.listen(ResponseEvent.ResponseContentChunkReceived(msg.content().nioBuffer()))
}
}
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val ex = when (cause) {
is DecoderException -> cause.cause!!
else -> cause
}
responseListener?.listen(ResponseEvent.ExceptionCaught(ex))
result.completeExceptionally(ex)
ctx.close()
pipeline.removeLast()
pool.release(channel)
}
})
val chunks = mutableListOf <ByteBuffer>()
fun sendRequest() {
val valueLen = chunks.fold(0) { acc : Int, c2 : ByteBuffer ->
acc + c2.remaining()
}
request.setTotalBodyLength(request.keyLength() + request.extrasLength() + valueLen)
channel.write(request)
for((i, chunk) in chunks.withIndex()) {
if(i + 1 < chunks.size) {
channel.write(DefaultMemcacheContent(Unpooled.wrappedBuffer(chunk)))
} else {
channel.write(DefaultLastMemcacheContent(Unpooled.wrappedBuffer(chunk)))
}
}
channel.flush()
}
callHandleFuture.complete(object : CallHandle {
override fun sendChunk(requestBodyChunk: ByteBuffer) {
chunks.addLast(requestBodyChunk)
}
override fun waitForResponse(): CompletableFuture<Short> {
sendRequest()
return result
}
})
} else {
callHandleFuture.completeExceptionally(channelFuture.cause())
}
}
})
return callHandleFuture
}
private fun encodeExpiry(expiry: Duration) : Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
fun get(key: String, responseListener: ResponseListener) : CompletableFuture<CallHandle> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
DefaultBinaryMemcacheRequest().apply {
setKey(Unpooled.wrappedBuffer(digest))
setOpcode(BinaryMemcacheOpcodes.GET)
}
}
return sendRequest(request, responseListener)
}
fun put(key: String, expiry : Duration, cas : Long? = null): CompletableFuture<CallHandle> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
val extras = Unpooled.buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(expiry))
DefaultBinaryMemcacheRequest().apply {
setExtras(extras)
setKey(Unpooled.wrappedBuffer(digest))
setOpcode(BinaryMemcacheOpcodes.SET)
cas?.let(this::setCas)
}
}
return sendRequest(request) { evt ->
when (evt) {
is ResponseEvent.ResponseReceived -> {
if (evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) {
throw MemcachedException(evt.response.status)
}
}
else -> {}
}
}
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}
override fun close() {
shutDown().sync()
}
}

View File

@@ -0,0 +1,10 @@
package net.woggioni.gbcs.server.memcached.client
import java.nio.ByteBuffer
sealed interface ResponseEvent {
class ResponseReceived(val response : MemcacheResponse) : ResponseEvent
class ResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent
class LastResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent
class ExceptionCaught(val cause : Throwable) : ResponseEvent
}

View File

@@ -0,0 +1,5 @@
package net.woggioni.gbcs.server.memcached.client
fun interface ResponseListener {
fun listen(evt : ResponseEvent)
}

View File

@@ -0,0 +1,80 @@
package net.woggioni.gbcs.server.memcached.test
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import net.woggioni.gbcs.api.event.ChunkReceived
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration
import net.woggioni.gbcs.server.memcached.client.MemcacheResponse
import net.woggioni.gbcs.server.memcached.client.MemcachedClient
import net.woggioni.gbcs.server.memcached.client.ResponseEvent
import net.woggioni.gbcs.server.memcached.client.ResponseListener
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import java.nio.ByteBuffer
import java.security.SecureRandom
import java.time.Duration
import java.util.Objects
import java.util.concurrent.TimeUnit
import kotlin.random.Random
class MemcachedClientTest {
@Test
fun test() {
val client = MemcachedClient(MemcachedCacheConfiguration(
servers = listOf(
MemcachedCacheConfiguration.Server(
endpoint = HostAndPort("127.0.0.1", 11211),
connectionTimeoutMillis = null,
retryPolicy = null,
maxConnections = 1
)
)
))
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
val key = "101325"
val value = random.nextBytes(0x1000)
val requestListener = client.put(key, Duration.ofDays(2), null)
val response = Unpooled.buffer(value.size)
requestListener.thenCompose { listener ->
listener.sendChunk(ByteBuffer.wrap(value))
listener.waitForResponse()
}.get(10, TimeUnit.SECONDS)
client.get(key, object: ResponseListener {
override fun listen(evt: ResponseEvent) {
when(evt) {
is ResponseEvent.ResponseReceived -> {
if(evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) {
Assertions.fail<String> {
"Memcache status ${evt.response.status}"
}
}
}
is ResponseEvent.ResponseContentChunkReceived -> response.writeBytes(evt.chunk)
else -> {}
}
}
}).thenCompose { it.waitForResponse() }.get(1, TimeUnit.SECONDS)
val retrievedResponse = response.array()
Assertions.assertArrayEquals(value, retrievedResponse)
}
@Test
fun test2() {
val a1 = ByteArray(10) {
it.toByte()
}
val a2 = ByteArray(10) {
it.toByte()
}
Assertions.assertTrue(Objects.equals(a1, a1))
}
}

View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration>
<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>
<appender name="console" class="ConsoleAppender">
<target>System.err</target>
<encoder class="PatternLayoutEncoder">
<pattern>%d [%highlight(%-5level)] \(%thread\) %logger{36} -%kvp- %msg %n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="io.netty" level="debug"/>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -18,9 +18,6 @@ dependencies {
testImplementation catalog.bcprov.jdk18on
testImplementation catalog.bcpkix.jdk18on
testImplementation catalog.junit.jupiter.api
testImplementation catalog.junit.jupiter.params
testRuntimeOnly catalog.junit.jupiter.engine
testRuntimeOnly project(":gbcs-server-memcached")
}

View File

@@ -208,15 +208,15 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
.map { it as X509Certificate }
.toArray { size -> Array<X509Certificate?>(size) { null } }
SslContextBuilder.forServer(serverKey, *serverCert).apply {
if (tls.isVerifyClients) {
clientAuth(ClientAuth.OPTIONAL)
tls.trustStore?.let { trustStore ->
val ts = loadKeystore(trustStore.file, trustStore.password)
trustManager(
ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus)
)
}
}
val clientAuth = tls.trustStore?.let { trustStore ->
val ts = loadKeystore(trustStore.file, trustStore.password)
trustManager(
ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus)
)
if(trustStore.isRequireClientCertificate) ClientAuth.REQUIRE
else ClientAuth.OPTIONAL
} ?: ClientAuth.NONE
clientAuth(clientAuth)
}.build()
}
}

View File

@@ -1,21 +0,0 @@
package net.woggioni.gbcs.server.cache
import net.woggioni.jwo.JWO
import java.security.MessageDigest
object CacheUtils {
fun digest(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): ByteArray {
md.update(data)
return md.digest()
}
fun digestString(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): String {
return JWO.bytesToHex(digest(data, md))
}
}

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.server.cache
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.GBCS.digestString
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.cache.CacheUtils.digestString
import net.woggioni.jwo.LockFile
import java.nio.channels.Channels
import java.nio.channels.FileChannel

View File

@@ -1,18 +1,15 @@
package net.woggioni.gbcs.server.cache
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.server.cache.CacheUtils.digestString
import net.woggioni.gbcs.common.GBCS.digestString
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
@@ -25,33 +22,27 @@ class InMemoryCache(
val compressionLevel: Int
) : Cache {
private val map = ConcurrentHashMap<String, MapValue>()
private class MapValue(val rc: AtomicInteger, val payload : AtomicReference<ByteArray>)
private class RemovalQueueElement(val key: String, val expiry : Instant) : Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement)= expiry.compareTo(other.expiry)
private val map = ConcurrentHashMap<String, ByteArray>()
private class RemovalQueueElement(val key: String, val value : ByteArray, val expiry : Instant) : Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
}
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
private var running = true
private val garbageCollector = Thread({
private val garbageCollector = Thread {
while(true) {
val el = removalQueue.take()
val now = Instant.now()
if(now > el.expiry) {
val value = map[el.key] ?: continue
val rc = value.rc.decrementAndGet()
if(rc == 0) {
map.remove(el.key)
}
map.remove(el.key, el.value)
} else {
removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
}
}
}).apply {
}.apply {
start()
}
@@ -68,8 +59,6 @@ class InMemoryCache(
} ?: key
).let { digest ->
map[digest]
?.let(MapValue::payload)
?.let(AtomicReference<ByteArray>::get)
?.let { value ->
if (compressionEnabled) {
val inflater = Inflater()
@@ -96,11 +85,8 @@ class InMemoryCache(
} else {
content
}
val mapValue = map.computeIfAbsent(digest) {
MapValue(AtomicInteger(0), AtomicReference())
}
mapValue.payload.set(value)
removalQueue.put(RemovalQueueElement(digest, Instant.now().plus(maxAge)))
map[digest] = value
removalQueue.put(RemovalQueueElement(digest, value, Instant.now().plus(maxAge)))
}
}
}

View File

@@ -142,10 +142,9 @@ object Parser {
}
"tls" -> {
val verifyClients = child.renderAttribute("verify-clients")
?.let(String::toBoolean) ?: false
var keyStore: KeyStore? = null
var trustStore: TrustStore? = null
for (granChild in child.asIterable()) {
when (granChild.localName) {
"keystore" -> {
@@ -167,15 +166,19 @@ object Parser {
val checkCertificateStatus = granChild.renderAttribute("check-certificate-status")
?.let(String::toBoolean)
?: false
val requireClientCertificate = child.renderAttribute("require-client-certificate")
?.let(String::toBoolean) ?: false
trustStore = TrustStore(
trustStoreFile,
trustStorePassword,
checkCertificateStatus
checkCertificateStatus,
requireClientCertificate
)
}
}
}
tls = Tls(keyStore, trustStore, verifyClients)
tls = Tls(keyStore, trustStore)
}
}
}

View File

@@ -154,9 +154,6 @@ object Serializer {
conf.tls?.let { tlsConfiguration ->
node("tls") {
if(tlsConfiguration.isVerifyClients) {
attr("verify-clients", "true")
}
tlsConfiguration.keyStore?.let { keyStore ->
node("keystore") {
attr("file", keyStore.file.toString())
@@ -177,6 +174,7 @@ object Serializer {
attr("password", password)
}
attr("check-certificate-status", trustStore.isCheckCertificateStatus.toString())
attr("require-client-certificate", trustStore.isRequireClientCertificate.toString())
}
}
}

View File

@@ -1,36 +1,83 @@
package net.woggioni.gbcs.server.handler
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.DefaultFileRegion
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.DefaultLastHttpContent
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMessage
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioStream
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.CacheException
import net.woggioni.gbcs.api.CallHandle
import net.woggioni.gbcs.api.ResponseEventListener
import net.woggioni.gbcs.api.event.RequestEvent
import net.woggioni.gbcs.api.event.ResponseEvent
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.debug
import net.woggioni.gbcs.server.warn
import java.nio.channels.FileChannel
import java.nio.file.Path
import java.util.concurrent.CompletableFuture
@ChannelHandler.Sharable
class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
SimpleChannelInboundHandler<FullHttpRequest>() {
SimpleChannelInboundHandler<HttpMessage>() {
private val log = contextLogger()
companion object {
@JvmStatic
private val log = contextLogger()
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
private data class TransientContext(
var key: String?,
var callHandle: CompletableFuture<CallHandle<Void>>
)
private var transientContext: TransientContext? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpMessage) {
when (msg) {
is HttpRequest -> {
handleRequest(ctx, msg)
}
is LastHttpContent -> {
transientContext?.run {
callHandle.thenCompose { callHandle ->
callHandle.postEvent(RequestEvent.LastChunkSent(msg.content()))
callHandle.call()
}.thenApply {
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
key?.let(String::toByteArray)
?.let(Unpooled::copiedBuffer)
)
// response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
}
}
}
is HttpContent -> {
transientContext?.run {
callHandle = callHandle.thenApply { it ->
it.postEvent(RequestEvent.ChunkSent(msg.content()))
it
}
}
}
}
}
private fun handleRequest(ctx: ChannelHandlerContext, msg: HttpRequest) {
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
val method = msg.method()
if (method === HttpMethod.GET) {
@@ -43,49 +90,61 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
return
}
if (serverPrefix == prefix) {
try {
cache.get(key)
} catch(ex : Throwable) {
throw CacheException("Error accessing the cache backend", ex)
}?.let { channel ->
log.debug(ctx) {
"Cache hit for key '$key'"
}
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
when (channel) {
is FileChannel -> {
if (keepAlive) {
ctx.write(DefaultFileRegion(channel, 0, channel.size()))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size()))
.addListener(ChannelFutureListener.CLOSE)
cache.get(key, object : ResponseEventListener {
var first = false
override fun listen(evt: ResponseEvent) {
when (evt) {
is ResponseEvent.NoContent -> {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response =
DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
is ResponseEvent.ChunkReceived, is ResponseEvent.LastChunkReceived -> {
if (first) {
first = false
log.debug(ctx) {
"Cache hit for key '$key'"
}
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] =
HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers()
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers()
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers()
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
}
if (evt is ResponseEvent.LastChunkReceived)
ctx.write(DefaultLastHttpContent(evt.chunk))
else if (evt is ResponseEvent.ChunkReceived)
ctx.write(DefaultHttpContent(evt.chunk))
ctx.flush()
}
is ResponseEvent.ExceptionCaught -> {
log.error(evt.cause.message, evt.cause)
val errorResponse = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR,
evt.cause.message
?.let(String::toByteArray)
?.let(Unpooled::copiedBuffer)
)
ctx.write(errorResponse)
}
}
else -> {
ctx.write(ChunkedNioStream(channel)).addListener { evt ->
channel.close()
}
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
}
}
} ?: let {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
}).thenCompose(CallHandle<Void>::call)
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
@@ -93,35 +152,22 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
ctx.channel().read()
}
} else if (method === HttpMethod.PUT) {
val path = Path.of(msg.uri())
val prefix = path.parent
val key = path.fileName.toString()
if (serverPrefix == prefix) {
log.debug(ctx) {
"Added value for key '$key' to build cache"
}
val bodyBytes = msg.content().run {
if (isDirect) {
ByteArray(readableBytes()).also {
readBytes(it)
}
} else {
array()
}
}
try {
cache.put(key, bodyBytes)
} catch(ex : Throwable) {
throw CacheException("Error accessing the cache backend", ex)
}
transientContext = TransientContext(key, cache.put(key))
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
// response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
} else {
log.warn(ctx) {
@@ -131,9 +177,12 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
} else if(method == HttpMethod.TRACE) {
} else if (method == HttpMethod.TRACE) {
val replayedRequestHead = ctx.alloc().buffer()
replayedRequestHead.writeCharSequence("TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", Charsets.US_ASCII)
replayedRequestHead.writeCharSequence(
"TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n",
Charsets.US_ASCII
)
msg.headers().forEach { (key, value) ->
replayedRequestHead.apply {
writeCharSequence(key, Charsets.US_ASCII)
@@ -143,18 +192,30 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
}
}
replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII)
val requestBody = msg.content()
requestBody.retain()
val responseBody = ctx.alloc().compositeBuffer(2).apply {
addComponents(true, replayedRequestHead)
addComponents(true, requestBody)
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK, responseBody)
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, "message/http")
set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes())
}
ctx.writeAndFlush(response)
ctx.write(response)
ctx.writeAndFlush(DefaultHttpContent(replayedRequestHead))
val callHandle = object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when (evt) {
is RequestEvent.ChunkSent -> {
ctx.writeAndFlush(DefaultHttpContent(evt.chunk))
}
is RequestEvent.LastChunkSent -> {
ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk))
}
}
}
override fun call(): CompletableFuture<Void> {
return CompletableFuture.completedFuture(null)
}
}
transientContext = TransientContext(null, CompletableFuture.completedFuture(callHandle))
} else {
log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'"
@@ -163,5 +224,6 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
}
}

View File

@@ -183,7 +183,6 @@
<xs:element name="keystore" type="gbcs:keyStoreType" />
<xs:element name="truststore" type="gbcs:trustStoreType" minOccurs="0"/>
</xs:all>
<xs:attribute name="verify-clients" type="xs:boolean" use="optional" default="false"/>
</xs:complexType>
<xs:complexType name="keyStoreType">
@@ -197,6 +196,7 @@
<xs:attribute name="file" type="xs:string" use="required"/>
<xs:attribute name="password" type="xs:string"/>
<xs:attribute name="check-certificate-status" type="xs:boolean"/>
<xs:attribute name="require-client-certificate" type="xs:boolean" use="optional" default="false"/>
</xs:complexType>
<xs:complexType name="propertiesType">

View File

@@ -171,9 +171,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
),
Configuration.Tls(
Configuration.KeyStore(this.serverKeyStoreFile, null, SERVER_CERTIFICATE_ENTRY, PASSWORD),
Configuration.TrustStore(this.trustStoreFile, null, false),
true
),
Configuration.TrustStore(this.trustStoreFile, null, false, false),
)
)
Xml.write(Serializer.serialize(cfg), System.out)
}

View File

@@ -20,6 +20,7 @@ class ConfigurationTest {
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-default.xml",
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-memcached.xml",
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-tls.xml",
"classpath:net/woggioni/gbcs/server/test/valid/gbcs-memcached-tls.xml",
]
)
@ParameterizedTest

View File

@@ -7,6 +7,7 @@ import org.bouncycastle.asn1.x500.X500Name
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.provider.ArgumentsSource
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

View File

@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcached jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd"
>
<bind host="0.0.0.0" port="8443" incoming-connections-backlog-size="4096"/>
<connection
max-request-size="67108864"
idle-timeout="PT30S"
read-idle-timeout="PT60S"
write-idle-timeout="PT60S"
read-timeout="PT5M"
write-timeout="PT5M"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="16777216" compression-mode="zip">
<server host="memcached" port="11211"/>
</cache>
<authorization>
<users>
<user name="woggioni">
<quota calls="1000" period="PT1S"/>
</user>
<user name="gitea">
<quota calls="10" period="PT1S" initial-available-calls="100" max-available-calls="100"/>
</user>
<anonymous>
<quota calls="2" period="PT5S"/>
</anonymous>
</users>
<groups>
<group name="writers">
<users>
<user ref="woggioni"/>
<user ref="gitea"/>
</users>
<roles>
<reader/>
<writer/>
</roles>
</group>
</groups>
</authorization>
<authentication>
<client-certificate>
<user-extractor attribute-name="CN" pattern="(.*)"/>
</client-certificate>
</authentication>
<tls>
<keystore file="/home/luser/ssl/gbcs.woggioni.net.pfx" key-alias="gbcs.woggioni.net" password="KEYSTORE_PASSWOR" key-password="KEY_PASSWORD"/>
<truststore file="/home/luser/ssl/woggioni.net.pfx" check-certificate-status="false" password="TRUSTSTORE_PASSWORD"/>
</tls>
</gbcs:server>

View File

@@ -60,8 +60,8 @@
<user-extractor pattern="user-pattern" attribute-name="CN"/>
</client-certificate>
</authentication>
<tls verify-clients="true">
<tls>
<keystore file="keystore.pfx" key-alias="key1" password="password" key-password="key-password"/>
<truststore file="truststore.pfx" password="password" check-certificate-status="true" />
<truststore file="truststore.pfx" password="password" check-certificate-status="true" require-client-certificate="true"/>
</tls>
</gbcs:server>

View File

@@ -2,9 +2,9 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true
org.gradle.caching=true
gbcs.version = 0.0.11
gbcs.version = 0.1.1
lys.version = 2025.01.25
lys.version = 2025.01.31
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
docker.registry.url=gitea.woggioni.net