Compare commits

...

34 Commits

Author SHA1 Message Date
0463038aaa first commit with streaming support (buggy and unreliable) 2025-02-13 23:02:08 +08:00
7eca8a270d 0.1.6 release
All checks were successful
CI / build (push) Successful in 3m29s
2025-02-08 00:54:25 +08:00
84d7c977f9 added randomizer to retries 2025-02-07 23:19:13 +08:00
317eadce07 used virtual thread for garbage colection in FileSystemCache
All checks were successful
CI / build (push) Successful in 2m32s
2025-02-07 20:45:29 +08:00
af79e74b95 fixed max message size for memcache backend 2025-02-06 23:09:22 +08:00
78ae21caa4 0.1.4 release
All checks were successful
CI / build (push) Successful in 8m14s
2025-02-06 15:24:00 +08:00
6c0eadb9fb renamed project to "Remote Cache Build Server" (RBCS) 2025-02-06 15:20:50 +08:00
5fef1b932e updated lys-catalog version
All checks were successful
CI / build (push) Successful in 2m32s
2025-02-05 21:49:08 +08:00
5e173dbf62 fixed unit tests 2025-02-05 21:24:10 +08:00
53b24e3d54 improved benchmark accuracy 2025-02-05 19:10:25 +08:00
7d0f24fa58 fixed memory leak in InMemoryCache 2025-02-05 19:09:51 +08:00
1b6cf1bd96 fixed memory leak in memcached plugin 2025-02-05 14:41:11 +08:00
4180df2352 added healthcheck command to client 2025-02-05 00:02:17 +08:00
c2e388b931 switched to ZGC in docker image
All checks were successful
CI / build (push) Successful in 3m28s
2025-02-04 22:46:34 +08:00
6c62ac85c0 implemented memcached client with Netty
All checks were successful
CI / build (push) Successful in 1m46s
2025-02-04 22:09:28 +08:00
89153b60f8 fixed race condition in InMemoryCache 2025-02-01 10:14:13 +08:00
a2a40ab60f added semaphore to benchmark command 2025-01-28 00:00:07 +08:00
45458761f3 made TLS client certificate request from the server configurable
All checks were successful
CI / build (push) Successful in 4m2s
2025-01-27 13:32:04 +08:00
90a5834f5f added retry policy to gbcs-client 2025-01-27 13:12:12 +08:00
1823d0b9ca fixed throttling retry-after estimation
All checks were successful
CI / build (push) Successful in 3m9s
2025-01-25 01:25:01 +08:00
649cbba954 initial-available-calls is a positive integer
Some checks failed
CI / build (push) Failing after 2m49s
2025-01-24 21:19:40 +08:00
eb9ccce3be fixed exception handling in the client
Some checks failed
CI / build (push) Failing after 2m49s
2025-01-24 19:56:50 +08:00
316f64cf9d fixed bug with server timeouts
Some checks failed
CI / build (push) Failing after 2m47s
2025-01-24 18:48:25 +08:00
24a49779f9 fixed bug
Some checks failed
CI / build (push) Failing after 2m57s
2025-01-24 18:15:06 +08:00
423b749db9 added throttling
All checks were successful
CI / build (push) Successful in 2m39s
2025-01-24 16:53:19 +08:00
9ce3e7fa0a small refactor 2025-01-20 23:31:00 +08:00
1e6ece37a5 added remote address to logs
Some checks failed
CI / build (push) Failing after 1s
2025-01-20 20:50:48 +08:00
fc9900d821 fixed bug in the server configuration parser
All checks were successful
CI / build (push) Successful in 2m50s
added Jacoco test report
2025-01-20 20:23:09 +08:00
1a78c8092b fixed client bug (unhandled connection touts)
All checks were successful
CI / build (push) Successful in 3m7s
2025-01-20 19:18:20 +08:00
3d1847c408 added server timeouts
All checks were successful
CI / build (push) Successful in 3m18s
2025-01-20 15:45:13 +08:00
702556bfbb added parameter to configure incoming connections backlog size
All checks were successful
CI / build (push) Successful in 2m12s
2025-01-20 10:22:03 +08:00
06e9e7ca09 small optimization to make authenticator a singleton
All checks were successful
CI / build (push) Successful in 1m50s
2025-01-20 09:05:53 +08:00
fa5bb55baa uniformed xml configuration attributes, added max-request-size parameter 2025-01-20 08:24:44 +08:00
007d0fffd6 removed deployment related files 2025-01-17 22:42:11 +08:00
145 changed files with 4102 additions and 1598 deletions

View File

@@ -9,11 +9,6 @@ jobs:
steps:
- name: Checkout sources
uses: actions/checkout@v4
- name: Setup Java
uses: actions/setup-java@v4
with:
distribution: graalvm
java-version: 21
- name: Setup Gradle
uses: gradle/actions/setup-gradle@v3
- name: Execute Gradle build
@@ -36,7 +31,7 @@ jobs:
username: woggioni
password: ${{ secrets.PUBLISHER_TOKEN }}
-
name: Build gbcs Docker image
name: Build rbcs Docker image
uses: docker/build-push-action@v5.3.0
with:
context: "docker/build/docker"
@@ -44,12 +39,12 @@ jobs:
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/gbcs:latest
gitea.woggioni.net/woggioni/gbcs:${{ steps.retrieve-version.outputs.VERSION }}
gitea.woggioni.net/woggioni/rbcs:latest
gitea.woggioni.net/woggioni/rbcs:${{ steps.retrieve-version.outputs.VERSION }}
target: release
cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx
cache-from: type=registry,ref=gitea.woggioni.net/woggioni/rbcs:buildx
-
name: Build gbcs memcached Docker image
name: Build rbcs memcache Docker image
uses: docker/build-push-action@v5.3.0
with:
context: "docker/build/docker"
@@ -57,11 +52,11 @@ jobs:
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/gbcs:memcached
gitea.woggioni.net/woggioni/gbcs:memcached-${{ steps.retrieve-version.outputs.VERSION }}
target: release-memcached
cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx
cache-to: type=registry,mode=max,compression=zstd,image-manifest=true,oci-mediatypes=true,ref=gitea.woggioni.net/woggioni/gbcs:buildx
gitea.woggioni.net/woggioni/rbcs:memcache
gitea.woggioni.net/woggioni/rbcs:memcache-${{ steps.retrieve-version.outputs.VERSION }}
target: release-memcache
cache-from: type=registry,ref=gitea.woggioni.net/woggioni/rbcs:buildx
cache-to: type=registry,mode=max,compression=zstd,image-manifest=true,oci-mediatypes=true,ref=gitea.woggioni.net/woggioni/rbcs:buildx
- name: Publish artifacts
env:
PUBLISHER_TOKEN: ${{ secrets.PUBLISHER_TOKEN }}

2
.gitignore vendored
View File

@@ -4,4 +4,4 @@
# Ignore Gradle build output directory
build
gbcs-cli/native-image/*.json
rbcs-cli/native-image/*.json

View File

@@ -1,2 +0,0 @@
FROM gitea.woggioni.net/woggioni/gbcs:memcached
COPY --chown=luser:luser conf/gbcs-memcached.xml /home/luser/.config/gbcs/gbcs.xml

0
README.md Normal file
View File

View File

@@ -15,7 +15,7 @@ allprojects { subproject ->
version = project.currentTag.map { it[0] }.get()
} else {
version = project.gitRevision.map { gitRevision ->
"${getProperty('gbcs.version')}.${gitRevision[0..10]}"
"${getProperty('rbcs.version')}.${gitRevision[0..10]}"
}.get()
}
@@ -46,6 +46,12 @@ allprojects { subproject ->
}
}
dependencies {
testImplementation catalog.junit.jupiter.api
testImplementation catalog.junit.jupiter.params
testRuntimeOnly catalog.junit.jupiter.engine
}
test {
useJUnitPlatform()
}
@@ -66,6 +72,15 @@ allprojects { subproject ->
}
}
pluginManager.withPlugin('jacoco') {
test {
finalizedBy jacocoTestReport
}
jacocoTestReport {
dependsOn test
}
}
pluginManager.withPlugin(catalog.plugins.kotlin.jvm.get().pluginId) {
tasks.withType(KotlinCompile.class) {
compilerOptions.jvmTarget = JvmTarget.JVM_21

View File

@@ -1,13 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server useVirtualThreads="true" xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs-memcached"
xs:schemaLocation="urn:net.woggioni.gbcs-memcached jpms://net.woggioni.gbcs.memcached/net/woggioni/gbcs/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs jpms://net.woggioni.gbcs/net/woggioni/gbcs/schema/gbcs.xsd">
<bind host="0.0.0.0" port="13080" />
<cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="16777216" compression-mode="zip">
<server host="memcached" port="11211"/>
</cache>
<authentication>
<none/>
</authentication>
</gbcs:server>

View File

@@ -1,36 +0,0 @@
networks:
default:
external: false
ipam:
driver: default
config:
- subnet: 172.118.0.0/16
ip_range: 172.118.0.0/16
gateway: 172.118.0.254
services:
gbcs:
build:
context: .
container_name: gbcs
restart: unless-stopped
ports:
- "127.0.0.1:8080:13080"
- "[::1]:8080:13080"
depends_on:
memcached:
condition: service_started
deploy:
resources:
limits:
cpus: "2.00"
memory: 256M
memcached:
image: memcached
container_name: memcached
restart: unless-stopped
command: -I 64m -m 900m
deploy:
resources:
limits:
cpus: "1.00"
memory: 1G

View File

@@ -1,21 +1,16 @@
FROM alpine:latest AS base-release
RUN --mount=type=cache,target=/var/cache/apk apk update
RUN --mount=type=cache,target=/var/cache/apk apk add openjdk21-jre
FROM eclipse-temurin:21-jre-alpine AS base-release
RUN adduser -D luser
USER luser
WORKDIR /home/luser
FROM base-release AS release
ADD gbcs-cli-envelope-*.jar gbcs.jar
ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"]
ADD rbcs-cli-envelope-*.jar rbcs.jar
ENTRYPOINT ["java", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar", "server"]
FROM base-release AS release-memcached
ADD --chown=luser:luser gbcs-cli-envelope-*.jar gbcs.jar
FROM base-release AS release-memcache
ADD --chown=luser:luser rbcs-cli-envelope-*.jar rbcs.jar
RUN mkdir plugins
WORKDIR /home/luser/plugins
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/gbcs-server-memcached*.tar
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-server-memcache*.tar
WORKDIR /home/luser
ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"]
FROM release-memcached as compose
COPY --chown=luser:luser conf/gbcs-memcached.xml /home/luser/.config/gbcs/gbcs.xml
ENTRYPOINT ["java", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar", "server"]

View File

@@ -18,8 +18,8 @@ configurations {
}
dependencies {
docker project(path: ':gbcs-cli', configuration: 'release')
docker project(path: ':gbcs-server-memcached', configuration: 'release')
docker project(path: ':rbcs-cli', configuration: 'release')
docker project(path: ':rbcs-server-memcache', configuration: 'release')
}
Provider<Task> cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {}
@@ -35,33 +35,33 @@ Provider<Copy> prepareDockerBuild = tasks.register('prepareDockerBuild', Copy) {
Provider<DockerBuildImage> dockerBuild = tasks.register('dockerBuildImage', DockerBuildImage) {
group = 'docker'
dependsOn prepareDockerBuild
images.add('gitea.woggioni.net/woggioni/gbcs:latest')
images.add("gitea.woggioni.net/woggioni/gbcs:${version}")
images.add('gitea.woggioni.net/woggioni/rbcs:latest')
images.add("gitea.woggioni.net/woggioni/rbcs:${version}")
}
Provider<DockerTagImage> dockerTag = tasks.register('dockerTagImage', DockerTagImage) {
group = 'docker'
repository = 'gitea.woggioni.net/woggioni/gbcs'
imageId = 'gitea.woggioni.net/woggioni/gbcs:latest'
repository = 'gitea.woggioni.net/woggioni/rbcs'
imageId = 'gitea.woggioni.net/woggioni/rbcs:latest'
tag = version
}
Provider<DockerTagImage> dockerTagMemcached = tasks.register('dockerTagMemcachedImage', DockerTagImage) {
Provider<DockerTagImage> dockerTagMemcache = tasks.register('dockerTagMemcacheImage', DockerTagImage) {
group = 'docker'
repository = 'gitea.woggioni.net/woggioni/gbcs'
imageId = 'gitea.woggioni.net/woggioni/gbcs:memcached'
tag = "${version}-memcached"
repository = 'gitea.woggioni.net/woggioni/rbcs'
imageId = 'gitea.woggioni.net/woggioni/rbcs:memcache'
tag = "${version}-memcache"
}
Provider<DockerPushImage> dockerPush = tasks.register('dockerPushImage', DockerPushImage) {
group = 'docker'
dependsOn dockerTag, dockerTagMemcached
dependsOn dockerTag, dockerTagMemcache
registryCredentials {
url = getProperty('docker.registry.url')
username = 'woggioni'
password = System.getenv().get("PUBLISHER_TOKEN")
}
images = [dockerTag.flatMap{ it.tag }, dockerTagMemcached.flatMap{ it.tag }]
images = [dockerTag.flatMap{ it.tag }, dockerTagMemcache.flatMap{ it.tag }]
}

View File

@@ -1,6 +0,0 @@
module net.woggioni.gbcs.api {
requires static lombok;
requires java.xml;
exports net.woggioni.gbcs.api;
exports net.woggioni.gbcs.api.exception;
}

View File

@@ -1,11 +0,0 @@
package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.exception.ContentTooLargeException;
import java.nio.channels.ReadableByteChannel;
public interface Cache extends AutoCloseable {
ReadableByteChannel get(String key);
void put(String key, byte[] content) throws ContentTooLargeException;
}

View File

@@ -1,7 +0,0 @@
package net.woggioni.gbcs.api.exception;
public class GbcsException extends RuntimeException {
public GbcsException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -1,17 +0,0 @@
module net.woggioni.gbcs.cli {
requires org.slf4j;
requires net.woggioni.gbcs.server;
requires info.picocli;
requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.client;
requires kotlin.stdlib;
requires net.woggioni.jwo;
requires net.woggioni.gbcs.api;
exports net.woggioni.gbcs.cli.impl.converters to info.picocli;
opens net.woggioni.gbcs.cli.impl.commands to info.picocli;
opens net.woggioni.gbcs.cli.impl to info.picocli;
opens net.woggioni.gbcs.cli to info.picocli, net.woggioni.gbcs.common;
exports net.woggioni.gbcs.cli;
}

View File

@@ -1,63 +0,0 @@
package net.woggioni.gbcs.cli
import net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.cli.impl.AbstractVersionProvider
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand
import net.woggioni.gbcs.cli.impl.commands.ClientCommand
import net.woggioni.gbcs.cli.impl.commands.GetCommand
import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand
import net.woggioni.gbcs.cli.impl.commands.PutCommand
import net.woggioni.gbcs.cli.impl.commands.ServerCommand
import net.woggioni.jwo.Application
import picocli.CommandLine
import picocli.CommandLine.Model.CommandSpec
@CommandLine.Command(
name = "gbcs", versionProvider = GradleBuildCacheServerCli.VersionProvider::class
)
class GradleBuildCacheServerCli : GbcsCommand() {
class VersionProvider : AbstractVersionProvider()
companion object {
@JvmStatic
fun main(vararg args: String) {
Thread.currentThread().contextClassLoader = GradleBuildCacheServerCli::class.java.classLoader
GbcsUrlStreamHandlerFactory.install()
val log = contextLogger()
val app = Application.builder("gbcs")
.configurationDirectoryEnvVar("GBCS_CONFIGURATION_DIR")
.configurationDirectoryPropertyKey("net.woggioni.gbcs.conf.dir")
.build()
val gbcsCli = GradleBuildCacheServerCli()
val commandLine = CommandLine(gbcsCli)
commandLine.setExecutionExceptionHandler { ex, cl, parseResult ->
log.error(ex.message, ex)
CommandLine.ExitCode.SOFTWARE
}
commandLine.addSubcommand(ServerCommand(app))
commandLine.addSubcommand(PasswordHashCommand())
commandLine.addSubcommand(
CommandLine(ClientCommand(app)).apply {
addSubcommand(BenchmarkCommand())
addSubcommand(PutCommand())
addSubcommand(GetCommand())
})
System.exit(commandLine.execute(*args))
}
}
@CommandLine.Option(names = ["-V", "--version"], versionHelp = true)
var versionHelp = false
private set
@CommandLine.Spec
private lateinit var spec: CommandSpec
override fun run() {
spec.commandLine().usage(System.out);
}
}

View File

@@ -1,132 +0,0 @@
package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.error
import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GbcsClient
import picocli.CommandLine
import java.security.SecureRandom
import java.time.Duration
import java.time.Instant
import java.util.Base64
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random
@CommandLine.Command(
name = "benchmark",
description = ["Run a load test against the server"],
showDefaultValues = true
)
class BenchmarkCommand : GbcsCommand() {
private val log = contextLogger()
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
@CommandLine.Option(
names = ["-e", "--entries"],
description = ["Total number of elements to be added to the cache"],
paramLabel = "NUMBER_OF_ENTRIES"
)
private var numberOfEntries = 1000
override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
val client = GbcsClient(profile)
val entryGenerator = sequence {
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
while (true) {
val key = Base64.getUrlEncoder().encode(random.nextBytes(16)).toString(Charsets.UTF_8)
val value = random.nextBytes(0x1000)
yield(key to value)
}
}
val entries = let {
val completionQueue = LinkedBlockingQueue<Future<Pair<String, ByteArray>>>(numberOfEntries)
val start = Instant.now()
val totalElapsedTime = AtomicLong(0)
entryGenerator.take(numberOfEntries).forEach { entry ->
val requestStart = System.nanoTime()
val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { _, _ ->
totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
completionQueue.put(future)
}
}
val inserted = sequence<Pair<String, ByteArray>> {
var completionCounter = 0
while (completionCounter < numberOfEntries) {
val future = completionQueue.take()
try {
yield(future.get())
} catch (ee: ExecutionException) {
val cause = ee.cause ?: ee
log.error(cause.message, cause)
}
completionCounter += 1
}
}.toList()
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
"Insertion rate: ${numberOfEntries.toDouble() / elapsed * 1000} ops/s"
}
log.info {
"Average time per insertion: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1000} ms"
}
inserted
}
log.info {
"Inserted ${entries.size} entries"
}
if (entries.isNotEmpty()) {
val completionQueue = LinkedBlockingQueue<Future<Unit>>(entries.size)
val start = Instant.now()
val totalElapsedTime = AtomicLong(0)
entries.forEach { entry ->
val requestStart = System.nanoTime()
val future = client.get(entry.first).thenApply {
totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
if (it == null) {
log.error {
"Missing entry for key '${entry.first}'"
}
} else if (!entry.second.contentEquals(it)) {
log.error {
"Retrieved a value different from what was inserted for key '${entry.first}'"
}
}
}
future.whenComplete { _, _ ->
completionQueue.put(future)
}
}
var completionCounter = 0
while (completionCounter < entries.size) {
completionQueue.take()
completionCounter += 1
}
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
"Retrieval rate: ${entries.size.toDouble() / elapsed * 1000} ops/s"
}
log.info {
"Average time per retrieval: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1e6} ms"
}
} else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")
}
}
}

View File

@@ -1,15 +0,0 @@
plugins {
id 'java-library'
alias catalog.plugins.kotlin.jvm
}
dependencies {
implementation project(':gbcs-api')
implementation project(':gbcs-common')
implementation catalog.picocli
implementation catalog.slf4j.api
implementation catalog.netty.buffer
implementation catalog.netty.codec.http
}

View File

@@ -1,69 +0,0 @@
package net.woggioni.gbcs.client.impl
import net.woggioni.gbcs.api.exception.ConfigurationException
import net.woggioni.gbcs.common.Xml.Companion.asIterable
import net.woggioni.gbcs.common.Xml.Companion.renderAttribute
import net.woggioni.gbcs.client.GbcsClient
import org.w3c.dom.Document
import java.net.URI
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyStore
import java.security.PrivateKey
import java.security.cert.X509Certificate
object Parser {
fun parse(document: Document): GbcsClient.Configuration {
val root = document.documentElement
val profiles = mutableMapOf<String, GbcsClient.Configuration.Profile>()
for (child in root.asIterable()) {
val tagName = child.localName
when (tagName) {
"profile" -> {
val name = child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required")
val uri = child.renderAttribute("base-url")?.let(::URI) ?: throw ConfigurationException("base-url attribute is required")
var authentication: GbcsClient.Configuration.Authentication? = null
for (gchild in child.asIterable()) {
when (gchild.localName) {
"tls-client-auth" -> {
val keyStoreFile = gchild.renderAttribute("key-store-file")
val keyStorePassword =
gchild.renderAttribute("key-store-password")
val keyAlias = gchild.renderAttribute("key-alias")
val keyPassword = gchild.renderAttribute("key-password")
val keystore = KeyStore.getInstance("PKCS12").apply {
Files.newInputStream(Path.of(keyStoreFile)).use {
load(it, keyStorePassword?.toCharArray())
}
}
val key = keystore.getKey(keyAlias, keyPassword?.toCharArray()) as PrivateKey
val certChain = keystore.getCertificateChain(keyAlias).asSequence()
.map { it as X509Certificate }
.toList()
.toTypedArray()
authentication =
GbcsClient.Configuration.Authentication.TlsClientAuthenticationCredentials(key, certChain)
}
"basic-auth" -> {
val username = gchild.renderAttribute("user") ?: throw ConfigurationException("username attribute is required")
val password = gchild.renderAttribute("password") ?: throw ConfigurationException("password attribute is required")
authentication =
GbcsClient.Configuration.Authentication.BasicAuthenticationCredentials(username, password)
}
}
}
val maxConnections = child.renderAttribute("max-connections")
?.let(String::toInt)
?: 50
profiles[name] = GbcsClient.Configuration.Profile(uri, authentication, maxConnections)
}
}
}
return GbcsClient.Configuration(profiles)
}
}

View File

@@ -1,16 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs-client:profiles xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs-client="urn:net.woggioni.gbcs.client"
xs:schemaLocation="urn:net.woggioni.gbcs.client jms://net.woggioni.gbcs.client/net/woggioni/gbcs/client/schema/gbcs-client.xsd"
>
<profile name="profile1" base-url="https://gbcs1.example.com/">
<tls-client-auth
key-store-file="keystore.pfx"
key-store-password="password"
key-alias="woggioni@c962475fa38"
key-password="key-password"/>
</profile>
<profile name="profile2" base-url="https://gbcs2.example.com/">
<basic-auth user="user" password="password"/>
</profile>
</gbcs-client:profiles>

View File

@@ -1,9 +0,0 @@
module net.woggioni.gbcs.common {
requires java.xml;
requires java.logging;
requires org.slf4j;
requires kotlin.stdlib;
requires net.woggioni.jwo;
exports net.woggioni.gbcs.common;
}

View File

@@ -1,12 +0,0 @@
package net.woggioni.gbcs.common
import java.net.URI
import java.net.URL
object GBCS {
fun String.toUrl() : URL = URL.of(URI(this), null)
const val GBCS_NAMESPACE_URI: String = "urn:net.woggioni.gbcs.server"
const val GBCS_PREFIX: String = "gbcs"
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
}

View File

@@ -1,14 +0,0 @@
import net.woggioni.gbcs.api.CacheProvider;
module net.woggioni.gbcs.server.memcached {
requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.api;
requires com.googlecode.xmemcached;
requires net.woggioni.jwo;
requires java.xml;
requires kotlin.stdlib;
provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider;
opens net.woggioni.gbcs.server.memcached.schema;
}

View File

@@ -1,59 +0,0 @@
package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.XMemcachedClientBuilder
import net.rubyeye.xmemcached.command.BinaryCommandFactory
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.ContentTooLargeException
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.jwo.JWO
import java.io.ByteArrayInputStream
import java.net.InetSocketAddress
import java.nio.channels.Channels
import java.nio.channels.ReadableByteChannel
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.time.Duration
class MemcachedCache(
servers: List<HostAndPort>,
private val maxAge: Duration,
maxSize : Int,
digestAlgorithm: String?,
compressionMode: CompressionMode,
) : Cache {
private val memcachedClient = XMemcachedClientBuilder(
servers.stream().map { addr: HostAndPort -> InetSocketAddress(addr.host, addr.port) }.toList()
).apply {
commandFactory = BinaryCommandFactory()
digestAlgorithm?.let { dAlg ->
setKeyProvider { key ->
val md = MessageDigest.getInstance(dAlg)
md.update(key.toByteArray(StandardCharsets.UTF_8))
JWO.bytesToHex(md.digest())
}
}
transcoder = SerializingTranscoder(maxSize).apply {
setCompressionMode(compressionMode)
}
}.build()
override fun get(key: String): ReadableByteChannel? {
return memcachedClient.get<ByteArray>(key)
?.let(::ByteArrayInputStream)
?.let(Channels::newChannel)
}
override fun put(key: String, content: ByteArray) {
try {
memcachedClient[key, maxAge.toSeconds().toInt()] = content
} catch (e: IllegalArgumentException) {
throw ContentTooLargeException(e.message, e)
}
}
override fun close() {
memcachedClient.shutdown()
}
}

View File

@@ -1,26 +0,0 @@
package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.HostAndPort
import java.time.Duration
data class MemcachedCacheConfiguration(
var servers: List<HostAndPort>,
var maxAge: Duration = Duration.ofDays(1),
var maxSize: Int = 0x100000,
var digestAlgorithm: String? = null,
var compressionMode: CompressionMode = CompressionMode.ZIP,
) : Configuration.Cache {
override fun materialize() = MemcachedCache(
servers,
maxAge,
maxSize,
digestAlgorithm,
compressionMode
)
override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached"
override fun getTypeName() = "memcachedCacheType"
}

View File

@@ -1,88 +0,0 @@
package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.woggioni.gbcs.api.CacheProvider
import net.woggioni.gbcs.api.exception.ConfigurationException
import net.woggioni.gbcs.common.GBCS
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.common.Xml.Companion.asIterable
import net.woggioni.gbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import org.w3c.dom.Element
import java.time.Duration
class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
override fun getXmlSchemaLocation() = "jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd"
override fun getXmlType() = "memcachedCacheType"
override fun getXmlNamespace() = "urn:net.woggioni.gbcs.server.memcached"
val xmlNamespacePrefix : String
get() = "gbcs-memcached"
override fun deserialize(el: Element): MemcachedCacheConfiguration {
val servers = mutableListOf<HostAndPort>()
val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse)
?: Duration.ofDays(1)
val maxSize = el.renderAttribute("max-size")
?.let(String::toInt)
?: 0x100000
val compressionMode = el.renderAttribute("compression-mode")
?.let {
when (it) {
"gzip" -> CompressionMode.GZIP
"zip" -> CompressionMode.ZIP
else -> CompressionMode.ZIP
}
}
?: CompressionMode.ZIP
val digestAlgorithm = el.renderAttribute("digest")
for (child in el.asIterable()) {
when (child.nodeName) {
"server" -> {
val host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required")
val port = child.renderAttribute("port")?.toInt() ?: throw ConfigurationException("port attribute is required")
servers.add(HostAndPort(host, port))
}
}
}
return MemcachedCacheConfiguration(
servers,
maxAge,
maxSize,
digestAlgorithm,
compressionMode,
)
}
override fun serialize(doc: Document, cache: MemcachedCacheConfiguration) = cache.run {
val result = doc.createElement("cache")
Xml.of(doc, result) {
attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/")
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI)
for (server in servers) {
node("server") {
attr("host", server.host)
attr("port", server.port.toString())
}
}
attr("max-age", maxAge.toString())
attr("max-size", maxSize.toString())
digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm)
}
attr(
"compression-mode", when (compressionMode) {
CompressionMode.GZIP -> "gzip"
CompressionMode.ZIP -> "zip"
}
)
}
result
}
}

View File

@@ -1 +0,0 @@
net.woggioni.gbcs.server.memcached.MemcachedCacheProvider

View File

@@ -1,35 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<xs:schema targetNamespace="urn:net.woggioni.gbcs.server.memcached"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:import schemaLocation="jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd" namespace="urn:net.woggioni.gbcs.server"/>
<xs:complexType name="memcachedServerType">
<xs:attribute name="host" type="xs:token" use="required"/>
<xs:attribute name="port" type="xs:positiveInteger" use="required"/>
</xs:complexType>
<xs:complexType name="memcachedCacheType">
<xs:complexContent>
<xs:extension base="gbcs:cacheType">
<xs:sequence maxOccurs="unbounded">
<xs:element name="server" type="gbcs-memcached:memcachedServerType"/>
</xs:sequence>
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="max-size" type="xs:unsignedInt" default="1048576"/>
<xs:attribute name="digest" type="xs:token" />
<xs:attribute name="compression-mode" type="gbcs-memcached:compressionType" default="zip"/>
</xs:extension>
</xs:complexContent>
</xs:complexType>
<xs:simpleType name="compressionType">
<xs:restriction base="xs:token">
<xs:enumeration value="zip"/>
<xs:enumeration value="gzip"/>
</xs:restriction>
</xs:simpleType>
</xs:schema>

View File

@@ -1,28 +0,0 @@
import net.woggioni.gbcs.api.CacheProvider;
import net.woggioni.gbcs.server.cache.FileSystemCacheProvider;
module net.woggioni.gbcs.server {
requires java.sql;
requires java.xml;
requires java.logging;
requires java.naming;
requires kotlin.stdlib;
requires io.netty.buffer;
requires io.netty.transport;
requires io.netty.codec.http;
requires io.netty.common;
requires io.netty.handler;
requires io.netty.codec;
requires org.slf4j;
requires net.woggioni.jwo;
requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.api;
exports net.woggioni.gbcs.server;
opens net.woggioni.gbcs.server;
opens net.woggioni.gbcs.server.schema;
uses CacheProvider;
provides CacheProvider with FileSystemCacheProvider;
}

View File

@@ -1,133 +0,0 @@
package net.woggioni.gbcs.server.cache
import net.woggioni.gbcs.api.Cache
import net.woggioni.jwo.JWO
import net.woggioni.jwo.LockFile
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.nio.file.StandardOpenOption
import java.nio.file.attribute.BasicFileAttributes
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterInputStream
class FileSystemCache(
val root: Path,
val maxAge: Duration,
val digestAlgorithm: String?,
val compressionEnabled: Boolean,
val compressionLevel: Int
) : Cache {
private fun lockFilePath(key: String): Path = root.resolve("$key.lock")
init {
Files.createDirectories(root)
}
private var nextGc = AtomicReference(Instant.now().plus(maxAge))
override fun get(key: String) = (digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
LockFile.acquire(lockFilePath(digest), true).use {
root.resolve(digest).takeIf(Files::exists)?.let { file ->
if (compressionEnabled) {
val inflater = Inflater()
Channels.newChannel(InflaterInputStream(Files.newInputStream(file), inflater))
} else {
FileChannel.open(file, StandardOpenOption.READ)
}
}
}.also {
gc()
}
}
override fun put(key: String, content: ByteArray) {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
LockFile.acquire(lockFilePath(digest), false).use {
val file = root.resolve(digest)
val tmpFile = Files.createTempFile(root, null, ".tmp")
try {
Files.newOutputStream(tmpFile).let {
if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
DeflaterOutputStream(it, deflater)
} else {
it
}
}.use {
it.write(content)
}
Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE)
} catch (t: Throwable) {
Files.delete(tmpFile)
throw t
}
}
}.also {
gc()
}
}
private fun gc() {
val now = Instant.now()
val oldValue = nextGc.getAndSet(now.plus(maxAge))
if (oldValue < now) {
actualGc(now)
}
}
@Synchronized
private fun actualGc(now: Instant) {
Files.list(root).filter {
!it.fileName.toString().endsWith(".lock")
}.filter {
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
.creationTime()
.toInstant()
now > creationTimeStamp.plus(maxAge)
}.forEach { file ->
val lockFile = lockFilePath(file.fileName.toString())
LockFile.acquire(lockFile, false).use {
Files.delete(file)
}
Files.delete(lockFile)
}
}
override fun close() {}
companion object {
fun digest(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): ByteArray {
md.update(data)
return md.digest()
}
fun digestString(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): String {
return JWO.bytesToHex(digest(data, md))
}
}
}

View File

@@ -1 +0,0 @@
net.woggioni.gbcs.server.cache.FileSystemCacheProvider

View File

@@ -1,10 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server useVirtualThreads="false" xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server"
xs:schemaLocation="urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd">
<bind host="127.0.0.1" port="8080"/>
<cache xs:type="gbcs:fileSystemCacheType" path="/tmp/gbcs" max-age="P7D"/>
<authentication>
<none/>
</authentication>
</gbcs:server>

View File

@@ -1,38 +0,0 @@
package net.woggioni.gbcs.server.test
import net.woggioni.gbcs.common.GBCS.toUrl
import net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory
import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.server.configuration.Parser
import net.woggioni.gbcs.server.configuration.Serializer
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.io.TempDir
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.nio.file.Files
import java.nio.file.Path
class ConfigurationTest {
@ValueSource(
strings = [
"classpath:net/woggioni/gbcs/server/test/gbcs-default.xml",
"classpath:net/woggioni/gbcs/server/test/gbcs-memcached.xml",
"classpath:net/woggioni/gbcs/server/test/gbcs-tls.xml",
]
)
@ParameterizedTest
fun test(configurationUrl: String, @TempDir testDir: Path) {
GbcsUrlStreamHandlerFactory.install()
val doc = Xml.parseXml(configurationUrl.toUrl())
val cfg = Parser.parse(doc)
val configFile = testDir.resolve("gbcs.xml")
Files.newOutputStream(configFile).use {
Xml.write(Serializer.serialize(cfg), it)
}
Xml.write(Serializer.serialize(cfg), System.out)
val parsed = Parser.parse(Xml.parseXml(configFile.toUri().toURL()))
Assertions.assertEquals(cfg, parsed)
}
}

View File

@@ -1,10 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server useVirtualThreads="false" xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server"
xs:schemaLocation="urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd">
<bind host="127.0.0.1" port="11443"/>
<cache xs:type="gbcs:fileSystemCacheType" path="/tmp/gbcs" max-age="P7D"/>
<authentication>
<none/>
</authentication>
</gbcs:server>

View File

@@ -1,13 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server useVirtualThreads="false" xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcached jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd">
<bind host="127.0.0.1" port="11443" />
<cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="101325" digest="SHA-256">
<server host="127.0.0.1" port="11211"/>
</cache>
<authentication>
<none/>
</authentication>
</gbcs:server>

View File

@@ -2,11 +2,10 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true
org.gradle.caching=true
gbcs.version = 0.0.8
rbcs.version = 0.1.6
lys.version = 2025.01.17
lys.version = 2025.02.08
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
docker.registry.url=gitea.woggioni.net
jpms-check.configurationName = runtimeClasspath

View File

@@ -5,6 +5,7 @@ plugins {
}
dependencies {
api catalog.netty.buffer
}
publishing {

View File

@@ -0,0 +1,8 @@
module net.woggioni.rbcs.api {
requires static lombok;
requires java.xml;
requires io.netty.buffer;
exports net.woggioni.rbcs.api;
exports net.woggioni.rbcs.api.exception;
exports net.woggioni.rbcs.api.event;
}

View File

@@ -0,0 +1,17 @@
package net.woggioni.rbcs.api;
import io.netty.buffer.ByteBufAllocator;
import java.util.concurrent.CompletableFuture;
public interface Cache extends AutoCloseable {
default void get(String key, ResponseHandle responseHandle, ByteBufAllocator alloc) {
throw new UnsupportedOperationException();
}
default CompletableFuture<RequestHandle> put(String key, ResponseHandle responseHandle, ByteBufAllocator alloc) {
throw new UnsupportedOperationException();
}
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.api;
package net.woggioni.rbcs.api;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

View File

@@ -1,11 +1,13 @@
package net.woggioni.gbcs.api;
package net.woggioni.rbcs.api;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.Value;
import java.nio.file.Path;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -14,19 +16,48 @@ import java.util.stream.Collectors;
public class Configuration {
String host;
int port;
int incomingConnectionsBacklogSize;
String serverPath;
@NonNull
EventExecutor eventExecutor;
@NonNull
Connection connection;
Map<String, User> users;
Map<String, Group> groups;
Cache cache;
Authentication authentication;
Tls tls;
boolean useVirtualThread;
@Value
public static class EventExecutor {
boolean useVirtualThreads;
}
@Value
public static class Connection {
Duration readTimeout;
Duration writeTimeout;
Duration idleTimeout;
Duration readIdleTimeout;
Duration writeIdleTimeout;
int maxRequestSize;
}
@Value
public static class Quota {
long calls;
Duration period;
long initialAvailableCalls;
long maxAvailableCalls;
}
@Value
public static class Group {
@EqualsAndHashCode.Include
String name;
Set<Role> roles;
Quota groupQuota;
Quota userQuota;
}
@Value
@@ -35,7 +66,7 @@ public class Configuration {
String name;
String password;
Set<Group> groups;
Quota quota;
public Set<Role> getRoles() {
return groups.stream()
@@ -55,12 +86,22 @@ public class Configuration {
}
@Value
public static class Tls {
public static class Throttling {
KeyStore keyStore;
TrustStore trustStore;
boolean verifyClients;
}
public enum ClientCertificate {
REQUIRED, OPTIONAL
}
@Value
public static class Tls {
KeyStore keyStore;
TrustStore trustStore;
}
@Value
public static class KeyStore {
Path file;
@@ -74,6 +115,7 @@ public class Configuration {
Path file;
String password;
boolean checkCertificateStatus;
boolean requireClientCertificate;
}
@Value
@@ -93,7 +135,7 @@ public class Configuration {
}
public interface Cache {
net.woggioni.gbcs.api.Cache materialize();
net.woggioni.rbcs.api.Cache materialize();
String getNamespaceURI();
String getTypeName();
}
@@ -101,24 +143,28 @@ public class Configuration {
public static Configuration of(
String host,
int port,
int incomingConnectionsBacklogSize,
String serverPath,
EventExecutor eventExecutor,
Connection connection,
Map<String, User> users,
Map<String, Group> groups,
Cache cache,
Authentication authentication,
Tls tls,
boolean useVirtualThread
Tls tls
) {
return new Configuration(
host,
port,
incomingConnectionsBacklogSize,
serverPath != null && !serverPath.isEmpty() && !serverPath.equals("/") ? serverPath : null,
eventExecutor,
connection,
users,
groups,
cache,
authentication,
tls,
useVirtualThread
tls
);
}
}

View File

@@ -0,0 +1,8 @@
package net.woggioni.rbcs.api;
import net.woggioni.rbcs.api.event.RequestStreamingEvent;
@FunctionalInterface
public interface RequestHandle {
void handleEvent(RequestStreamingEvent evt);
}

View File

@@ -0,0 +1,8 @@
package net.woggioni.rbcs.api;
import net.woggioni.rbcs.api.event.ResponseStreamingEvent;
@FunctionalInterface
public interface ResponseHandle {
void handleEvent(ResponseStreamingEvent evt);
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.api;
package net.woggioni.rbcs.api;
public enum Role {
Reader, Writer

View File

@@ -0,0 +1,26 @@
package net.woggioni.rbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
public sealed interface RequestStreamingEvent {
@Getter
@RequiredArgsConstructor
non-sealed class ChunkReceived implements RequestStreamingEvent {
private final ByteBuf chunk;
}
final class LastChunkReceived extends ChunkReceived {
public LastChunkReceived(ByteBuf chunk) {
super(chunk);
}
}
@Getter
@RequiredArgsConstructor
final class ExceptionCaught implements RequestStreamingEvent {
private final Throwable exception;
}
}

View File

@@ -0,0 +1,42 @@
package net.woggioni.rbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.nio.channels.FileChannel;
public sealed interface ResponseStreamingEvent {
final class ResponseReceived implements ResponseStreamingEvent {
}
@Getter
@RequiredArgsConstructor
non-sealed class ChunkReceived implements ResponseStreamingEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
non-sealed class FileReceived implements ResponseStreamingEvent {
private final FileChannel file;
}
final class LastChunkReceived extends ChunkReceived {
public LastChunkReceived(ByteBuf chunk) {
super(chunk);
}
}
@Getter
@RequiredArgsConstructor
final class ExceptionCaught implements ResponseStreamingEvent {
private final Throwable exception;
}
final class NotFound implements ResponseStreamingEvent { }
NotFound NOT_FOUND = new NotFound();
ResponseReceived RESPONSE_RECEIVED = new ResponseReceived();
}

View File

@@ -0,0 +1,11 @@
package net.woggioni.rbcs.api.exception;
public class CacheException extends RbcsException {
public CacheException(String message, Throwable cause) {
super(message, cause);
}
public CacheException(String message) {
this(message, null);
}
}

View File

@@ -1,6 +1,6 @@
package net.woggioni.gbcs.api.exception;
package net.woggioni.rbcs.api.exception;
public class ConfigurationException extends GbcsException {
public class ConfigurationException extends RbcsException {
public ConfigurationException(String message, Throwable cause) {
super(message, cause);
}

View File

@@ -1,6 +1,6 @@
package net.woggioni.gbcs.api.exception;
package net.woggioni.rbcs.api.exception;
public class ContentTooLargeException extends GbcsException {
public class ContentTooLargeException extends RbcsException {
public ContentTooLargeException(String message, Throwable cause) {
super(message, cause);
}

View File

@@ -0,0 +1,7 @@
package net.woggioni.rbcs.api.exception;
public class RbcsException extends RuntimeException {
public RbcsException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -16,8 +16,10 @@ import net.woggioni.gradle.graalvm.NativeImageTask
import net.woggioni.gradle.graalvm.JlinkPlugin
import net.woggioni.gradle.graalvm.JlinkTask
Property<String> mainModuleName = objects.property(String.class)
mainModuleName.set('net.woggioni.rbcs.cli')
Property<String> mainClassName = objects.property(String.class)
mainClassName.set('net.woggioni.gbcs.cli.GradleBuildCacheServerCli')
mainClassName.set('net.woggioni.rbcs.cli.RemoteBuildCacheServerCli')
tasks.named(JavaPlugin.COMPILE_JAVA_TASK_NAME, JavaCompile) {
options.javaModuleMainClass = mainClassName
@@ -33,7 +35,7 @@ configurations {
}
envelopeJar {
mainModule = 'net.woggioni.gbcs.cli'
mainModule = mainModuleName
mainClass = mainClassName
extraClasspath = ["plugins"]
@@ -42,35 +44,45 @@ envelopeJar {
dependencies {
implementation catalog.jwo
implementation catalog.slf4j.api
implementation catalog.netty.codec.http
implementation catalog.picocli
implementation project(':gbcs-client')
implementation project(':gbcs-server')
implementation project(':rbcs-client')
implementation project(':rbcs-server')
// runtimeOnly catalog.slf4j.jdk14
runtimeOnly catalog.logback.classic
// runtimeOnly catalog.slf4j.simple
}
Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named('envelopeJar', EnvelopeJarTask.class) {
// systemProperties['java.util.logging.config.class'] = 'net.woggioni.gbcs.LoggingConfig'
// systemProperties['log.config.source'] = 'logging.properties'
systemProperties['logback.configurationFile'] = 'classpath:net/woggioni/gbcs/cli/logback.xml'
// systemProperties['java.util.logging.config.class'] = 'net.woggioni.rbcs.LoggingConfig'
// systemProperties['log.config.source'] = 'net/woggioni/rbcs/cli/logging.properties'
// systemProperties['java.util.logging.config.file'] = 'classpath:net/woggioni/rbcs/cli/logging.properties'
systemProperties['logback.configurationFile'] = 'classpath:net/woggioni/rbcs/cli/logback.xml'
systemProperties['io.netty.leakDetectionLevel'] = 'DISABLED'
// systemProperties['org.slf4j.simpleLogger.showDateTime'] = 'true'
// systemProperties['org.slf4j.simpleLogger.defaultLogLevel'] = 'debug'
// systemProperties['org.slf4j.simpleLogger.log.com.google.code.yanf4j'] = 'warn'
// systemProperties['org.slf4j.simpleLogger.log.net.rubyeye.xmemcached'] = 'warn'
// systemProperties['org.slf4j.simpleLogger.dateTimeFormat'] = 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ'
}
tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfigurationTask) {
mainClass = mainClassName
mainModule = mainModuleName
}
tasks.named(NativeImagePlugin.NATIVE_IMAGE_TASK_NAME, NativeImageTask) {
mainClass = mainClassName
mainModule = mainModuleName
useMusl = true
buildStaticImage = true
}
tasks.named(JlinkPlugin.JLINK_TASK_NAME, JlinkTask) {
mainClass = mainClassName
mainModule = 'net.woggioni.gbcs.cli'
mainModule = 'net.woggioni.rbcs.cli'
}
artifacts {

View File

@@ -1,2 +1,2 @@
Args=-H:Optimize=3 --gc=serial
Args=-H:Optimize=3 --gc=serial --initialize-at-run-time=io.netty
#-H:TraceClassInitialization=io.netty.handler.ssl.BouncyCastleAlpnSslUtils

View File

@@ -0,0 +1,17 @@
module net.woggioni.rbcs.cli {
requires org.slf4j;
requires net.woggioni.rbcs.server;
requires info.picocli;
requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.client;
requires kotlin.stdlib;
requires net.woggioni.jwo;
requires net.woggioni.rbcs.api;
exports net.woggioni.rbcs.cli.impl.converters to info.picocli;
opens net.woggioni.rbcs.cli.impl.commands to info.picocli;
opens net.woggioni.rbcs.cli.impl to info.picocli;
opens net.woggioni.rbcs.cli to info.picocli, net.woggioni.rbcs.common;
exports net.woggioni.rbcs.cli;
}

View File

@@ -0,0 +1,69 @@
package net.woggioni.rbcs.cli
import net.woggioni.rbcs.cli.impl.AbstractVersionProvider
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.cli.impl.commands.BenchmarkCommand
import net.woggioni.rbcs.cli.impl.commands.ClientCommand
import net.woggioni.rbcs.cli.impl.commands.GetCommand
import net.woggioni.rbcs.cli.impl.commands.HealthCheckCommand
import net.woggioni.rbcs.cli.impl.commands.PasswordHashCommand
import net.woggioni.rbcs.cli.impl.commands.PutCommand
import net.woggioni.rbcs.cli.impl.commands.ServerCommand
import net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.jwo.Application
import picocli.CommandLine
import picocli.CommandLine.Model.CommandSpec
@CommandLine.Command(
name = "rbcs", versionProvider = RemoteBuildCacheServerCli.VersionProvider::class
)
class RemoteBuildCacheServerCli : RbcsCommand() {
class VersionProvider : AbstractVersionProvider()
companion object {
@JvmStatic
fun main(vararg args: String) {
val currentClassLoader = RemoteBuildCacheServerCli::class.java.classLoader
Thread.currentThread().contextClassLoader = currentClassLoader
if(currentClassLoader.javaClass.name == "net.woggioni.envelope.loader.ModuleClassLoader") {
//We're running in an envelope jar and custom URL protocols won't work
RbcsUrlStreamHandlerFactory.install()
}
val log = contextLogger()
val app = Application.builder("rbcs")
.configurationDirectoryEnvVar("RBCS_CONFIGURATION_DIR")
.configurationDirectoryPropertyKey("net.woggioni.rbcs.conf.dir")
.build()
val rbcsCli = RemoteBuildCacheServerCli()
val commandLine = CommandLine(rbcsCli)
commandLine.setExecutionExceptionHandler { ex, cl, parseResult ->
log.error(ex.message, ex)
CommandLine.ExitCode.SOFTWARE
}
commandLine.addSubcommand(ServerCommand(app))
commandLine.addSubcommand(PasswordHashCommand())
commandLine.addSubcommand(
CommandLine(ClientCommand(app)).apply {
addSubcommand(BenchmarkCommand())
addSubcommand(PutCommand())
addSubcommand(GetCommand())
addSubcommand(HealthCheckCommand())
})
System.exit(commandLine.execute(*args))
}
}
@CommandLine.Option(names = ["-V", "--version"], versionHelp = true)
var versionHelp = false
private set
@CommandLine.Spec
private lateinit var spec: CommandSpec
override fun run() {
spec.commandLine().usage(System.out);
}
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.cli.impl
package net.woggioni.rbcs.cli.impl
import picocli.CommandLine
import java.util.jar.Attributes

View File

@@ -1,11 +1,11 @@
package net.woggioni.gbcs.cli.impl
package net.woggioni.rbcs.cli.impl
import net.woggioni.jwo.Application
import picocli.CommandLine
import java.nio.file.Path
abstract class GbcsCommand : Runnable {
abstract class RbcsCommand : Runnable {
@CommandLine.Option(names = ["-h", "--help"], usageHelp = true)
var usageHelp = false

View File

@@ -0,0 +1,155 @@
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.error
import net.woggioni.rbcs.common.info
import net.woggioni.jwo.JWO
import net.woggioni.jwo.LongMath
import net.woggioni.rbcs.common.debug
import picocli.CommandLine
import java.security.SecureRandom
import java.time.Duration
import java.time.Instant
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random
@CommandLine.Command(
name = "benchmark",
description = ["Run a load test against the server"],
showDefaultValues = true
)
class BenchmarkCommand : RbcsCommand() {
private val log = contextLogger()
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
@CommandLine.Option(
names = ["-e", "--entries"],
description = ["Total number of elements to be added to the cache"],
paramLabel = "NUMBER_OF_ENTRIES"
)
private var numberOfEntries = 1000
@CommandLine.Option(
names = ["-s", "--size"],
description = ["Size of a cache value in bytes"],
paramLabel = "SIZE"
)
private var size = 0x1000
override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
val progressThreshold = LongMath.ceilDiv(numberOfEntries.toLong(), 20)
RemoteBuildCacheClient(profile).use { client ->
val entryGenerator = sequence {
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
while (true) {
val key = JWO.bytesToHex(random.nextBytes(16))
val content = random.nextInt().toByte()
val value = ByteArray(size, { _ -> content })
yield(key to value)
}
}
log.info {
"Starting insertion"
}
val entries = let {
val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now()
val semaphore = Semaphore(profile.maxConnections * 3)
val iterator = entryGenerator.take(numberOfEntries).iterator()
while (completionCounter.get() < numberOfEntries) {
if (iterator.hasNext()) {
val entry = iterator.next()
semaphore.acquire()
val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { result, ex ->
if (ex != null) {
log.error(ex.message, ex)
} else {
completionQueue.put(result)
}
semaphore.release()
val completed = completionCounter.incrementAndGet()
if(completed.mod(progressThreshold) == 0L) {
log.debug {
"Inserted $completed / $numberOfEntries"
}
}
}
} else {
Thread.sleep(0)
}
}
val inserted = completionQueue.toList()
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
}
inserted
}
log.info {
"Inserted ${entries.size} entries"
}
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) {
val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now()
val it = entries.iterator()
while (completionCounter.get() < entries.size) {
if (it.hasNext()) {
val entry = it.next()
val future = client.get(entry.first).thenApply {
if (it == null) {
log.error {
"Missing entry for key '${entry.first}'"
}
} else if (!entry.second.contentEquals(it)) {
log.error {
"Retrieved a value different from what was inserted for key '${entry.first}'"
}
}
}
future.whenComplete { _, _ ->
val completed = completionCounter.incrementAndGet()
if(completed.mod(progressThreshold) == 0L) {
log.debug {
"Retrieved $completed / ${entries.size}"
}
}
semaphore.release()
}
} else {
Thread.sleep(0)
}
}
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
"Retrieval rate: $opsPerSecond ops/s"
}
} else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")
}
}
}
}

View File

@@ -1,24 +1,24 @@
package net.woggioni.gbcs.cli.impl.commands
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GbcsClient
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.jwo.Application
import picocli.CommandLine
import java.nio.file.Path
@CommandLine.Command(
name = "client",
description = ["GBCS client"],
description = ["RBCS client"],
showDefaultValues = true
)
class ClientCommand(app : Application) : GbcsCommand() {
class ClientCommand(app : Application) : RbcsCommand() {
@CommandLine.Option(
names = ["-c", "--configuration"],
description = ["Path to the client configuration file"],
paramLabel = "CONFIGURATION_FILE"
)
private var configurationFile : Path = findConfigurationFile(app, "gbcs-client.xml")
private var configurationFile : Path = findConfigurationFile(app, "rbcs-client.xml")
@CommandLine.Option(
names = ["-p", "--profile"],
@@ -28,8 +28,8 @@ class ClientCommand(app : Application) : GbcsCommand() {
)
var profileName : String? = null
val configuration : GbcsClient.Configuration by lazy {
GbcsClient.Configuration.parse(configurationFile)
val configuration : RemoteBuildCacheClient.Configuration by lazy {
RemoteBuildCacheClient.Configuration.parse(configurationFile)
}
override fun run() {

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.cli.impl.commands
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GbcsClient
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.contextLogger
import picocli.CommandLine
import java.nio.file.Files
import java.nio.file.Path
@@ -12,7 +12,7 @@ import java.nio.file.Path
description = ["Fetch a value from the cache with the specified key"],
showDefaultValues = true
)
class GetCommand : GbcsCommand() {
class GetCommand : RbcsCommand() {
private val log = contextLogger()
@CommandLine.Spec
@@ -38,7 +38,7 @@ class GetCommand : GbcsCommand() {
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
GbcsClient(profile).use { client ->
RemoteBuildCacheClient(profile).use { client ->
client.get(key).thenApply { value ->
value?.let {
(output?.let(Files::newOutputStream) ?: System.out).use {

View File

@@ -0,0 +1,45 @@
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.contextLogger
import picocli.CommandLine
import java.security.SecureRandom
import kotlin.random.Random
@CommandLine.Command(
name = "health",
description = ["Check server health"],
showDefaultValues = true
)
class HealthCheckCommand : RbcsCommand() {
private val log = contextLogger()
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
RemoteBuildCacheClient(profile).use { client ->
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
val nonce = ByteArray(0xa0)
random.nextBytes(nonce)
client.healthCheck(nonce).thenApply { value ->
if(value == null) {
throw IllegalStateException("Empty response from server")
}
for(i in 0 until nonce.size) {
for(j in value.size - nonce.size until nonce.size) {
if(nonce[i] != value[j]) {
throw IllegalStateException("Server nonce does not match")
}
}
}
}.get()
}
}
}

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.cli.impl.commands
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.common.PasswordSecurity.hashPassword
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.converters.OutputStreamConverter
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.cli.impl.converters.OutputStreamConverter
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import net.woggioni.jwo.UncloseableOutputStream
import picocli.CommandLine
import java.io.OutputStream
@@ -12,10 +12,10 @@ import java.io.PrintWriter
@CommandLine.Command(
name = "password",
description = ["Generate a password hash to add to GBCS configuration file"],
description = ["Generate a password hash to add to RBCS configuration file"],
showDefaultValues = true
)
class PasswordHashCommand : GbcsCommand() {
class PasswordHashCommand : RbcsCommand() {
@CommandLine.Option(
names = ["-o", "--output-file"],
description = ["Write the output to a file instead of stdout"],

View File

@@ -1,9 +1,9 @@
package net.woggioni.gbcs.cli.impl.commands
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.converters.InputStreamConverter
import net.woggioni.gbcs.client.GbcsClient
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.cli.impl.converters.InputStreamConverter
import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.contextLogger
import picocli.CommandLine
import java.io.InputStream
@@ -12,7 +12,7 @@ import java.io.InputStream
description = ["Add or replace a value to the cache with the specified key"],
showDefaultValues = true
)
class PutCommand : GbcsCommand() {
class PutCommand : RbcsCommand() {
private val log = contextLogger()
@CommandLine.Spec
@@ -39,7 +39,7 @@ class PutCommand : GbcsCommand() {
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
GbcsClient(profile).use { client ->
RemoteBuildCacheClient(profile).use { client ->
value.use {
client.put(key, it.readAllBytes())
}.get()

View File

@@ -1,25 +1,26 @@
package net.woggioni.gbcs.cli.impl.commands
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.server.GradleBuildCacheServer
import net.woggioni.gbcs.server.GradleBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.debug
import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.cli.impl.converters.DurationConverter
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.info
import net.woggioni.rbcs.server.RemoteBuildCacheServer
import net.woggioni.rbcs.server.RemoteBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL
import net.woggioni.jwo.Application
import net.woggioni.jwo.JWO
import picocli.CommandLine
import java.io.ByteArrayOutputStream
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration
@CommandLine.Command(
name = "server",
description = ["GBCS server"],
description = ["RBCS server"],
showDefaultValues = true
)
class ServerCommand(app : Application) : GbcsCommand() {
class ServerCommand(app : Application) : RbcsCommand() {
private val log = contextLogger()
@@ -35,16 +36,20 @@ class ServerCommand(app : Application) : GbcsCommand() {
}
}
@CommandLine.Option(
names = ["-t", "--timeout"],
description = ["Exit after the specified time"],
paramLabel = "TIMEOUT",
converter = [DurationConverter::class]
)
private var timeout: Duration? = null
@CommandLine.Option(
names = ["-c", "--config-file"],
description = ["Read the application configuration from this file"],
paramLabel = "CONFIG_FILE"
)
private var configurationFile: Path = findConfigurationFile(app, "gbcs-server.xml")
val configuration : Configuration by lazy {
GradleBuildCacheServer.loadConfiguration(configurationFile)
}
private var configurationFile: Path = findConfigurationFile(app, "rbcs-server.xml")
override fun run() {
if (!Files.exists(configurationFile)) {
@@ -52,16 +57,20 @@ class ServerCommand(app : Application) : GbcsCommand() {
createDefaultConfigurationFile(configurationFile)
}
val configuration = GradleBuildCacheServer.loadConfiguration(configurationFile)
val configuration = RemoteBuildCacheServer.loadConfiguration(configurationFile)
log.debug {
ByteArrayOutputStream().also {
GradleBuildCacheServer.dumpConfiguration(configuration, it)
RemoteBuildCacheServer.dumpConfiguration(configuration, it)
}.let {
"Server configuration:\n${String(it.toByteArray())}"
}
}
val server = GradleBuildCacheServer(configuration)
server.run().use {
val server = RemoteBuildCacheServer(configuration)
server.run().use { server ->
timeout?.let {
Thread.sleep(it)
server.shutdown()
}
}
}
}

View File

@@ -0,0 +1,11 @@
package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine
import java.time.Duration
class DurationConverter : CommandLine.ITypeConverter<Duration> {
override fun convert(value: String): Duration {
return Duration.parse(value)
}
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.cli.impl.converters
package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine
import java.io.InputStream

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.cli.impl.converters
package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine
import java.io.OutputStream

View File

@@ -15,7 +15,4 @@
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="io.netty" level="debug"/>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

19
rbcs-client/build.gradle Normal file
View File

@@ -0,0 +1,19 @@
plugins {
id 'java-library'
alias catalog.plugins.kotlin.jvm
}
dependencies {
implementation project(':rbcs-api')
implementation project(':rbcs-common')
implementation catalog.slf4j.api
implementation catalog.netty.buffer
implementation catalog.netty.handler
implementation catalog.netty.transport
implementation catalog.netty.common
implementation catalog.netty.codec.http
testRuntimeOnly catalog.logback.classic
}

View File

@@ -1,4 +1,4 @@
module net.woggioni.gbcs.client {
module net.woggioni.rbcs.client {
requires io.netty.handler;
requires io.netty.codec.http;
requires io.netty.transport;
@@ -6,12 +6,12 @@ module net.woggioni.gbcs.client {
requires io.netty.common;
requires io.netty.buffer;
requires java.xml;
requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.api;
requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.api;
requires io.netty.codec;
requires org.slf4j;
exports net.woggioni.gbcs.client;
exports net.woggioni.rbcs.client;
opens net.woggioni.gbcs.client.schema;
opens net.woggioni.rbcs.client.schema;
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.client
package net.woggioni.rbcs.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
@@ -30,23 +30,25 @@ import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.concurrent.Future
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.debug
import net.woggioni.gbcs.client.impl.Parser
import net.woggioni.rbcs.client.impl.Parser
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.trace
import java.net.InetSocketAddress
import java.net.URI
import java.nio.file.Files
import java.nio.file.Path
import java.security.PrivateKey
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.Base64
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger
import kotlin.random.Random
import io.netty.util.concurrent.Future as NettyFuture
class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable {
private val group: NioEventLoopGroup
private var sslContext: SslContext
private val log = contextLogger()
@@ -64,10 +66,18 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
data class BasicAuthenticationCredentials(val username: String, val password: String) : Authentication()
}
class RetryPolicy(
val maxAttempts: Int,
val initialDelayMillis: Long,
val exp: Double
)
data class Profile(
val serverURI: URI,
val authentication: Authentication?,
val maxConnections : Int
val connectionTimeout: Duration?,
val maxConnections: Int,
val retryPolicy: RetryPolicy?,
)
companion object {
@@ -104,6 +114,9 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
option(ChannelOption.TCP_NODELAY, true)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(host, port))
profile.connectionTimeout?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it.toMillis().toInt())
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
@@ -114,20 +127,29 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
private var leaseCount = AtomicInteger()
override fun channelReleased(ch: Channel) {
log.debug {
"Released lease ${leaseCount.decrementAndGet()}"
val activeLeases = leaseCount.decrementAndGet()
log.trace {
"Released channel ${ch.id().asShortText()}, number of active leases: $activeLeases"
}
}
override fun channelAcquired(ch: Channel?) {
log.debug {
"Acquired lease ${leaseCount.getAndIncrement()}"
override fun channelAcquired(ch: Channel) {
val activeLeases = leaseCount.getAndIncrement()
log.trace {
"Acquired channel ${ch.id().asShortText()}, number of active leases: $activeLeases"
}
}
override fun channelCreated(ch: Channel) {
val connectionId = connectionCount.getAndIncrement()
log.debug {
"Created connection ${connectionCount.getAndIncrement()}"
"Created connection $connectionId, total number of active connections: $connectionId"
}
ch.closeFuture().addListener {
val activeConnections = connectionCount.decrementAndGet()
log.debug {
"Closed connection $connectionId, total number of active connections: $activeConnections"
}
}
val pipeline: ChannelPipeline = ch.pipeline()
@@ -139,16 +161,82 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
// HTTP handlers
pipeline.addLast("codec", HttpClientCodec())
pipeline.addLast("decompressor", HttpContentDecompressor())
pipeline.addLast("aggregator", HttpObjectAggregator(1048576))
pipeline.addLast("aggregator", HttpObjectAggregator(134217728))
pipeline.addLast("chunked", ChunkedWriteHandler())
}
}
pool = FixedChannelPool(bootstrap, channelPoolHandler, profile.maxConnections)
}
private fun executeWithRetry(operation: () -> CompletableFuture<FullHttpResponse>): CompletableFuture<FullHttpResponse> {
val retryPolicy = profile.retryPolicy
return if (retryPolicy != null) {
val outcomeHandler = OutcomeHandler<FullHttpResponse> { outcome ->
when (outcome) {
is OperationOutcome.Success -> {
val response = outcome.result
val status = response.status()
when (status) {
HttpResponseStatus.TOO_MANY_REQUESTS -> {
val retryAfter = response.headers()[HttpHeaderNames.RETRY_AFTER]?.let { headerValue ->
try {
headerValue.toLong() * 1000
} catch (nfe: NumberFormatException) {
null
}
}
OutcomeHandlerResult.Retry(retryAfter)
}
HttpResponseStatus.INTERNAL_SERVER_ERROR, HttpResponseStatus.SERVICE_UNAVAILABLE ->
OutcomeHandlerResult.Retry()
else -> OutcomeHandlerResult.DoNotRetry()
}
}
is OperationOutcome.Failure -> {
OutcomeHandlerResult.Retry()
}
}
}
executeWithRetry(
group,
retryPolicy.maxAttempts,
retryPolicy.initialDelayMillis.toDouble(),
retryPolicy.exp,
outcomeHandler,
Random.Default,
operation
)
} else {
operation()
}
}
fun healthCheck(nonce: ByteArray): CompletableFuture<ByteArray?> {
return executeWithRetry {
sendRequest(profile.serverURI, HttpMethod.TRACE, nonce)
}.thenApply {
val status = it.status()
if (it.status() != HttpResponseStatus.OK) {
throw HttpException(status)
} else {
it.content()
}
}.thenApply { maybeByteBuf ->
maybeByteBuf?.let {
val result = ByteArray(it.readableBytes())
it.getBytes(0, result)
result
}
}
}
fun get(key: String): CompletableFuture<ByteArray?> {
return sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
.thenApply {
return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
}.thenApply {
val status = it.status()
if (it.status() == HttpResponseStatus.NOT_FOUND) {
null
@@ -167,9 +255,11 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
}
fun put(key: String, content: ByteArray): CompletableFuture<Unit> {
return sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content).thenApply {
return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content)
}.thenApply {
val status = it.status()
if (it.status() != HttpResponseStatus.CREATED) {
if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) {
throw HttpException(status)
}
}
@@ -188,9 +278,9 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
ctx: ChannelHandlerContext,
response: FullHttpResponse
) {
responseFuture.complete(response)
pipeline.removeLast()
pool.release(channel)
responseFuture.complete(response)
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
@@ -219,7 +309,7 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes())
}
set(HttpHeaderNames.HOST, profile.serverURI.host)
set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
set(
HttpHeaderNames.ACCEPT_ENCODING,
HttpHeaderValues.GZIP.toString() + "," + HttpHeaderValues.DEFLATE.toString()
@@ -237,6 +327,8 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
// Set headers
// Send the request
channel.writeAndFlush(request)
} else {
responseFuture.completeExceptionally(channelFuture.cause())
}
}
})

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.client
package net.woggioni.rbcs.client
import io.netty.handler.codec.http.HttpResponseStatus

View File

@@ -0,0 +1,108 @@
package net.woggioni.rbcs.client.impl
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.Xml.Companion.asIterable
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import java.net.URI
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyStore
import java.security.PrivateKey
import java.security.cert.X509Certificate
import java.time.Duration
object Parser {
fun parse(document: Document): RemoteBuildCacheClient.Configuration {
val root = document.documentElement
val profiles = mutableMapOf<String, RemoteBuildCacheClient.Configuration.Profile>()
for (child in root.asIterable()) {
val tagName = child.localName
when (tagName) {
"profile" -> {
val name =
child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required")
val uri = child.renderAttribute("base-url")?.let(::URI)
?: throw ConfigurationException("base-url attribute is required")
var authentication: RemoteBuildCacheClient.Configuration.Authentication? = null
var retryPolicy: RemoteBuildCacheClient.Configuration.RetryPolicy? = null
for (gchild in child.asIterable()) {
when (gchild.localName) {
"tls-client-auth" -> {
val keyStoreFile = gchild.renderAttribute("key-store-file")
val keyStorePassword =
gchild.renderAttribute("key-store-password")
val keyAlias = gchild.renderAttribute("key-alias")
val keyPassword = gchild.renderAttribute("key-password")
val keystore = KeyStore.getInstance("PKCS12").apply {
Files.newInputStream(Path.of(keyStoreFile)).use {
load(it, keyStorePassword?.toCharArray())
}
}
val key = keystore.getKey(keyAlias, keyPassword?.toCharArray()) as PrivateKey
val certChain = keystore.getCertificateChain(keyAlias).asSequence()
.map { it as X509Certificate }
.toList()
.toTypedArray()
authentication =
RemoteBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(
key,
certChain
)
}
"basic-auth" -> {
val username = gchild.renderAttribute("user")
?: throw ConfigurationException("username attribute is required")
val password = gchild.renderAttribute("password")
?: throw ConfigurationException("password attribute is required")
authentication =
RemoteBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(
username,
password
)
}
"retry-policy" -> {
val maxAttempts =
gchild.renderAttribute("max-attempts")
?.let(String::toInt)
?: throw ConfigurationException("max-attempts attribute is required")
val initialDelay =
gchild.renderAttribute("initial-delay")
?.let(Duration::parse)
?: Duration.ofSeconds(1)
val exp =
gchild.renderAttribute("exp")
?.let(String::toDouble)
?: 2.0f
retryPolicy = RemoteBuildCacheClient.Configuration.RetryPolicy(
maxAttempts,
initialDelay.toMillis(),
exp.toDouble()
)
}
}
}
val maxConnections = child.renderAttribute("max-connections")
?.let(String::toInt)
?: 50
val connectionTimeout = child.renderAttribute("connection-timeout")
?.let(Duration::parse)
profiles[name] = RemoteBuildCacheClient.Configuration.Profile(
uri,
authentication,
connectionTimeout,
maxConnections,
retryPolicy
)
}
}
}
return RemoteBuildCacheClient.Configuration(profiles)
}
}

View File

@@ -0,0 +1,79 @@
package net.woggioni.rbcs.client
import io.netty.util.concurrent.EventExecutorGroup
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import kotlin.math.pow
import kotlin.random.Random
sealed class OperationOutcome<T> {
class Success<T>(val result: T) : OperationOutcome<T>()
class Failure<T>(val ex: Throwable) : OperationOutcome<T>()
}
sealed class OutcomeHandlerResult {
class Retry(val suggestedDelayMillis: Long? = null) : OutcomeHandlerResult()
class DoNotRetry : OutcomeHandlerResult()
}
fun interface OutcomeHandler<T> {
fun shouldRetry(result: OperationOutcome<T>): OutcomeHandlerResult
}
fun <T> executeWithRetry(
eventExecutorGroup: EventExecutorGroup,
maxAttempts: Int,
initialDelay: Double,
exp: Double,
outcomeHandler: OutcomeHandler<T>,
randomizer : Random?,
cb: () -> CompletableFuture<T>
): CompletableFuture<T> {
val finalResult = cb()
var future = finalResult
var shortCircuit = false
for (i in 1 until maxAttempts) {
future = future.handle { result, ex ->
val operationOutcome = if (ex == null) {
OperationOutcome.Success(result)
} else {
OperationOutcome.Failure(ex.cause ?: ex)
}
if (shortCircuit) {
when(operationOutcome) {
is OperationOutcome.Failure -> throw operationOutcome.ex
is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result)
}
} else {
when(val outcomeHandlerResult = outcomeHandler.shouldRetry(operationOutcome)) {
is OutcomeHandlerResult.Retry -> {
val res = CompletableFuture<T>()
val delay = run {
val scheduledDelay = (initialDelay * exp.pow(i.toDouble()) * (1.0 + (randomizer?.nextDouble(-0.5, 0.5) ?: 0.0))).toLong()
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
}
eventExecutorGroup.schedule({
cb().handle { result, ex ->
if (ex == null) {
res.complete(result)
} else {
res.completeExceptionally(ex)
}
}
}, delay, TimeUnit.MILLISECONDS)
res
}
is OutcomeHandlerResult.DoNotRetry -> {
shortCircuit = true
when(operationOutcome) {
is OperationOutcome.Failure -> throw operationOutcome.ex
is OperationOutcome.Success -> CompletableFuture.completedFuture(operationOutcome.result)
}
}
}
}
}.thenCompose { it }
}
return future
}

View File

@@ -1,27 +1,34 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<xs:schema targetNamespace="urn:net.woggioni.gbcs.client"
<xs:schema targetNamespace="urn:net.woggioni.rbcs.client"
xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:gbcs-client="urn:net.woggioni.gbcs.client"
xmlns:rbcs-client="urn:net.woggioni.rbcs.client"
elementFormDefault="unqualified"
>
<xs:element name="profiles" type="gbcs-client:profilesType"/>
<xs:element name="profiles" type="rbcs-client:profilesType"/>
<xs:complexType name="profilesType">
<xs:sequence minOccurs="0">
<xs:element name="profile" type="gbcs-client:profileType" maxOccurs="unbounded"/>
<xs:element name="profile" type="rbcs-client:profileType" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="profileType">
<xs:sequence>
<xs:choice>
<xs:element name="basic-auth" type="gbcs-client:basicAuthType"/>
<xs:element name="tls-client-auth" type="gbcs-client:tlsClientAuthType"/>
<xs:element name="no-auth" type="rbcs-client:noAuthType"/>
<xs:element name="basic-auth" type="rbcs-client:basicAuthType"/>
<xs:element name="tls-client-auth" type="rbcs-client:tlsClientAuthType"/>
</xs:choice>
<xs:element name="retry-policy" type="rbcs-client:retryType" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="name" type="xs:token" use="required"/>
<xs:attribute name="base-url" type="xs:anyURI" use="required"/>
<xs:attribute name="max-connections" type="xs:positiveInteger" default="50"/>
<xs:attribute name="connection-timeout" type="xs:duration"/>
</xs:complexType>
<xs:complexType name="noAuthType"/>
<xs:complexType name="basicAuthType">
<xs:attribute name="user" type="xs:token" use="required"/>
<xs:attribute name="password" type="xs:string" use="required"/>
@@ -34,4 +41,10 @@
<xs:attribute name="key-password" type="xs:string" use="optional"/>
</xs:complexType>
<xs:complexType name="retryType">
<xs:attribute name="max-attempts" type="xs:positiveInteger" use="required"/>
<xs:attribute name="initial-delay" type="xs:duration" default="PT1S"/>
<xs:attribute name="exp" type="xs:double" default="2.0"/>
</xs:complexType>
</xs:schema>

View File

@@ -0,0 +1,148 @@
package net.woggioni.rbcs.client
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.rbcs.common.contextLogger
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.ArgumentsProvider
import org.junit.jupiter.params.provider.ArgumentsSource
import java.util.concurrent.CompletableFuture
import java.util.stream.Stream
import kotlin.random.Random
class RetryTest {
data class TestArgs(
val seed: Int,
val maxAttempt: Int,
val initialDelay: Double,
val exp: Double,
)
class TestArguments : ArgumentsProvider {
override fun provideArguments(context: ExtensionContext): Stream<out Arguments> {
return Stream.of(
TestArgs(
seed = 101325,
maxAttempt = 5,
initialDelay = 50.0,
exp = 2.0,
),
TestArgs(
seed = 101325,
maxAttempt = 20,
initialDelay = 100.0,
exp = 1.1,
),
TestArgs(
seed = 123487,
maxAttempt = 20,
initialDelay = 100.0,
exp = 2.0,
),
TestArgs(
seed = 20082024,
maxAttempt = 10,
initialDelay = 100.0,
exp = 2.0,
)
).map {
object: Arguments {
override fun get() = arrayOf(it)
}
}
}
}
@ArgumentsSource(TestArguments::class)
@ParameterizedTest
fun test(testArgs: TestArgs) {
val log = contextLogger()
log.debug("Start")
val executor: EventExecutorGroup = DefaultEventExecutorGroup(1)
val attempts = mutableListOf<Pair<Long, OperationOutcome<Int>>>()
val outcomeHandler = OutcomeHandler<Int> { outcome ->
when(outcome) {
is OperationOutcome.Success -> {
if(outcome.result % 10 == 0) {
OutcomeHandlerResult.DoNotRetry()
} else {
OutcomeHandlerResult.Retry(null)
}
}
is OperationOutcome.Failure -> {
when(outcome.ex) {
is IllegalStateException -> {
log.debug(outcome.ex.message, outcome.ex)
OutcomeHandlerResult.Retry(null)
}
else -> {
OutcomeHandlerResult.DoNotRetry()
}
}
}
}
}
val random = Random(testArgs.seed)
val future =
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler, null) {
val now = System.nanoTime()
val result = CompletableFuture<Int>()
executor.submit {
val n = random.nextInt(0, Integer.MAX_VALUE)
log.debug("Got new number: {}", n)
if(n % 3 == 0) {
val ex = IllegalStateException("Value $n can be divided by 3")
result.completeExceptionally(ex)
attempts += now to OperationOutcome.Failure(ex)
} else if(n % 7 == 0) {
val ex = RuntimeException("Value $n can be divided by 7")
result.completeExceptionally(ex)
attempts += now to OperationOutcome.Failure(ex)
} else {
result.complete(n)
attempts += now to OperationOutcome.Success(n)
}
}
result
}
Assertions.assertTrue(attempts.size <= testArgs.maxAttempt)
val result = future.handle { res, ex ->
if(ex != null) {
val err = ex.cause ?: ex
log.debug(err.message, err)
OperationOutcome.Failure(err)
} else {
OperationOutcome.Success(res)
}
}.get()
for ((index, attempt) in attempts.withIndex()) {
val (timestamp, value) = attempt
if (index > 0) {
/* Check the delay for subsequent attempts is correct */
val previousAttempt = attempts[index - 1]
val expectedTimestamp =
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
val actualTimestamp = timestamp
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
Assertions.assertTrue(err < 1e-2)
}
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
/*
* If the last attempt index is lower than the maximum number of attempts, then
* check the outcome handler returns DoNotRetry
*/
Assertions.assertTrue(outcomeHandler.shouldRetry(value) is OutcomeHandlerResult.DoNotRetry)
} else if (index < attempts.size - 1) {
/*
* If the attempt is not the last attempt check the outcome handler returns Retry
*/
Assertions.assertTrue(outcomeHandler.shouldRetry(value) is OutcomeHandlerResult.Retry)
}
}
}
}

View File

@@ -15,6 +15,7 @@
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="io.netty" level="info"/>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<rbcs-client:profiles xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rbcs-client="urn:net.woggioni.rbcs.client"
xs:schemaLocation="urn:net.woggioni.rbcs.client jms://net.woggioni.rbcs.client/net/woggioni/rbcs/client/schema/rbcs-client.xsd"
>
<profile name="profile1" base-url="https://rbcs1.example.com/">
<tls-client-auth
key-store-file="keystore.pfx"
key-store-password="password"
key-alias="woggioni@c962475fa38"
key-password="key-password"/>
</profile>
<profile name="profile2" base-url="https://rbcs2.example.com/">
<basic-auth user="user" password="password"/>
</profile>
</rbcs-client:profiles>

View File

@@ -6,9 +6,10 @@ plugins {
}
dependencies {
implementation project(':gbcs-api')
implementation project(':rbcs-api')
implementation catalog.slf4j.api
implementation catalog.jwo
implementation catalog.netty.buffer
}
publishing {

View File

@@ -0,0 +1,11 @@
module net.woggioni.rbcs.common {
requires java.xml;
requires java.logging;
requires org.slf4j;
requires kotlin.stdlib;
requires net.woggioni.jwo;
requires io.netty.buffer;
provides java.net.spi.URLStreamHandlerProvider with net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory;
exports net.woggioni.rbcs.common;
}

View File

@@ -0,0 +1,15 @@
package net.woggioni.rbcs.common
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
fun extractChunk(buf: CompositeByteBuf, alloc: ByteBufAllocator): ByteBuf {
val chunk = alloc.compositeBuffer()
for (component in buf.decompose(0, buf.readableBytes())) {
chunk.addComponent(true, component.retain())
}
buf.removeComponents(0, buf.numComponents())
buf.clear()
return chunk
}

View File

@@ -0,0 +1,25 @@
package net.woggioni.rbcs.common
import io.netty.buffer.ByteBuf
import java.io.InputStream
class ByteBufInputStream(private val buf : ByteBuf) : InputStream() {
override fun read(): Int {
return buf.takeIf {
it.readableBytes() > 0
}?.let(ByteBuf::readByte)
?.let(Byte::toInt) ?: -1
}
override fun read(b: ByteArray, off: Int, len: Int): Int {
val readableBytes = buf.readableBytes()
if(readableBytes == 0) return -1
val result = len.coerceAtMost(readableBytes)
buf.readBytes(b, off, result)
return result
}
override fun close() {
buf.release()
}
}

View File

@@ -0,0 +1,18 @@
package net.woggioni.rbcs.common
import io.netty.buffer.ByteBuf
import java.io.OutputStream
class ByteBufOutputStream(private val buf : ByteBuf) : OutputStream() {
override fun write(b: Int) {
buf.writeByte(b)
}
override fun write(b: ByteArray, off: Int, len: Int) {
buf.writeBytes(b, off, len)
}
override fun close() {
buf.release()
}
}

View File

@@ -0,0 +1,7 @@
package net.woggioni.rbcs.common
class ResourceNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
}
class ModuleNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common
package net.woggioni.rbcs.common
data class HostAndPort(val host: String, val port: Int = 0) {

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common
package net.woggioni.rbcs.common
import org.slf4j.Logger
import org.slf4j.LoggerFactory

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common
package net.woggioni.rbcs.common
import java.security.SecureRandom
import java.security.spec.KeySpec

View File

@@ -0,0 +1,47 @@
package net.woggioni.rbcs.common
import net.woggioni.jwo.JWO
import java.net.URI
import java.net.URL
import java.security.MessageDigest
object RBCS {
fun String.toUrl() : URL = URL.of(URI(this), null)
const val RBCS_NAMESPACE_URI: String = "urn:net.woggioni.rbcs.server"
const val RBCS_PREFIX: String = "rbcs"
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
fun ByteArray.toInt(index : Int = 0) : Long {
if(index + 4 > size) throw IllegalArgumentException("Not enough bytes to decode a 32 bits integer")
var value : Long = 0
for (b in index until index + 4) {
value = (value shl 8) + (get(b).toInt() and 0xFF)
}
return value
}
fun ByteArray.toLong(index : Int = 0) : Long {
if(index + 8 > size) throw IllegalArgumentException("Not enough bytes to decode a 64 bits long integer")
var value : Long = 0
for (b in index until index + 8) {
value = (value shl 8) + (get(b).toInt() and 0xFF)
}
return value
}
fun digest(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): ByteArray {
md.update(data)
return md.digest()
}
fun digestString(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): String {
return JWO.bytesToHex(digest(data, md))
}
}

View File

@@ -1,19 +1,18 @@
package net.woggioni.gbcs.common
package net.woggioni.rbcs.common
import java.io.IOException
import java.io.InputStream
import java.net.URL
import java.net.URLConnection
import java.net.URLStreamHandler
import java.net.URLStreamHandlerFactory
import java.util.Optional
import java.net.spi.URLStreamHandlerProvider
import java.util.concurrent.atomic.AtomicBoolean
import java.util.stream.Collectors
class GbcsUrlStreamHandlerFactory : URLStreamHandlerFactory {
class RbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
private class ClasspathHandler(private val classLoader: ClassLoader = GbcsUrlStreamHandlerFactory::class.java.classLoader) :
private class ClasspathHandler(private val classLoader: ClassLoader = RbcsUrlStreamHandlerFactory::class.java.classLoader) :
URLStreamHandler() {
override fun openConnection(u: URL): URLConnection? {
@@ -36,13 +35,17 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerFactory {
private class JpmsHandler : URLStreamHandler() {
override fun openConnection(u: URL): URLConnection {
val thisModule = javaClass.module
val sourceModule = Optional.ofNullable(thisModule)
.map { obj: Module -> obj.layer }
.flatMap { layer: ModuleLayer ->
val moduleName = u.host
layer.findModule(moduleName)
}.orElse(thisModule)
val thisModule = javaClass.module
val sourceModule =
thisModule
?.let(Module::getLayer)
?.let { layer: ModuleLayer ->
layer.findModule(moduleName).orElse(null)
} ?: if(thisModule.layer == null) {
thisModule
} else throw ModuleNotFoundException("Module '$moduleName' not found")
return JpmsResourceURLConnection(u, sourceModule)
}
}
@@ -53,7 +56,9 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerFactory {
@Throws(IOException::class)
override fun getInputStream(): InputStream {
return module.getResourceAsStream(getURL().path)
val resource = getURL().path
return module.getResourceAsStream(resource)
?: throw ResourceNotFoundException("Resource '$resource' not found in module '${module.name}'")
}
}
@@ -82,12 +87,12 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerFactory {
private val installed = AtomicBoolean(false)
fun install() {
if (!installed.getAndSet(true)) {
URL.setURLStreamHandlerFactory(GbcsUrlStreamHandlerFactory())
URL.setURLStreamHandlerFactory(RbcsUrlStreamHandlerFactory())
}
}
private val packageMap: Map<String, List<Module>> by lazy {
GbcsUrlStreamHandlerFactory::class.java.module.layer
RbcsUrlStreamHandlerFactory::class.java.module.layer
.modules()
.stream()
.flatMap { m: Module ->

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common
package net.woggioni.rbcs.common
import net.woggioni.jwo.JWO
import org.slf4j.LoggerFactory

View File

@@ -0,0 +1 @@
net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory

View File

@@ -6,10 +6,10 @@ plugins {
configurations {
bundle {
extendsFrom runtimeClasspath
canBeResolved = true
canBeConsumed = false
visible = false
transitive = false
resolutionStrategy {
dependencies {
@@ -29,10 +29,21 @@ configurations {
}
dependencies {
compileOnly project(':gbcs-common')
compileOnly project(':gbcs-api')
compileOnly catalog.jwo
implementation catalog.xmemcached
implementation project(':rbcs-common')
implementation project(':rbcs-api')
implementation catalog.jwo
implementation catalog.slf4j.api
implementation catalog.netty.common
implementation catalog.netty.handler
implementation catalog.netty.codec.memcache
bundle catalog.netty.codec.memcache
testRuntimeOnly catalog.logback.classic
}
tasks.named(JavaPlugin.TEST_TASK_NAME, Test) {
systemProperty("io.netty.leakDetectionLevel", "PARANOID")
}
Provider<Tar> bundleTask = tasks.register("bundle", Tar) {

View File

@@ -0,0 +1,20 @@
import net.woggioni.rbcs.api.CacheProvider;
module net.woggioni.rbcs.server.memcache {
requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.api;
requires net.woggioni.jwo;
requires java.xml;
requires kotlin.stdlib;
requires io.netty.transport;
requires io.netty.codec;
requires io.netty.codec.memcache;
requires io.netty.common;
requires io.netty.buffer;
requires io.netty.handler;
requires org.slf4j;
provides CacheProvider with net.woggioni.rbcs.server.memcache.MemcacheCacheProvider;
opens net.woggioni.rbcs.server.memcache.schema;
}

View File

@@ -0,0 +1,4 @@
package net.woggioni.rbcs.server.memcache
class MemcacheException(status : Short, msg : String? = null, cause : Throwable? = null)
: RuntimeException(msg ?: "Memcached status $status", cause)

View File

@@ -0,0 +1,235 @@
package net.woggioni.rbcs.server.memcache
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.Unpooled
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.api.RequestHandle
import net.woggioni.rbcs.api.ResponseHandle
import net.woggioni.rbcs.api.event.RequestStreamingEvent
import net.woggioni.rbcs.api.event.ResponseStreamingEvent
import net.woggioni.rbcs.api.exception.ContentTooLargeException
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digest
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.extractChunk
import net.woggioni.rbcs.server.memcache.client.MemcacheClient
import net.woggioni.rbcs.server.memcache.client.MemcacheResponseHandle
import net.woggioni.rbcs.server.memcache.client.StreamingRequestEvent
import net.woggioni.rbcs.server.memcache.client.StreamingResponseEvent
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterOutputStream
class MemcacheCache(private val cfg: MemcacheCacheConfiguration) : Cache {
companion object {
@JvmStatic
private val log = contextLogger()
}
private val memcacheClient = MemcacheClient(cfg)
override fun get(key: String, responseHandle: ResponseHandle, alloc: ByteBufAllocator) {
val compressionMode = cfg.compressionMode
val buf = alloc.compositeBuffer()
val stream = ByteBufOutputStream(buf).let { outputStream ->
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
InflaterOutputStream(
outputStream,
Inflater()
)
}
}
} else {
outputStream
}
}
val memcacheResponseHandle = object : MemcacheResponseHandle {
override fun handleEvent(evt: StreamingResponseEvent) {
when (evt) {
is StreamingResponseEvent.ResponseReceived -> {
if (evt.response.status() == BinaryMemcacheResponseStatus.SUCCESS) {
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
} else if (evt.response.status() == BinaryMemcacheResponseStatus.KEY_ENOENT) {
responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND)
} else {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(MemcacheException(evt.response.status())))
}
}
is StreamingResponseEvent.LastContentReceived -> {
evt.content.content().let { content ->
content.readBytes(stream, content.readableBytes())
}
buf.retain()
stream.close()
val chunk = extractChunk(buf, alloc)
buf.release()
responseHandle.handleEvent(
ResponseStreamingEvent.LastChunkReceived(
chunk
)
)
}
is StreamingResponseEvent.ContentReceived -> {
evt.content.content().let { content ->
content.readBytes(stream, content.readableBytes())
}
if (buf.readableBytes() >= cfg.chunkSize) {
val chunk = extractChunk(buf, alloc)
responseHandle.handleEvent(ResponseStreamingEvent.ChunkReceived(chunk))
}
}
is StreamingResponseEvent.ExceptionCaught -> {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(evt.exception))
}
}
}
}
memcacheClient.sendRequest(Unpooled.wrappedBuffer(key.toByteArray()), memcacheResponseHandle)
.thenApply { memcacheRequestHandle ->
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)
).let { digest ->
DefaultBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest)).apply {
setOpcode(BinaryMemcacheOpcodes.GET)
}
}
memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendRequest(request))
}.exceptionally { ex ->
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex))
}
}
private fun encodeExpiry(expiry: Duration): Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
override fun put(
key: String,
responseHandle: ResponseHandle,
alloc: ByteBufAllocator
): CompletableFuture<RequestHandle> {
val memcacheResponseHandle = object : MemcacheResponseHandle {
override fun handleEvent(evt: StreamingResponseEvent) {
when (evt) {
is StreamingResponseEvent.ResponseReceived -> {
when (evt.response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> {
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND)
}
BinaryMemcacheResponseStatus.E2BIG -> {
responseHandle.handleEvent(
ResponseStreamingEvent.ExceptionCaught(
ContentTooLargeException("Request payload is too big", null)
)
)
}
else -> {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(MemcacheException(evt.response.status())))
}
}
}
is StreamingResponseEvent.LastContentReceived -> {
responseHandle.handleEvent(
ResponseStreamingEvent.LastChunkReceived(
evt.content.content().retain()
)
)
}
is StreamingResponseEvent.ContentReceived -> {
responseHandle.handleEvent(ResponseStreamingEvent.ChunkReceived(evt.content.content().retain()))
}
is StreamingResponseEvent.ExceptionCaught -> {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(evt.exception))
}
}
}
}
val result: CompletableFuture<RequestHandle> =
memcacheClient.sendRequest(Unpooled.wrappedBuffer(key.toByteArray()), memcacheResponseHandle)
.thenApply { memcacheRequestHandle ->
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
val extras = Unpooled.buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(cfg.maxAge))
DefaultBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), extras).apply {
setOpcode(BinaryMemcacheOpcodes.SET)
}
}
// memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendRequest(request))
val compressionMode = cfg.compressionMode
val buf = alloc.heapBuffer()
val stream = ByteBufOutputStream(buf).let { outputStream ->
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
DeflaterOutputStream(
outputStream,
Deflater(Deflater.DEFAULT_COMPRESSION, false)
)
}
}
} else {
outputStream
}
}
RequestHandle { evt ->
when (evt) {
is RequestStreamingEvent.LastChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
buf.retain()
stream.close()
request.setTotalBodyLength(buf.readableBytes() + request.keyLength() + request.extrasLength())
memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendRequest(request))
memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendLastChunk(buf))
}
is RequestStreamingEvent.ChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
}
is RequestStreamingEvent.ExceptionCaught -> {
stream.close()
}
}
}
}
return result
}
override fun close() {
memcacheClient.close()
}
}

View File

@@ -0,0 +1,36 @@
package net.woggioni.rbcs.server.memcache
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.HostAndPort
import java.time.Duration
data class MemcacheCacheConfiguration(
val servers: List<Server>,
val maxAge: Duration = Duration.ofDays(1),
val maxSize: Int = 0x100000,
val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = null,
val chunkSize : Int
) : Configuration.Cache {
enum class CompressionMode {
/**
* Deflate mode
*/
DEFLATE
}
data class Server(
val endpoint : HostAndPort,
val connectionTimeoutMillis : Int?,
val maxConnections : Int
)
override fun materialize() = MemcacheCache(this)
override fun getNamespaceURI() = "urn:net.woggioni.rbcs.server.memcache"
override fun getTypeName() = "memcacheCacheType"
}

View File

@@ -0,0 +1,103 @@
package net.woggioni.rbcs.server.memcache
import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.common.RBCS
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.Xml.Companion.asIterable
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import org.w3c.dom.Element
import java.time.Duration
import java.time.temporal.ChronoUnit
class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
override fun getXmlSchemaLocation() = "jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd"
override fun getXmlType() = "memcacheCacheType"
override fun getXmlNamespace() = "urn:net.woggioni.rbcs.server.memcache"
val xmlNamespacePrefix : String
get() = "rbcs-memcache"
override fun deserialize(el: Element): MemcacheCacheConfiguration {
val servers = mutableListOf<MemcacheCacheConfiguration.Server>()
val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse)
?: Duration.ofDays(1)
val maxSize = el.renderAttribute("max-size")
?.let(Integer::decode)
?: 0x100000
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x4000
val compressionMode = el.renderAttribute("compression-mode")
?.let {
when (it) {
"deflate" -> MemcacheCacheConfiguration.CompressionMode.DEFLATE
else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE
}
}
?: MemcacheCacheConfiguration.CompressionMode.DEFLATE
val digestAlgorithm = el.renderAttribute("digest")
for (child in el.asIterable()) {
when (child.nodeName) {
"server" -> {
val host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required")
val port = child.renderAttribute("port")?.toInt() ?: throw ConfigurationException("port attribute is required")
val maxConnections = child.renderAttribute("max-connections")?.toInt() ?: 1
val connectionTimeout = child.renderAttribute("connection-timeout")
?.let(Duration::parse)
?.let(Duration::toMillis)
?.let(Long::toInt)
?: 10000
servers.add(MemcacheCacheConfiguration.Server(HostAndPort(host, port), connectionTimeout, maxConnections))
}
}
}
return MemcacheCacheConfiguration(
servers,
maxAge,
maxSize,
digestAlgorithm,
compressionMode,
chunkSize
)
}
override fun serialize(doc: Document, cache: MemcacheCacheConfiguration) = cache.run {
val result = doc.createElement("cache")
Xml.of(doc, result) {
attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/")
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", RBCS.XML_SCHEMA_NAMESPACE_URI)
for (server in servers) {
node("server") {
attr("host", server.endpoint.host)
attr("port", server.endpoint.port.toString())
server.connectionTimeoutMillis?.let { connectionTimeoutMillis ->
attr("connection-timeout", Duration.of(connectionTimeoutMillis.toLong(), ChronoUnit.MILLIS).toString())
}
attr("max-connections", server.maxConnections.toString())
}
}
attr("max-age", maxAge.toString())
attr("max-size", maxSize.toString())
attr("chunk-size", chunkSize.toString())
digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm)
}
compressionMode?.let { compressionMode ->
attr(
"compression-mode", when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> "deflate"
}
)
}
}
result
}
}

View File

@@ -0,0 +1,30 @@
package net.woggioni.rbcs.server.memcache.client
import io.netty.buffer.ByteBuf
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
sealed interface StreamingRequestEvent {
class SendRequest(val request : BinaryMemcacheRequest) : StreamingRequestEvent
open class SendChunk(val chunk : ByteBuf) : StreamingRequestEvent
class SendLastChunk(chunk : ByteBuf) : SendChunk(chunk)
class ExceptionCaught(val exception : Throwable) : StreamingRequestEvent
}
sealed interface StreamingResponseEvent {
class ResponseReceived(val response : BinaryMemcacheResponse) : StreamingResponseEvent
open class ContentReceived(val content : MemcacheContent) : StreamingResponseEvent
class LastContentReceived(val lastContent : LastMemcacheContent) : ContentReceived(lastContent)
class ExceptionCaught(val exception : Throwable) : StreamingResponseEvent
}
interface MemcacheRequestHandle {
fun handleEvent(evt : StreamingRequestEvent)
}
interface MemcacheResponseHandle {
fun handleEvent(evt : StreamingResponseEvent)
}

View File

@@ -0,0 +1,183 @@
package net.woggioni.rbcs.server.memcache.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.MemcacheObject
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.logging.LoggingHandler
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration
import java.net.InetSocketAddress
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import io.netty.util.concurrent.Future as NettyFuture
class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseable {
private companion object {
@JvmStatic
private val log = contextLogger()
}
private val group: NioEventLoopGroup
private val connectionPool: MutableMap<HostAndPort, ChannelPool> = ConcurrentHashMap()
init {
group = NioEventLoopGroup()
}
private val counter = AtomicLong(0)
private fun newConnectionPool(server: MemcacheCacheConfiguration.Server): FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
channel(NioSocketChannel::class.java)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
server.connectionTimeoutMillis?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec())
pipeline.addLast(LoggingHandler())
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
fun sendRequest(key: ByteBuf, responseHandle: MemcacheResponseHandle): CompletableFuture<MemcacheRequestHandle> {
val server = cfg.servers.let { servers ->
if (servers.size > 1) {
var checksum = 0
while (key.readableBytes() > 4) {
val byte = key.readInt()
checksum = checksum xor byte
}
while (key.readableBytes() > 0) {
val byte = key.readByte()
checksum = checksum xor byte.toInt()
}
servers[checksum % servers.size]
} else {
servers.first()
}
}
val response = CompletableFuture<MemcacheRequestHandle>()
// Custom handler for processing responses
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
}
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
val handler = object : SimpleChannelInboundHandler<MemcacheObject>() {
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: MemcacheObject
) {
when (msg) {
is BinaryMemcacheResponse -> responseHandle.handleEvent(
StreamingResponseEvent.ResponseReceived(
msg
)
)
is LastMemcacheContent -> {
responseHandle.handleEvent(
StreamingResponseEvent.LastContentReceived(
msg
)
)
pipeline.removeLast()
pool.release(channel)
}
is MemcacheContent -> responseHandle.handleEvent(
StreamingResponseEvent.ContentReceived(
msg
)
)
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
responseHandle.handleEvent(StreamingResponseEvent.ExceptionCaught(cause))
ctx.close()
pipeline.removeLast()
pool.release(channel)
}
}
channel.pipeline()
.addLast("client-handler", handler)
response.complete(object : MemcacheRequestHandle {
override fun handleEvent(evt: StreamingRequestEvent) {
when (evt) {
is StreamingRequestEvent.SendRequest -> {
channel.writeAndFlush(evt.request)
}
is StreamingRequestEvent.SendLastChunk -> {
channel.writeAndFlush(DefaultLastMemcacheContent(evt.chunk))
val value = counter.incrementAndGet()
log.debug {
"Finished request counter: $value"
}
}
is StreamingRequestEvent.SendChunk -> {
channel.writeAndFlush(DefaultMemcacheContent(evt.chunk))
}
is StreamingRequestEvent.ExceptionCaught -> {
responseHandle.handleEvent(StreamingResponseEvent.ExceptionCaught(evt.exception))
channel.close()
pipeline.removeLast()
pool.release(channel)
}
}
}
})
} else {
response.completeExceptionally(channelFuture.cause())
}
}
})
return response
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}
override fun close() {
shutDown().sync()
}
}

View File

@@ -0,0 +1 @@
net.woggioni.rbcs.server.memcache.MemcacheCacheProvider

View File

@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<xs:schema targetNamespace="urn:net.woggioni.rbcs.server.memcache"
xmlns:rbcs-memcache="urn:net.woggioni.rbcs.server.memcache"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:import schemaLocation="jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs.xsd" namespace="urn:net.woggioni.rbcs.server"/>
<xs:complexType name="memcacheServerType">
<xs:attribute name="host" type="xs:token" use="required"/>
<xs:attribute name="port" type="xs:positiveInteger" use="required"/>
<xs:attribute name="connection-timeout" type="xs:duration"/>
<xs:attribute name="max-connections" type="xs:positiveInteger" default="1"/>
</xs:complexType>
<xs:complexType name="memcacheCacheType">
<xs:complexContent>
<xs:extension base="rbcs:cacheType">
<xs:sequence maxOccurs="unbounded">
<xs:element name="server" type="rbcs-memcache:memcacheServerType"/>
</xs:sequence>
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="max-size" type="rbcs:byteSize" default="1048576"/>
<xs:attribute name="chunk-size" type="rbcs:byteSize" default="0x4000"/>
<xs:attribute name="digest" type="xs:token" />
<xs:attribute name="compression-mode" type="rbcs-memcache:compressionType"/>
</xs:extension>
</xs:complexContent>
</xs:complexType>
<xs:simpleType name="compressionType">
<xs:restriction base="xs:token">
<xs:enumeration value="deflate"/>
</xs:restriction>
</xs:simpleType>
</xs:schema>

View File

@@ -1,6 +1,7 @@
plugins {
id 'java-library'
alias catalog.plugins.kotlin.jvm
id 'jacoco'
id 'maven-publish'
}
@@ -8,20 +9,25 @@ dependencies {
implementation catalog.jwo
implementation catalog.slf4j.api
implementation catalog.netty.codec.http
implementation catalog.netty.handler
implementation catalog.netty.buffer
implementation catalog.netty.transport
api project(':gbcs-common')
api project(':gbcs-api')
api project(':rbcs-common')
api project(':rbcs-api')
// runtimeOnly catalog.slf4j.jdk14
testRuntimeOnly catalog.logback.classic
testImplementation catalog.bcprov.jdk18on
testImplementation catalog.bcpkix.jdk18on
testImplementation catalog.junit.jupiter.api
testImplementation catalog.junit.jupiter.params
testRuntimeOnly catalog.junit.jupiter.engine
testRuntimeOnly project(":gbcs-server-memcached")
testRuntimeOnly project(":rbcs-server-memcache")
}
test {
systemProperty("io.netty.leakDetectionLevel", "PARANOID")
systemProperty("jdk.httpclient.redirects.retrylimit", "1")
}
publishing {
@@ -33,3 +39,4 @@ publishing {
}

Some files were not shown because too many files have changed in this diff Show More