Compare commits

..

1 Commits

Author SHA1 Message Date
e5fe8437a6 temporary commit 2025-02-04 13:59:44 +08:00
73 changed files with 986 additions and 926 deletions

View File

@@ -44,7 +44,7 @@ jobs:
target: release target: release
cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx
- -
name: Build gbcs memcache Docker image name: Build gbcs memcached Docker image
uses: docker/build-push-action@v5.3.0 uses: docker/build-push-action@v5.3.0
with: with:
context: "docker/build/docker" context: "docker/build/docker"
@@ -52,9 +52,9 @@ jobs:
push: true push: true
pull: true pull: true
tags: | tags: |
gitea.woggioni.net/woggioni/gbcs:memcache gitea.woggioni.net/woggioni/gbcs:memcached
gitea.woggioni.net/woggioni/gbcs:memcache-${{ steps.retrieve-version.outputs.VERSION }} gitea.woggioni.net/woggioni/gbcs:memcached-${{ steps.retrieve-version.outputs.VERSION }}
target: release-memcache target: release-memcached
cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx
cache-to: type=registry,mode=max,compression=zstd,image-manifest=true,oci-mediatypes=true,ref=gitea.woggioni.net/woggioni/gbcs:buildx cache-to: type=registry,mode=max,compression=zstd,image-manifest=true,oci-mediatypes=true,ref=gitea.woggioni.net/woggioni/gbcs:buildx
- name: Publish artifacts - name: Publish artifacts

View File

@@ -5,12 +5,12 @@ WORKDIR /home/luser
FROM base-release AS release FROM base-release AS release
ADD gbcs-cli-envelope-*.jar gbcs.jar ADD gbcs-cli-envelope-*.jar gbcs.jar
ENTRYPOINT ["java", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/gbcs.jar", "server"] ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"]
FROM base-release AS release-memcache FROM base-release AS release-memcached
ADD --chown=luser:luser gbcs-cli-envelope-*.jar gbcs.jar ADD --chown=luser:luser gbcs-cli-envelope-*.jar gbcs.jar
RUN mkdir plugins RUN mkdir plugins
WORKDIR /home/luser/plugins WORKDIR /home/luser/plugins
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/gbcs-server-memcache*.tar RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/gbcs-server-memcached*.tar
WORKDIR /home/luser WORKDIR /home/luser
ENTRYPOINT ["java", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/gbcs.jar", "server"] ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"]

View File

@@ -19,7 +19,7 @@ configurations {
dependencies { dependencies {
docker project(path: ':gbcs-cli', configuration: 'release') docker project(path: ':gbcs-cli', configuration: 'release')
docker project(path: ':gbcs-server-memcache', configuration: 'release') docker project(path: ':gbcs-server-memcached', configuration: 'release')
} }
Provider<Task> cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {} Provider<Task> cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {}
@@ -46,22 +46,22 @@ Provider<DockerTagImage> dockerTag = tasks.register('dockerTagImage', DockerTagI
tag = version tag = version
} }
Provider<DockerTagImage> dockerTagMemcache = tasks.register('dockerTagMemcacheImage', DockerTagImage) { Provider<DockerTagImage> dockerTagMemcached = tasks.register('dockerTagMemcachedImage', DockerTagImage) {
group = 'docker' group = 'docker'
repository = 'gitea.woggioni.net/woggioni/gbcs' repository = 'gitea.woggioni.net/woggioni/gbcs'
imageId = 'gitea.woggioni.net/woggioni/gbcs:memcache' imageId = 'gitea.woggioni.net/woggioni/gbcs:memcached'
tag = "${version}-memcache" tag = "${version}-memcached"
} }
Provider<DockerPushImage> dockerPush = tasks.register('dockerPushImage', DockerPushImage) { Provider<DockerPushImage> dockerPush = tasks.register('dockerPushImage', DockerPushImage) {
group = 'docker' group = 'docker'
dependsOn dockerTag, dockerTagMemcache dependsOn dockerTag, dockerTagMemcached
registryCredentials { registryCredentials {
url = getProperty('docker.registry.url') url = getProperty('docker.registry.url')
username = 'woggioni' username = 'woggioni'
password = System.getenv().get("PUBLISHER_TOKEN") password = System.getenv().get("PUBLISHER_TOKEN")
} }
images = [dockerTag.flatMap{ it.tag }, dockerTagMemcache.flatMap{ it.tag }] images = [dockerTag.flatMap{ it.tag }, dockerTagMemcached.flatMap{ it.tag }]
} }

View File

@@ -4,4 +4,5 @@ module net.woggioni.gbcs.api {
requires io.netty.buffer; requires io.netty.buffer;
exports net.woggioni.gbcs.api; exports net.woggioni.gbcs.api;
exports net.woggioni.gbcs.api.exception; exports net.woggioni.gbcs.api.exception;
exports net.woggioni.gbcs.api.event;
} }

View File

@@ -1,6 +1,5 @@
package net.woggioni.gbcs.api; package net.woggioni.gbcs.api;
import io.netty.buffer.ByteBuf;
import net.woggioni.gbcs.api.exception.ContentTooLargeException; import net.woggioni.gbcs.api.exception.ContentTooLargeException;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
@@ -8,7 +7,6 @@ import java.util.concurrent.CompletableFuture;
public interface Cache extends AutoCloseable { public interface Cache extends AutoCloseable {
CompletableFuture<ReadableByteChannel> get(String key); CompletableFuture<CallHandle<Void>> get(String key, ResponseEventListener responseEventListener);
CompletableFuture<CallHandle<Void>> put(String key) throws ContentTooLargeException;
CompletableFuture<Void> put(String key, ByteBuf content) throws ContentTooLargeException;
} }

View File

@@ -0,0 +1,10 @@
package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.event.RequestEvent;
import java.util.concurrent.CompletableFuture;
public interface CallHandle<T> {
void postEvent(RequestEvent evt);
CompletableFuture<T> call();
}

View File

@@ -27,7 +27,6 @@ public class Configuration {
Cache cache; Cache cache;
Authentication authentication; Authentication authentication;
Tls tls; Tls tls;
boolean useNativeTransport;
@Value @Value
public static class EventExecutor { public static class EventExecutor {
@@ -57,8 +56,7 @@ public class Configuration {
@EqualsAndHashCode.Include @EqualsAndHashCode.Include
String name; String name;
Set<Role> roles; Set<Role> roles;
Quota groupQuota; Quota quota;
Quota userQuota;
} }
@Value @Value
@@ -152,8 +150,7 @@ public class Configuration {
Map<String, Group> groups, Map<String, Group> groups,
Cache cache, Cache cache,
Authentication authentication, Authentication authentication,
Tls tls, Tls tls
boolean useNativeTransport
) { ) {
return new Configuration( return new Configuration(
host, host,
@@ -166,8 +163,7 @@ public class Configuration {
groups, groups,
cache, cache,
authentication, authentication,
tls, tls
useNativeTransport
); );
} }
} }

View File

@@ -0,0 +1,7 @@
package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.event.ResponseEvent;
public interface ResponseEventListener {
void listen(ResponseEvent evt);
}

View File

@@ -0,0 +1,20 @@
package net.woggioni.gbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import net.woggioni.gbcs.api.CallHandle;
sealed public abstract class RequestEvent {
@Getter
@RequiredArgsConstructor
public static final class ChunkSent extends RequestEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
public static final class LastChunkSent extends RequestEvent {
private final ByteBuf chunk;
}
}

View File

@@ -0,0 +1,28 @@
package net.woggioni.gbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
sealed public abstract class ResponseEvent {
@Getter
@RequiredArgsConstructor
public final static class ChunkReceived extends ResponseEvent {
private final ByteBuf chunk;
}
public final static class NoContent extends ResponseEvent {
}
@Getter
@RequiredArgsConstructor
public final static class LastChunkReceived extends ResponseEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
public final static class ExceptionCaught extends ResponseEvent {
private final Throwable cause;
}
}

View File

@@ -32,13 +32,6 @@ configurations {
canBeResolved = true canBeResolved = true
visible = true visible = true
} }
nativeLibraries {
transitive = false
canBeConsumed = false
canBeResolved = true
visible = false
}
} }
envelopeJar { envelopeJar {
@@ -60,8 +53,6 @@ dependencies {
// runtimeOnly catalog.slf4j.jdk14 // runtimeOnly catalog.slf4j.jdk14
runtimeOnly catalog.logback.classic runtimeOnly catalog.logback.classic
// runtimeOnly catalog.slf4j.simple // runtimeOnly catalog.slf4j.simple
nativeLibraries group: 'io.netty', name: 'netty-transport-native-epoll', version: catalog.versions.netty.get(), classifier: 'linux-x86_64'
} }
Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named('envelopeJar', EnvelopeJarTask.class) { Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named('envelopeJar', EnvelopeJarTask.class) {
@@ -76,9 +67,6 @@ Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named('envelopeJar', E
// systemProperties['org.slf4j.simpleLogger.log.com.google.code.yanf4j'] = 'warn' // systemProperties['org.slf4j.simpleLogger.log.com.google.code.yanf4j'] = 'warn'
// systemProperties['org.slf4j.simpleLogger.log.net.rubyeye.xmemcached'] = 'warn' // systemProperties['org.slf4j.simpleLogger.log.net.rubyeye.xmemcached'] = 'warn'
// systemProperties['org.slf4j.simpleLogger.dateTimeFormat'] = 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ' // systemProperties['org.slf4j.simpleLogger.dateTimeFormat'] = 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ'
from {
configurations.nativeLibraries.collect { it.isDirectory() ? it : zipTree(it) }
}
} }
tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfigurationTask) { tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfigurationTask) {

View File

@@ -1,19 +1,19 @@
package net.woggioni.gbcs.cli package net.woggioni.gbcs.cli
import net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.cli.impl.AbstractVersionProvider import net.woggioni.gbcs.cli.impl.AbstractVersionProvider
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand
import net.woggioni.gbcs.cli.impl.commands.ClientCommand import net.woggioni.gbcs.cli.impl.commands.ClientCommand
import net.woggioni.gbcs.cli.impl.commands.GetCommand import net.woggioni.gbcs.cli.impl.commands.GetCommand
import net.woggioni.gbcs.cli.impl.commands.HealthCheckCommand
import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand
import net.woggioni.gbcs.cli.impl.commands.PutCommand import net.woggioni.gbcs.cli.impl.commands.PutCommand
import net.woggioni.gbcs.cli.impl.commands.ServerCommand import net.woggioni.gbcs.cli.impl.commands.ServerCommand
import net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import picocli.CommandLine import picocli.CommandLine
import picocli.CommandLine.Model.CommandSpec import picocli.CommandLine.Model.CommandSpec
import java.net.URI
@CommandLine.Command( @CommandLine.Command(
@@ -25,12 +25,8 @@ class GradleBuildCacheServerCli : GbcsCommand() {
companion object { companion object {
@JvmStatic @JvmStatic
fun main(vararg args: String) { fun main(vararg args: String) {
val currentClassLoader = GradleBuildCacheServerCli::class.java.classLoader Thread.currentThread().contextClassLoader = GradleBuildCacheServerCli::class.java.classLoader
Thread.currentThread().contextClassLoader = currentClassLoader GbcsUrlStreamHandlerFactory.install()
if(currentClassLoader.javaClass.name == "net.woggioni.envelope.loader.ModuleClassLoader") {
//We're running in an envelope jar and custom URL protocols won't work
GbcsUrlStreamHandlerFactory.install()
}
val log = contextLogger() val log = contextLogger()
val app = Application.builder("gbcs") val app = Application.builder("gbcs")
.configurationDirectoryEnvVar("GBCS_CONFIGURATION_DIR") .configurationDirectoryEnvVar("GBCS_CONFIGURATION_DIR")
@@ -49,7 +45,6 @@ class GradleBuildCacheServerCli : GbcsCommand() {
addSubcommand(BenchmarkCommand()) addSubcommand(BenchmarkCommand())
addSubcommand(PutCommand()) addSubcommand(PutCommand())
addSubcommand(GetCommand()) addSubcommand(GetCommand())
addSubcommand(HealthCheckCommand())
}) })
System.exit(commandLine.execute(*args)) System.exit(commandLine.execute(*args))
} }

View File

@@ -33,110 +33,96 @@ class BenchmarkCommand : GbcsCommand() {
) )
private var numberOfEntries = 1000 private var numberOfEntries = 1000
@CommandLine.Option(
names = ["-s", "--size"],
description = ["Size of a cache value in bytes"],
paramLabel = "SIZE"
)
private var size = 0x1000
override fun run() { override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName -> val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName] clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration") ?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
} }
GradleBuildCacheClient(profile).use { client -> val client = GradleBuildCacheClient(profile)
val entryGenerator = sequence { val entryGenerator = sequence {
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong()) val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
while (true) { while (true) {
val key = JWO.bytesToHex(random.nextBytes(16)) val key = JWO.bytesToHex(random.nextBytes(16))
val content = random.nextInt().toByte() val content = random.nextInt().toByte()
val value = ByteArray(size, { _ -> content }) val value = ByteArray(0x1000, { _ -> content })
yield(key to value) yield(key to value)
}
} }
}
log.info { log.info {
"Starting insertion" "Starting insertion"
} }
val entries = let { val entries = let {
val completionCounter = AtomicLong(0) val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries) val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now() val start = Instant.now()
val semaphore = Semaphore(profile.maxConnections * 3) val semaphore = Semaphore(profile.maxConnections * 3)
val iterator = entryGenerator.take(numberOfEntries).iterator() val iterator = entryGenerator.take(numberOfEntries).iterator()
while (completionCounter.get() < numberOfEntries) { while(completionCounter.get() < numberOfEntries) {
if (iterator.hasNext()) { if(iterator.hasNext()) {
val entry = iterator.next() val entry = iterator.next()
semaphore.acquire() semaphore.acquire()
val future = client.put(entry.first, entry.second).thenApply { entry } val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { result, ex -> future.whenComplete { result, ex ->
if (ex != null) { if (ex != null) {
log.error(ex.message, ex) log.error(ex.message, ex)
} else { } else {
completionQueue.put(result) completionQueue.put(result)
}
semaphore.release()
completionCounter.incrementAndGet()
} }
} else { semaphore.release()
Thread.sleep(0) completionCounter.incrementAndGet()
} }
} }
}
val inserted = completionQueue.toList() val inserted = completionQueue.toList()
val end = Instant.now() val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
}
inserted
}
log.info { log.info {
"Inserted ${entries.size} entries" val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
} }
log.info { inserted
"Starting retrieval" }
} log.info {
if (entries.isNotEmpty()) { "Inserted ${entries.size} entries"
val completionCounter = AtomicLong(0) }
val semaphore = Semaphore(profile.maxConnections * 3) log.info {
val start = Instant.now() "Starting retrieval"
val it = entries.iterator() }
while (completionCounter.get() < entries.size) { if (entries.isNotEmpty()) {
if (it.hasNext()) { val completionCounter = AtomicLong(0)
val entry = it.next() val semaphore = Semaphore(profile.maxConnections * 3)
val future = client.get(entry.first).thenApply { val start = Instant.now()
if (it == null) { entries.forEach { entry ->
log.error { semaphore.acquire()
"Missing entry for key '${entry.first}'"
} val future = client.get(entry.first).thenApply {
} else if (!entry.second.contentEquals(it)) { if (it == null) {
log.error { log.error {
"Retrieved a value different from what was inserted for key '${entry.first}'" "Missing entry for key '${entry.first}'"
}
}
} }
future.whenComplete { _, _ -> } else if (!entry.second.contentEquals(it)) {
completionCounter.incrementAndGet() log.error {
semaphore.release() "Retrieved a value different from what was inserted for key '${entry.first}'"
} }
} else {
Thread.sleep(0)
} }
} }
val end = Instant.now() future.whenComplete { _, _ ->
log.info { completionCounter.incrementAndGet()
val elapsed = Duration.between(start, end).toMillis() semaphore.release()
val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
"Retrieval rate: $opsPerSecond ops/s"
} }
} else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")
} }
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
"Retrieval rate: $opsPerSecond ops/s"
}
} else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")
} }
} }
} }

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.contextLogger
import picocli.CommandLine import picocli.CommandLine
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path

View File

@@ -1,45 +0,0 @@
package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.contextLogger
import picocli.CommandLine
import java.security.SecureRandom
import kotlin.random.Random
@CommandLine.Command(
name = "health",
description = ["Check server health"],
showDefaultValues = true
)
class HealthCheckCommand : GbcsCommand() {
private val log = contextLogger()
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
GradleBuildCacheClient(profile).use { client ->
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
val nonce = ByteArray(0xa0)
random.nextBytes(nonce)
client.healthCheck(nonce).thenApply { value ->
if(value == null) {
throw IllegalStateException("Empty response from server")
}
for(i in 0 until nonce.size) {
for(j in value.size - nonce.size until nonce.size) {
if(nonce[i] != value[j]) {
throw IllegalStateException("Server nonce does not match")
}
}
}
}.get()
}
}
}

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.common.PasswordSecurity.hashPassword
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.converters.OutputStreamConverter import net.woggioni.gbcs.cli.impl.converters.OutputStreamConverter
import net.woggioni.gbcs.common.PasswordSecurity.hashPassword
import net.woggioni.jwo.UncloseableOutputStream import net.woggioni.jwo.UncloseableOutputStream
import picocli.CommandLine import picocli.CommandLine
import java.io.OutputStream import java.io.OutputStream

View File

@@ -1,9 +1,9 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.converters.InputStreamConverter import net.woggioni.gbcs.cli.impl.converters.InputStreamConverter
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.contextLogger
import picocli.CommandLine import picocli.CommandLine
import java.io.InputStream import java.io.InputStream

View File

@@ -1,20 +1,18 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.server.GradleBuildCacheServer
import net.woggioni.gbcs.server.GradleBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL
import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.converters.DurationConverter
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.debug import net.woggioni.gbcs.common.debug
import net.woggioni.gbcs.common.info import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.server.GradleBuildCacheServer import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.server.GradleBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import picocli.CommandLine import picocli.CommandLine
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.time.Duration
@CommandLine.Command( @CommandLine.Command(
name = "server", name = "server",
@@ -37,14 +35,6 @@ class ServerCommand(app : Application) : GbcsCommand() {
} }
} }
@CommandLine.Option(
names = ["-t", "--timeout"],
description = ["Exit after the specified time"],
paramLabel = "TIMEOUT",
converter = [DurationConverter::class]
)
private var timeout: Duration? = null
@CommandLine.Option( @CommandLine.Option(
names = ["-c", "--config-file"], names = ["-c", "--config-file"],
description = ["Read the application configuration from this file"], description = ["Read the application configuration from this file"],
@@ -52,6 +42,10 @@ class ServerCommand(app : Application) : GbcsCommand() {
) )
private var configurationFile: Path = findConfigurationFile(app, "gbcs-server.xml") private var configurationFile: Path = findConfigurationFile(app, "gbcs-server.xml")
val configuration : Configuration by lazy {
GradleBuildCacheServer.loadConfiguration(configurationFile)
}
override fun run() { override fun run() {
if (!Files.exists(configurationFile)) { if (!Files.exists(configurationFile)) {
Files.createDirectories(configurationFile.parent) Files.createDirectories(configurationFile.parent)
@@ -67,11 +61,7 @@ class ServerCommand(app : Application) : GbcsCommand() {
} }
} }
val server = GradleBuildCacheServer(configuration) val server = GradleBuildCacheServer(configuration)
server.run().use { server -> server.run().use {
timeout?.let {
Thread.sleep(it)
server.shutdown()
}
} }
} }
} }

View File

@@ -1,11 +0,0 @@
package net.woggioni.gbcs.cli.impl.converters
import picocli.CommandLine
import java.time.Duration
class DurationConverter : CommandLine.ITypeConverter<Duration> {
override fun convert(value: String): Duration {
return Duration.parse(value)
}
}

View File

@@ -15,4 +15,6 @@
<root level="info"> <root level="info">
<appender-ref ref="console"/> <appender-ref ref="console"/>
</root> </root>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration> </configuration>

View File

@@ -213,25 +213,6 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
} }
fun healthCheck(nonce: ByteArray): CompletableFuture<ByteArray?> {
return executeWithRetry {
sendRequest(profile.serverURI, HttpMethod.TRACE, nonce)
}.thenApply {
val status = it.status()
if (it.status() != HttpResponseStatus.OK) {
throw HttpException(status)
} else {
it.content()
}
}.thenApply { maybeByteBuf ->
maybeByteBuf?.let {
val result = ByteArray(it.readableBytes())
it.getBytes(0, result)
result
}
}
}
fun get(key: String): CompletableFuture<ByteArray?> { fun get(key: String): CompletableFuture<ByteArray?> {
return executeWithRetry { return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null) sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)

View File

@@ -1,9 +1,9 @@
package net.woggioni.gbcs.client.impl package net.woggioni.gbcs.client.impl
import net.woggioni.gbcs.api.exception.ConfigurationException import net.woggioni.gbcs.api.exception.ConfigurationException
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.Xml.Companion.asIterable import net.woggioni.gbcs.common.Xml.Companion.asIterable
import net.woggioni.gbcs.common.Xml.Companion.renderAttribute import net.woggioni.gbcs.common.Xml.Companion.renderAttribute
import net.woggioni.gbcs.client.GradleBuildCacheClient
import org.w3c.dom.Document import org.w3c.dom.Document
import java.net.URI import java.net.URI
import java.nio.file.Files import java.nio.file.Files

View File

@@ -4,6 +4,7 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtensionContext import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.Arguments

View File

@@ -9,7 +9,6 @@ dependencies {
implementation project(':gbcs-api') implementation project(':gbcs-api')
implementation catalog.slf4j.api implementation catalog.slf4j.api
implementation catalog.jwo implementation catalog.jwo
implementation catalog.netty.buffer
} }
publishing { publishing {

View File

@@ -4,7 +4,6 @@ module net.woggioni.gbcs.common {
requires org.slf4j; requires org.slf4j;
requires kotlin.stdlib; requires kotlin.stdlib;
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires io.netty.buffer;
provides java.net.spi.URLStreamHandlerProvider with net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory; provides java.net.spi.URLStreamHandlerProvider with net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory;
exports net.woggioni.gbcs.common; exports net.woggioni.gbcs.common;

View File

@@ -1,25 +0,0 @@
package net.woggioni.gbcs.common
import io.netty.buffer.ByteBuf
import java.io.InputStream
class ByteBufInputStream(private val buf : ByteBuf) : InputStream() {
override fun read(): Int {
return buf.takeIf {
it.readableBytes() > 0
}?.let(ByteBuf::readByte)
?.let(Byte::toInt) ?: -1
}
override fun read(b: ByteArray, off: Int, len: Int): Int {
val readableBytes = buf.readableBytes()
if(readableBytes == 0) return -1
val result = len.coerceAtMost(readableBytes)
buf.readBytes(b, off, result)
return result
}
override fun close() {
buf.release()
}
}

View File

@@ -1,19 +0,0 @@
package net.woggioni.gbcs.common
import io.netty.buffer.ByteBuf
import java.io.InputStream
import java.io.OutputStream
class ByteBufOutputStream(private val buf : ByteBuf) : OutputStream() {
override fun write(b: Int) {
buf.writeByte(b)
}
override fun write(b: ByteArray, off: Int, len: Int) {
buf.writeBytes(b, off, len)
}
override fun close() {
buf.release()
}
}

View File

@@ -1,7 +0,0 @@
package net.woggioni.gbcs.common
class ResourceNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
}
class ModuleNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
}

View File

@@ -5,6 +5,7 @@ import java.io.InputStream
import java.net.URL import java.net.URL
import java.net.URLConnection import java.net.URLConnection
import java.net.URLStreamHandler import java.net.URLStreamHandler
import java.net.URLStreamHandlerFactory
import java.net.spi.URLStreamHandlerProvider import java.net.spi.URLStreamHandlerProvider
import java.util.Optional import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
@@ -36,17 +37,13 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
private class JpmsHandler : URLStreamHandler() { private class JpmsHandler : URLStreamHandler() {
override fun openConnection(u: URL): URLConnection { override fun openConnection(u: URL): URLConnection {
val moduleName = u.host
val thisModule = javaClass.module val thisModule = javaClass.module
val sourceModule = val sourceModule = Optional.ofNullable(thisModule)
thisModule .map { obj: Module -> obj.layer }
?.let(Module::getLayer) .flatMap { layer: ModuleLayer ->
?.let { layer: ModuleLayer -> val moduleName = u.host
layer.findModule(moduleName).orElse(null) layer.findModule(moduleName)
} ?: if(thisModule.layer == null) { }.orElse(thisModule)
thisModule
} else throw ModuleNotFoundException("Module '$moduleName' not found")
return JpmsResourceURLConnection(u, sourceModule) return JpmsResourceURLConnection(u, sourceModule)
} }
} }
@@ -57,9 +54,7 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
@Throws(IOException::class) @Throws(IOException::class)
override fun getInputStream(): InputStream { override fun getInputStream(): InputStream {
val resource = getURL().path return module.getResourceAsStream(getURL().path)
return module.getResourceAsStream(resource)
?: throw ResourceNotFoundException("Resource '$resource' not found in module '${module.name}'")
} }
} }

View File

@@ -1,4 +0,0 @@
package net.woggioni.gbcs.server.memcache
class MemcacheException(status : Short, msg : String? = null, cause : Throwable? = null)
: RuntimeException(msg ?: "Memcached status $status", cause)

View File

@@ -1,23 +0,0 @@
package net.woggioni.gbcs.server.memcache
import io.netty.buffer.ByteBuf
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.server.memcache.client.MemcacheClient
import java.nio.channels.ReadableByteChannel
import java.util.concurrent.CompletableFuture
class MemcacheCache(private val cfg : MemcacheCacheConfiguration) : Cache {
private val memcacheClient = MemcacheClient(cfg)
override fun get(key: String): CompletableFuture<ReadableByteChannel?> {
return memcacheClient.get(key)
}
override fun put(key: String, content: ByteBuf): CompletableFuture<Void> {
return memcacheClient.put(key, content, cfg.maxAge)
}
override fun close() {
memcacheClient.close()
}
}

View File

@@ -1,258 +0,0 @@
package net.woggioni.gbcs.server.memcache.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheObjectAggregator
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultFullBinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.ByteBufOutputStream
import net.woggioni.gbcs.common.GBCS.digest
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.memcache.MemcacheCacheConfiguration
import net.woggioni.gbcs.server.memcache.MemcacheException
import net.woggioni.jwo.JWO
import java.io.ByteArrayOutputStream
import java.net.InetSocketAddress
import java.nio.channels.Channels
import java.nio.channels.ReadableByteChannel
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import java.util.zip.InflaterInputStream
import io.netty.util.concurrent.Future as NettyFuture
class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseable {
private companion object {
@JvmStatic
private val log = contextLogger()
}
private val group: NioEventLoopGroup
private val connectionPool: MutableMap<HostAndPort, ChannelPool> = ConcurrentHashMap()
init {
group = NioEventLoopGroup()
}
private fun newConnectionPool(server: MemcacheCacheConfiguration.Server): FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
channel(NioSocketChannel::class.java)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
server.connectionTimeoutMillis?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec())
pipeline.addLast(BinaryMemcacheObjectAggregator(Integer.MAX_VALUE))
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
private fun sendRequest(request: FullBinaryMemcacheRequest): CompletableFuture<FullBinaryMemcacheResponse> {
val server = cfg.servers.let { servers ->
if (servers.size > 1) {
val key = request.key().duplicate()
var checksum = 0
while (key.readableBytes() > 4) {
val byte = key.readInt()
checksum = checksum xor byte
}
while (key.readableBytes() > 0) {
val byte = key.readByte()
checksum = checksum xor byte.toInt()
}
servers[checksum % servers.size]
} else {
servers.first()
}
}
val response = CompletableFuture<FullBinaryMemcacheResponse>()
// Custom handler for processing responses
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
}
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline()
.addLast("client-handler", object : SimpleChannelInboundHandler<FullBinaryMemcacheResponse>() {
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: FullBinaryMemcacheResponse
) {
pipeline.removeLast()
pool.release(channel)
msg.touch("The method's caller must remember to release this")
response.complete(msg.retain())
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val ex = when (cause) {
is DecoderException -> cause.cause!!
else -> cause
}
ctx.close()
pipeline.removeLast()
pool.release(channel)
response.completeExceptionally(ex)
}
})
request.touch()
channel.writeAndFlush(request)
} else {
response.completeExceptionally(channelFuture.cause())
}
}
})
return response
}
private fun encodeExpiry(expiry: Duration): Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
fun get(key: String): CompletableFuture<ReadableByteChannel?> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
DefaultFullBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), null).apply {
setOpcode(BinaryMemcacheOpcodes.GET)
}
}
return sendRequest(request).thenApply { response ->
try {
when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> {
val compressionMode = cfg.compressionMode
val content = response.content().retain()
content.touch()
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> {
GZIPInputStream(ByteBufInputStream(content))
}
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
InflaterInputStream(ByteBufInputStream(content))
}
}
} else {
ByteBufInputStream(content)
}.let(Channels::newChannel)
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
null
}
else -> throw MemcacheException(status)
}
} finally {
response.release()
}
}
}
fun put(key: String, content: ByteBuf, expiry: Duration, cas: Long? = null): CompletableFuture<Void> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
val extras = Unpooled.buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(expiry))
val compressionMode = cfg.compressionMode
content.retain()
val payload = if (compressionMode != null) {
val inputStream = ByteBufInputStream(content)
val buf = content.alloc().buffer()
buf.retain()
val outputStream = when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> {
GZIPOutputStream(ByteBufOutputStream(buf))
}
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
DeflaterOutputStream(ByteBufOutputStream(buf), Deflater(Deflater.DEFAULT_COMPRESSION, false))
}
}
inputStream.use { i ->
outputStream.use { o ->
JWO.copy(i, o)
}
}
buf
} else {
content
}
DefaultFullBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), extras, payload).apply {
setOpcode(BinaryMemcacheOpcodes.SET)
cas?.let(this::setCas)
}
}
return sendRequest(request).thenApply { response ->
try {
when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> null
else -> throw MemcacheException(status)
}
} finally {
response.release()
}
}
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}
override fun close() {
shutDown().sync()
}
}

View File

@@ -1 +0,0 @@
net.woggioni.gbcs.server.memcache.MemcacheCacheProvider

View File

@@ -6,10 +6,10 @@ plugins {
configurations { configurations {
bundle { bundle {
extendsFrom runtimeClasspath
canBeResolved = true canBeResolved = true
canBeConsumed = false canBeConsumed = false
visible = false visible = false
transitive = false
resolutionStrategy { resolutionStrategy {
dependencies { dependencies {
@@ -26,23 +26,24 @@ configurations {
canBeResolved = true canBeResolved = true
visible = true visible = true
} }
testImplementation {
extendsFrom compileOnly
}
} }
dependencies { dependencies {
implementation project(':gbcs-common') compileOnly project(':gbcs-common')
implementation project(':gbcs-api') compileOnly project(':gbcs-api')
implementation catalog.jwo compileOnly catalog.jwo
implementation catalog.slf4j.api compileOnly catalog.slf4j.api
implementation catalog.netty.common implementation catalog.xmemcached
implementation catalog.netty.codec.memcache implementation catalog.netty.codec.memcache
implementation catalog.netty.common
bundle catalog.netty.codec.memcache implementation group: 'io.netty', name: 'netty-handler', version: catalog.versions.netty.get()
testRuntimeOnly catalog.logback.classic testRuntimeOnly catalog.logback.classic
}
tasks.named(JavaPlugin.TEST_TASK_NAME, Test) {
systemProperty("io.netty.leakDetectionLevel", "PARANOID")
} }
Provider<Tar> bundleTask = tasks.register("bundle", Tar) { Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
@@ -51,6 +52,11 @@ Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
group = BasePlugin.BUILD_GROUP group = BasePlugin.BUILD_GROUP
} }
tasks.named(JavaPlugin.TEST_TASK_NAME, Test) {
systemProperty("io.netty.leakDetectionLevel", "PARANOID")
}
tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) { tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) {
dependsOn(bundleTask) dependsOn(bundleTask)
} }

View File

@@ -1,19 +1,21 @@
import net.woggioni.gbcs.api.CacheProvider; import net.woggioni.gbcs.api.CacheProvider;
module net.woggioni.gbcs.server.memcache { module net.woggioni.gbcs.server.memcached {
requires net.woggioni.gbcs.common; requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.api; requires net.woggioni.gbcs.api;
requires com.googlecode.xmemcached;
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires java.xml; requires java.xml;
requires kotlin.stdlib; requires kotlin.stdlib;
requires io.netty.transport;
requires io.netty.codec;
requires io.netty.codec.memcache;
requires io.netty.common; requires io.netty.common;
requires io.netty.buffer; requires io.netty.handler;
requires io.netty.codec.memcache;
requires io.netty.transport;
requires org.slf4j; requires org.slf4j;
requires io.netty.buffer;
requires io.netty.codec;
provides CacheProvider with net.woggioni.gbcs.server.memcache.MemcacheCacheProvider; provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider;
opens net.woggioni.gbcs.server.memcache.schema; opens net.woggioni.gbcs.server.memcached.schema;
} }

View File

@@ -0,0 +1,33 @@
package net.woggioni.gbcs.server.memcached
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.stream.ChunkedInput
import java.nio.channels.ReadableByteChannel
class CustomChunkedInput(private val readableByteChannel: ReadableByteChannel) : ChunkedInput<ByteBuf> {
override fun isEndOfInput(): Boolean {
TODO("Not yet implemented")
}
override fun close() {
TODO("Not yet implemented")
}
override fun readChunk(ctx: ChannelHandlerContext): ByteBuf {
TODO("Not yet implemented")
}
override fun readChunk(allocator: ByteBufAllocator): ByteBuf {
TODO("Not yet implemented")
}
override fun length(): Long {
TODO("Not yet implemented")
}
override fun progress(): Long {
TODO("Not yet implemented")
}
}

View File

@@ -0,0 +1,4 @@
package net.woggioni.gbcs.server.memcached
class MemcachedException(status : Short, msg : String? = null, cause : Throwable? = null)
: RuntimeException(msg ?: "Memcached status $status", cause)

View File

@@ -0,0 +1,85 @@
package net.woggioni.gbcs.server.memcached
import io.netty.buffer.Unpooled
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.CallHandle
import net.woggioni.gbcs.api.ResponseEventListener
import net.woggioni.gbcs.api.event.RequestEvent
import net.woggioni.gbcs.api.event.ResponseEvent
import net.woggioni.gbcs.server.memcached.client.MemcachedClient
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ExceptionCaught
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.LastResponseContentChunkReceived
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseContentChunkReceived
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseReceived
import net.woggioni.gbcs.server.memcached.client.ResponseListener
import java.util.concurrent.CompletableFuture
class MemcachedCache(
private val cfg : MemcachedCacheConfiguration
) : Cache {
private val client = MemcachedClient(cfg)
override fun close() {
client.close()
}
override fun get(key: String, responseEventListener: ResponseEventListener): CompletableFuture<CallHandle<Void>> {
val listener = ResponseListener { evt ->
when(evt) {
is ResponseContentChunkReceived -> {
responseEventListener.listen(ResponseEvent.ChunkReceived(Unpooled.wrappedBuffer(evt.chunk)))
}
is LastResponseContentChunkReceived -> {
responseEventListener.listen(ResponseEvent.LastChunkReceived(Unpooled.wrappedBuffer(evt.chunk)))
}
is ExceptionCaught -> {
responseEventListener.listen(ResponseEvent.ExceptionCaught(evt.cause))
}
is ResponseReceived -> {
when(val status = evt.response.status) {
BinaryMemcacheResponseStatus.SUCCESS -> {
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
responseEventListener.listen(ResponseEvent.NoContent())
}
else -> {
responseEventListener.listen(ResponseEvent.ExceptionCaught(MemcachedException(status)))
}
}
}
}
}
return client.get(key, listener).thenApply { clientCallHandle ->
object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when(evt) {
is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
}
}
override fun call(): CompletableFuture<Void> {
return clientCallHandle.waitForResponse().thenApply { null }
}
}
}
}
override fun put(key: String): CompletableFuture<CallHandle<Void>> {
return client.put(key, cfg.maxAge).thenApply { clientCallHandle ->
object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when(evt) {
is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
}
}
override fun call(): CompletableFuture<Void> {
return clientCallHandle.waitForResponse().thenApply { null }
}
}
}
}
}

View File

@@ -1,15 +1,15 @@
package net.woggioni.gbcs.server.memcache package net.woggioni.gbcs.server.memcached
import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.HostAndPort import net.woggioni.gbcs.common.HostAndPort
import java.time.Duration import java.time.Duration
data class MemcacheCacheConfiguration( data class MemcachedCacheConfiguration(
val servers: List<Server>, val servers: List<Server>,
val maxAge: Duration = Duration.ofDays(1), val maxAge: Duration = Duration.ofDays(1),
val maxSize: Int = 0x100000, val maxSize: Int = 0x100000,
val digestAlgorithm: String? = null, val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = null, val compressionMode: CompressionMode? = CompressionMode.DEFLATE,
) : Configuration.Cache { ) : Configuration.Cache {
enum class CompressionMode { enum class CompressionMode {
@@ -24,17 +24,23 @@ data class MemcacheCacheConfiguration(
DEFLATE DEFLATE
} }
class RetryPolicy(
val maxAttempts: Int,
val initialDelayMillis: Long,
val exp: Double
)
data class Server( data class Server(
val endpoint : HostAndPort, val endpoint : HostAndPort,
val connectionTimeoutMillis : Int?, val connectionTimeoutMillis : Int?,
val retryPolicy : RetryPolicy?,
val maxConnections : Int val maxConnections : Int
) )
override fun materialize() = MemcacheCache(this) override fun materialize() = MemcachedCache(this)
override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcache" override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached"
override fun getTypeName() = "memcacheCacheType" override fun getTypeName() = "memcachedCacheType"
} }

View File

@@ -1,5 +1,6 @@
package net.woggioni.gbcs.server.memcache package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.woggioni.gbcs.api.CacheProvider import net.woggioni.gbcs.api.CacheProvider
import net.woggioni.gbcs.api.exception.ConfigurationException import net.woggioni.gbcs.api.exception.ConfigurationException
import net.woggioni.gbcs.common.GBCS import net.woggioni.gbcs.common.GBCS
@@ -10,21 +11,19 @@ import net.woggioni.gbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document import org.w3c.dom.Document
import org.w3c.dom.Element import org.w3c.dom.Element
import java.time.Duration import java.time.Duration
import java.time.temporal.ChronoUnit
class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
override fun getXmlSchemaLocation() = "jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd"
class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> { override fun getXmlType() = "memcachedCacheType"
override fun getXmlSchemaLocation() = "jpms://net.woggioni.gbcs.server.memcache/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd"
override fun getXmlType() = "memcacheCacheType" override fun getXmlNamespace() = "urn:net.woggioni.gbcs.server.memcached"
override fun getXmlNamespace() = "urn:net.woggioni.gbcs.server.memcache"
val xmlNamespacePrefix : String val xmlNamespacePrefix : String
get() = "gbcs-memcache" get() = "gbcs-memcached"
override fun deserialize(el: Element): MemcacheCacheConfiguration { override fun deserialize(el: Element): MemcachedCacheConfiguration {
val servers = mutableListOf<MemcacheCacheConfiguration.Server>() val servers = mutableListOf<HostAndPort>()
val maxAge = el.renderAttribute("max-age") val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse) ?.let(Duration::parse)
?: Duration.ofDays(1) ?: Duration.ofDays(1)
@@ -34,31 +33,25 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
val compressionMode = el.renderAttribute("compression-mode") val compressionMode = el.renderAttribute("compression-mode")
?.let { ?.let {
when (it) { when (it) {
"gzip" -> MemcacheCacheConfiguration.CompressionMode.GZIP "gzip" -> CompressionMode.GZIP
"deflate" -> MemcacheCacheConfiguration.CompressionMode.DEFLATE "zip" -> CompressionMode.ZIP
else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE else -> CompressionMode.ZIP
} }
} }
?: MemcacheCacheConfiguration.CompressionMode.DEFLATE ?: CompressionMode.ZIP
val digestAlgorithm = el.renderAttribute("digest") val digestAlgorithm = el.renderAttribute("digest")
for (child in el.asIterable()) { for (child in el.asIterable()) {
when (child.nodeName) { when (child.nodeName) {
"server" -> { "server" -> {
val host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required") val host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required")
val port = child.renderAttribute("port")?.toInt() ?: throw ConfigurationException("port attribute is required") val port = child.renderAttribute("port")?.toInt() ?: throw ConfigurationException("port attribute is required")
val maxConnections = child.renderAttribute("max-connections")?.toInt() ?: 1 servers.add(HostAndPort(host, port))
val connectionTimeout = child.renderAttribute("connection-timeout")
?.let(Duration::parse)
?.let(Duration::toMillis)
?.let(Long::toInt)
?: 10000
servers.add(MemcacheCacheConfiguration.Server(HostAndPort(host, port), connectionTimeout, maxConnections))
} }
} }
} }
return MemcacheCacheConfiguration( return MemcachedCacheConfiguration(
servers, servers.map { MemcachedCacheConfiguration.Server(it, null, null, 1) },
maxAge, maxAge,
maxSize, maxSize,
digestAlgorithm, digestAlgorithm,
@@ -66,7 +59,7 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
) )
} }
override fun serialize(doc: Document, cache: MemcacheCacheConfiguration) = cache.run { override fun serialize(doc: Document, cache: MemcachedCacheConfiguration) = cache.run {
val result = doc.createElement("cache") val result = doc.createElement("cache")
Xml.of(doc, result) { Xml.of(doc, result) {
attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/") attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/")
@@ -76,10 +69,6 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
node("server") { node("server") {
attr("host", server.endpoint.host) attr("host", server.endpoint.host)
attr("port", server.endpoint.port.toString()) attr("port", server.endpoint.port.toString())
server.connectionTimeoutMillis?.let { connectionTimeoutMillis ->
attr("connection-timeout", Duration.of(connectionTimeoutMillis.toLong(), ChronoUnit.MILLIS).toString())
}
attr("max-connections", server.maxConnections.toString())
} }
} }
attr("max-age", maxAge.toString()) attr("max-age", maxAge.toString())
@@ -87,14 +76,12 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
digestAlgorithm?.let { digestAlgorithm -> digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm) attr("digest", digestAlgorithm)
} }
compressionMode?.let { compressionMode -> attr(
attr( "compression-mode", when (compressionMode) {
"compression-mode", when (compressionMode) { CompressionMode.GZIP -> "gzip"
MemcacheCacheConfiguration.CompressionMode.GZIP -> "gzip" CompressionMode.ZIP -> "zip"
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> "deflate" }
} )
)
}
} }
result result
} }

View File

@@ -0,0 +1,9 @@
package net.woggioni.gbcs.server.memcached.client
import java.nio.ByteBuffer
import java.util.concurrent.CompletableFuture
interface CallHandle {
fun sendChunk(requestBodyChunk : ByteBuffer)
fun waitForResponse() : CompletableFuture<Short>
}

View File

@@ -0,0 +1,24 @@
package net.woggioni.gbcs.server.memcached.client
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import java.nio.ByteBuffer
data class MemcacheResponse(
val status: Short,
val opcode: Byte,
val cas: Long?,
val opaque: Int?,
val key: ByteBuffer?,
val extra: ByteBuffer?
) {
companion object {
fun of(response : BinaryMemcacheResponse) = MemcacheResponse(
response.status(),
response.opcode(),
response.cas(),
response.opaque(),
response.key()?.nioBuffer(),
response.extras()?.nioBuffer()
)
}
}

View File

@@ -0,0 +1,241 @@
package net.woggioni.gbcs.server.memcached.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.MemcacheObject
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.common.GBCS.digest
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration
import net.woggioni.gbcs.server.memcached.MemcachedException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import io.netty.util.concurrent.Future as NettyFuture
class MemcachedClient(private val cfg: MemcachedCacheConfiguration) : AutoCloseable {
private val log = contextLogger()
private val group: NioEventLoopGroup
private val connectionPool: MutableMap<HostAndPort, ChannelPool> = ConcurrentHashMap()
init {
group = NioEventLoopGroup()
}
private fun newConnectionPool(server : MemcachedCacheConfiguration.Server) : FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
channel(NioSocketChannel::class.java)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
server.connectionTimeoutMillis?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec())
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
private fun sendRequest(request: BinaryMemcacheRequest,
responseListener: ResponseListener?
): CompletableFuture<CallHandle> {
val server = cfg.servers.let { servers ->
if(servers.size > 1) {
val key = request.key().duplicate()
var checksum = 0
while(key.readableBytes() > 4) {
val byte = key.readInt()
checksum = checksum xor byte
}
while(key.readableBytes() > 0) {
val byte = key.readByte()
checksum = checksum xor byte.toInt()
}
servers[checksum % servers.size]
} else {
servers.first()
}
}
val callHandleFuture = CompletableFuture<CallHandle>()
val result = CompletableFuture<Short>()
// Custom handler for processing responses
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
}
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline().addLast("handler", object : SimpleChannelInboundHandler<MemcacheObject>() {
val response : MemcacheResponse? = null
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: MemcacheObject
) {
if(msg is BinaryMemcacheResponse) {
val resp = MemcacheResponse.of(msg)
responseListener?.listen(ResponseEvent.ResponseReceived(resp))
if(msg.totalBodyLength() == msg.keyLength() + msg.extrasLength()) {
result.complete(resp.status)
}
}
if(responseListener != null) {
when (msg) {
is LastMemcacheContent -> {
responseListener.listen(ResponseEvent.LastResponseContentChunkReceived(msg.content().nioBuffer()))
result.complete(response?.status)
pipeline.removeLast()
pool.release(channel)
}
is MemcacheContent -> {
responseListener.listen(ResponseEvent.ResponseContentChunkReceived(msg.content().nioBuffer()))
}
}
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val ex = when (cause) {
is DecoderException -> cause.cause!!
else -> cause
}
responseListener?.listen(ResponseEvent.ExceptionCaught(ex))
result.completeExceptionally(ex)
ctx.close()
pipeline.removeLast()
pool.release(channel)
}
})
val chunks = mutableListOf <ByteBuffer>()
fun sendRequest() {
val valueLen = chunks.fold(0) { acc : Int, c2 : ByteBuffer ->
acc + c2.remaining()
}
request.setTotalBodyLength(request.keyLength() + request.extrasLength() + valueLen)
channel.write(request)
for((i, chunk) in chunks.withIndex()) {
if(i + 1 < chunks.size) {
channel.write(DefaultMemcacheContent(Unpooled.wrappedBuffer(chunk)))
} else {
channel.write(DefaultLastMemcacheContent(Unpooled.wrappedBuffer(chunk)))
}
}
channel.flush()
}
callHandleFuture.complete(object : CallHandle {
override fun sendChunk(requestBodyChunk: ByteBuffer) {
chunks.addLast(requestBodyChunk)
}
override fun waitForResponse(): CompletableFuture<Short> {
sendRequest()
return result
}
})
} else {
callHandleFuture.completeExceptionally(channelFuture.cause())
}
}
})
return callHandleFuture
}
private fun encodeExpiry(expiry: Duration) : Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
fun get(key: String, responseListener: ResponseListener) : CompletableFuture<CallHandle> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
DefaultBinaryMemcacheRequest().apply {
setKey(Unpooled.wrappedBuffer(digest))
setOpcode(BinaryMemcacheOpcodes.GET)
}
}
return sendRequest(request, responseListener)
}
fun put(key: String, expiry : Duration, cas : Long? = null): CompletableFuture<CallHandle> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
val extras = Unpooled.buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(expiry))
DefaultBinaryMemcacheRequest().apply {
setExtras(extras)
setKey(Unpooled.wrappedBuffer(digest))
setOpcode(BinaryMemcacheOpcodes.SET)
cas?.let(this::setCas)
}
}
return sendRequest(request) { evt ->
when (evt) {
is ResponseEvent.ResponseReceived -> {
if (evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) {
throw MemcachedException(evt.response.status)
}
}
else -> {}
}
}
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}
override fun close() {
shutDown().sync()
}
}

View File

@@ -0,0 +1,10 @@
package net.woggioni.gbcs.server.memcached.client
import java.nio.ByteBuffer
sealed interface ResponseEvent {
class ResponseReceived(val response : MemcacheResponse) : ResponseEvent
class ResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent
class LastResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent
class ExceptionCaught(val cause : Throwable) : ResponseEvent
}

View File

@@ -0,0 +1,5 @@
package net.woggioni.gbcs.server.memcached.client
fun interface ResponseListener {
fun listen(evt : ResponseEvent)
}

View File

@@ -0,0 +1 @@
net.woggioni.gbcs.server.memcached.MemcachedCacheProvider

View File

@@ -1,35 +1,33 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<xs:schema targetNamespace="urn:net.woggioni.gbcs.server.memcache" <xs:schema targetNamespace="urn:net.woggioni.gbcs.server.memcached"
xmlns:gbcs-memcache="urn:net.woggioni.gbcs.server.memcache" xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xmlns:gbcs="urn:net.woggioni.gbcs.server" xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:xs="http://www.w3.org/2001/XMLSchema"> xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:import schemaLocation="jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd" namespace="urn:net.woggioni.gbcs.server"/> <xs:import schemaLocation="jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd" namespace="urn:net.woggioni.gbcs.server"/>
<xs:complexType name="memcacheServerType"> <xs:complexType name="memcachedServerType">
<xs:attribute name="host" type="xs:token" use="required"/> <xs:attribute name="host" type="xs:token" use="required"/>
<xs:attribute name="port" type="xs:positiveInteger" use="required"/> <xs:attribute name="port" type="xs:positiveInteger" use="required"/>
<xs:attribute name="connection-timeout" type="xs:duration"/>
<xs:attribute name="max-connections" type="xs:positiveInteger" default="1"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="memcacheCacheType"> <xs:complexType name="memcachedCacheType">
<xs:complexContent> <xs:complexContent>
<xs:extension base="gbcs:cacheType"> <xs:extension base="gbcs:cacheType">
<xs:sequence maxOccurs="unbounded"> <xs:sequence maxOccurs="unbounded">
<xs:element name="server" type="gbcs-memcache:memcacheServerType"/> <xs:element name="server" type="gbcs-memcached:memcachedServerType"/>
</xs:sequence> </xs:sequence>
<xs:attribute name="max-age" type="xs:duration" default="P1D"/> <xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="max-size" type="xs:unsignedInt" default="1048576"/> <xs:attribute name="max-size" type="xs:unsignedInt" default="1048576"/>
<xs:attribute name="digest" type="xs:token" /> <xs:attribute name="digest" type="xs:token" />
<xs:attribute name="compression-mode" type="gbcs-memcache:compressionType"/> <xs:attribute name="compression-mode" type="gbcs-memcached:compressionType" default="zip"/>
</xs:extension> </xs:extension>
</xs:complexContent> </xs:complexContent>
</xs:complexType> </xs:complexType>
<xs:simpleType name="compressionType"> <xs:simpleType name="compressionType">
<xs:restriction base="xs:token"> <xs:restriction base="xs:token">
<xs:enumeration value="deflate"/> <xs:enumeration value="zip"/>
<xs:enumeration value="gzip"/> <xs:enumeration value="gzip"/>
</xs:restriction> </xs:restriction>
</xs:simpleType> </xs:simpleType>

View File

@@ -0,0 +1,80 @@
package net.woggioni.gbcs.server.memcached.test
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import net.woggioni.gbcs.api.event.ChunkReceived
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration
import net.woggioni.gbcs.server.memcached.client.MemcacheResponse
import net.woggioni.gbcs.server.memcached.client.MemcachedClient
import net.woggioni.gbcs.server.memcached.client.ResponseEvent
import net.woggioni.gbcs.server.memcached.client.ResponseListener
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import java.nio.ByteBuffer
import java.security.SecureRandom
import java.time.Duration
import java.util.Objects
import java.util.concurrent.TimeUnit
import kotlin.random.Random
class MemcachedClientTest {
@Test
fun test() {
val client = MemcachedClient(MemcachedCacheConfiguration(
servers = listOf(
MemcachedCacheConfiguration.Server(
endpoint = HostAndPort("127.0.0.1", 11211),
connectionTimeoutMillis = null,
retryPolicy = null,
maxConnections = 1
)
)
))
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
val key = "101325"
val value = random.nextBytes(0x1000)
val requestListener = client.put(key, Duration.ofDays(2), null)
val response = Unpooled.buffer(value.size)
requestListener.thenCompose { listener ->
listener.sendChunk(ByteBuffer.wrap(value))
listener.waitForResponse()
}.get(10, TimeUnit.SECONDS)
client.get(key, object: ResponseListener {
override fun listen(evt: ResponseEvent) {
when(evt) {
is ResponseEvent.ResponseReceived -> {
if(evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) {
Assertions.fail<String> {
"Memcache status ${evt.response.status}"
}
}
}
is ResponseEvent.ResponseContentChunkReceived -> response.writeBytes(evt.chunk)
else -> {}
}
}
}).thenCompose { it.waitForResponse() }.get(1, TimeUnit.SECONDS)
val retrievedResponse = response.array()
Assertions.assertArrayEquals(value, retrievedResponse)
}
@Test
fun test2() {
val a1 = ByteArray(10) {
it.toByte()
}
val a2 = ByteArray(10) {
it.toByte()
}
Assertions.assertTrue(Objects.equals(a1, a1))
}
}

View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration>
<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>
<appender name="console" class="ConsoleAppender">
<target>System.err</target>
<encoder class="PatternLayoutEncoder">
<pattern>%d [%highlight(%-5level)] \(%thread\) %logger{36} -%kvp- %msg %n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="io.netty" level="debug"/>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -19,7 +19,7 @@ dependencies {
testImplementation catalog.bcprov.jdk18on testImplementation catalog.bcprov.jdk18on
testImplementation catalog.bcpkix.jdk18on testImplementation catalog.bcpkix.jdk18on
testRuntimeOnly project(":gbcs-server-memcache") testRuntimeOnly project(":gbcs-server-memcached")
} }
test { test {

View File

@@ -18,7 +18,6 @@ module net.woggioni.gbcs.server {
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires net.woggioni.gbcs.common; requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.api; requires net.woggioni.gbcs.api;
requires io.netty.transport.classes.epoll;
exports net.woggioni.gbcs.server; exports net.woggioni.gbcs.server;

View File

@@ -10,9 +10,6 @@ import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelInitializer import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPromise import io.netty.channel.ChannelPromise
import io.netty.channel.MultithreadEventLoopGroup
import io.netty.channel.epoll.EpollEventLoopGroup
import io.netty.channel.epoll.EpollServerSocketChannel
import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.compression.CompressionOptions import io.netty.handler.codec.compression.CompressionOptions
@@ -52,7 +49,6 @@ import net.woggioni.gbcs.server.exception.ExceptionHandler
import net.woggioni.gbcs.server.handler.ServerHandler import net.woggioni.gbcs.server.handler.ServerHandler
import net.woggioni.gbcs.server.throttling.ThrottlingHandler import net.woggioni.gbcs.server.throttling.ThrottlingHandler
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import net.woggioni.jwo.OS
import net.woggioni.jwo.Tuple2 import net.woggioni.jwo.Tuple2
import java.io.OutputStream import java.io.OutputStream
import java.net.InetSocketAddress import java.net.InetSocketAddress
@@ -403,23 +399,10 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
} }
fun run(): ServerHandle { fun run(): ServerHandle {
val bossGroup : MultithreadEventLoopGroup // Create the multithreaded event loops for the server
val workerGroup : MultithreadEventLoopGroup val bossGroup = NioEventLoopGroup(0)
val serverSocketChannel : Class<*> val serverSocketChannel = NioServerSocketChannel::class.java
if(cfg.isUseNativeTransport) { val workerGroup = bossGroup
if(OS.isLinux) {
bossGroup = EpollEventLoopGroup(1)
serverSocketChannel = EpollServerSocketChannel::class.java
workerGroup = EpollEventLoopGroup(0)
} else {
throw java.lang.IllegalArgumentException("Native transport is not supported on ${OS.current.name}")
}
} else {
bossGroup = NioEventLoopGroup(1)
serverSocketChannel = NioServerSocketChannel::class.java
workerGroup = NioEventLoopGroup(0)
}
val eventExecutorGroup = run { val eventExecutorGroup = run {
val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) { val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) {
Thread.ofVirtual().factory() Thread.ofVirtual().factory()

View File

@@ -1,11 +1,8 @@
package net.woggioni.gbcs.server.cache package net.woggioni.gbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.GBCS.digestString import net.woggioni.gbcs.common.GBCS.digestString
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.jwo.JWO
import net.woggioni.jwo.LockFile import net.woggioni.jwo.LockFile
import java.nio.channels.Channels import java.nio.channels.Channels
import java.nio.channels.FileChannel import java.nio.channels.FileChannel
@@ -17,7 +14,6 @@ import java.nio.file.attribute.BasicFileAttributes
import java.security.MessageDigest import java.security.MessageDigest
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream import java.util.zip.DeflaterOutputStream
@@ -32,10 +28,7 @@ class FileSystemCache(
val compressionLevel: Int val compressionLevel: Int
) : Cache { ) : Cache {
private companion object { private val log = contextLogger()
@JvmStatic
private val log = contextLogger()
}
init { init {
Files.createDirectories(root) Files.createDirectories(root)
@@ -69,12 +62,10 @@ class FileSystemCache(
} }
}.also { }.also {
gc() gc()
}.let {
CompletableFuture.completedFuture(it)
} }
} }
override fun put(key: String, content: ByteBuf): CompletableFuture<Void> { override fun put(key: String, content: ByteArray) {
(digestAlgorithm (digestAlgorithm
?.let(MessageDigest::getInstance) ?.let(MessageDigest::getInstance)
?.let { md -> ?.let { md ->
@@ -91,7 +82,7 @@ class FileSystemCache(
it it
} }
}.use { }.use {
JWO.copy(ByteBufInputStream(content), it) it.write(content)
} }
Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE) Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE)
} catch (t: Throwable) { } catch (t: Throwable) {
@@ -101,7 +92,6 @@ class FileSystemCache(
}.also { }.also {
gc() gc()
} }
return CompletableFuture.completedFuture(null)
} }
private fun gc() { private fun gc() {

View File

@@ -1,20 +1,15 @@
package net.woggioni.gbcs.server.cache package net.woggioni.gbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.ByteBufOutputStream
import net.woggioni.gbcs.common.GBCS.digestString import net.woggioni.gbcs.common.GBCS.digestString
import net.woggioni.gbcs.common.contextLogger import java.io.ByteArrayInputStream
import net.woggioni.jwo.JWO import java.io.ByteArrayOutputStream
import java.nio.channels.Channels import java.nio.channels.Channels
import java.security.MessageDigest import java.security.MessageDigest
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.atomic.AtomicLong
import java.util.zip.Deflater import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater import java.util.zip.Inflater
@@ -22,21 +17,14 @@ import java.util.zip.InflaterInputStream
class InMemoryCache( class InMemoryCache(
val maxAge: Duration, val maxAge: Duration,
val maxSize: Long,
val digestAlgorithm: String?, val digestAlgorithm: String?,
val compressionEnabled: Boolean, val compressionEnabled: Boolean,
val compressionLevel: Int val compressionLevel: Int
) : Cache { ) : Cache {
companion object { private val map = ConcurrentHashMap<String, ByteArray>()
@JvmStatic
private val log = contextLogger()
}
private val size = AtomicLong() private class RemovalQueueElement(val key: String, val value : ByteArray, val expiry : Instant) : Comparable<RemovalQueueElement> {
private val map = ConcurrentHashMap<String, ByteBuf>()
private class RemovalQueueElement(val key: String, val value : ByteBuf, val expiry : Instant) : Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry) override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
} }
@@ -46,17 +34,9 @@ class InMemoryCache(
private val garbageCollector = Thread { private val garbageCollector = Thread {
while(true) { while(true) {
val el = removalQueue.take() val el = removalQueue.take()
val buf = el.value
val now = Instant.now() val now = Instant.now()
if(now > el.expiry) { if(now > el.expiry) {
val removed = map.remove(el.key, buf) map.remove(el.key, el.value)
if(removed) {
updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
}
//Decrease the reference count for removalQueue
buf.release()
} else { } else {
removalQueue.put(el) removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))) Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
@@ -66,28 +46,6 @@ class InMemoryCache(
start() start()
} }
private fun removeEldest() : Long {
while(true) {
val el = removalQueue.take()
val buf = el.value
val removed = map.remove(el.key, buf)
//Decrease the reference count for removalQueue
buf.release()
if(removed) {
val newSize = updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
return newSize
}
}
}
private fun updateSizeAfterRemoval(removed: ByteBuf) : Long {
return size.updateAndGet { currentSize : Long ->
currentSize - removed.readableBytes()
}
}
override fun close() { override fun close() {
running = false running = false
garbageCollector.join() garbageCollector.join()
@@ -102,49 +60,33 @@ class InMemoryCache(
).let { digest -> ).let { digest ->
map[digest] map[digest]
?.let { value -> ?.let { value ->
val copy = value.retainedDuplicate()
copy.touch("This has to be released by the caller of the cache")
if (compressionEnabled) { if (compressionEnabled) {
val inflater = Inflater() val inflater = Inflater()
Channels.newChannel(InflaterInputStream(ByteBufInputStream(copy), inflater)) Channels.newChannel(InflaterInputStream(ByteArrayInputStream(value), inflater))
} else { } else {
Channels.newChannel(ByteBufInputStream(copy)) Channels.newChannel(ByteArrayInputStream(value))
} }
} }
}.let {
CompletableFuture.completedFuture(it)
} }
override fun put(key: String, content: ByteBuf) = override fun put(key: String, content: ByteArray) {
(digestAlgorithm (digestAlgorithm
?.let(MessageDigest::getInstance) ?.let(MessageDigest::getInstance)
?.let { md -> ?.let { md ->
digestString(key.toByteArray(), md) digestString(key.toByteArray(), md)
} ?: key).let { digest -> } ?: key).let { digest ->
content.retain()
val value = if (compressionEnabled) { val value = if (compressionEnabled) {
val deflater = Deflater(compressionLevel) val deflater = Deflater(compressionLevel)
val buf = content.alloc().buffer() val baos = ByteArrayOutputStream()
buf.retain() DeflaterOutputStream(baos, deflater).use { stream ->
DeflaterOutputStream(ByteBufOutputStream(buf), deflater).use { outputStream -> stream.write(content)
ByteBufInputStream(content).use { inputStream ->
JWO.copy(inputStream, outputStream)
}
} }
buf baos.toByteArray()
} else { } else {
content content
} }
val old = map.put(digest, value) map[digest] = value
val delta = value.readableBytes() - (old?.readableBytes() ?: 0) removalQueue.put(RemovalQueueElement(digest, value, Instant.now().plus(maxAge)))
var newSize = size.updateAndGet { currentSize : Long ->
currentSize + delta
}
removalQueue.put(RemovalQueueElement(digest, value.retain(), Instant.now().plus(maxAge)))
while(newSize > maxSize) {
newSize = removeEldest()
}
}.let {
CompletableFuture.completedFuture<Void>(null)
} }
}
} }

View File

@@ -6,14 +6,12 @@ import java.time.Duration
data class InMemoryCacheConfiguration( data class InMemoryCacheConfiguration(
val maxAge: Duration, val maxAge: Duration,
val maxSize: Long,
val digestAlgorithm : String?, val digestAlgorithm : String?,
val compressionEnabled: Boolean, val compressionEnabled: Boolean,
val compressionLevel: Int, val compressionLevel: Int,
) : Configuration.Cache { ) : Configuration.Cache {
override fun materialize() = InMemoryCache( override fun materialize() = InMemoryCache(
maxAge, maxAge,
maxSize,
digestAlgorithm, digestAlgorithm,
compressionEnabled, compressionEnabled,
compressionLevel compressionLevel

View File

@@ -6,6 +6,7 @@ import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.common.Xml.Companion.renderAttribute import net.woggioni.gbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document import org.w3c.dom.Document
import org.w3c.dom.Element import org.w3c.dom.Element
import java.nio.file.Path
import java.time.Duration import java.time.Duration
import java.util.zip.Deflater import java.util.zip.Deflater
@@ -21,9 +22,6 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
val maxAge = el.renderAttribute("max-age") val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse) ?.let(Duration::parse)
?: Duration.ofDays(1) ?: Duration.ofDays(1)
val maxSize = el.renderAttribute("max-size")
?.let(java.lang.Long::decode)
?: 0x1000000
val enableCompression = el.renderAttribute("enable-compression") val enableCompression = el.renderAttribute("enable-compression")
?.let(String::toBoolean) ?.let(String::toBoolean)
?: true ?: true
@@ -34,7 +32,6 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
return InMemoryCacheConfiguration( return InMemoryCacheConfiguration(
maxAge, maxAge,
maxSize,
digestAlgorithm, digestAlgorithm,
enableCompression, enableCompression,
compressionLevel compressionLevel
@@ -47,7 +44,6 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
val prefix = doc.lookupPrefix(GBCS.GBCS_NAMESPACE_URI) val prefix = doc.lookupPrefix(GBCS.GBCS_NAMESPACE_URI)
attr("xs:type", "${prefix}:inMemoryCacheType", GBCS.XML_SCHEMA_NAMESPACE_URI) attr("xs:type", "${prefix}:inMemoryCacheType", GBCS.XML_SCHEMA_NAMESPACE_URI)
attr("max-age", maxAge.toString()) attr("max-age", maxAge.toString())
attr("max-size", maxSize.toString())
digestAlgorithm?.let { digestAlgorithm -> digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm) attr("digest", digestAlgorithm)
} }

View File

@@ -44,7 +44,6 @@ object Parser {
val serverPath = root.renderAttribute("path") val serverPath = root.renderAttribute("path")
var incomingConnectionsBacklogSize = 1024 var incomingConnectionsBacklogSize = 1024
var authentication: Authentication? = null var authentication: Authentication? = null
var useNativeTransport = false
for (child in root.asIterable()) { for (child in root.asIterable()) {
val tagName = child.localName val tagName = child.localName
when (tagName) { when (tagName) {
@@ -137,7 +136,7 @@ object Parser {
} }
"event-executor" -> { "event-executor" -> {
val useVirtualThread = child.renderAttribute("use-virtual-threads") val useVirtualThread = root.renderAttribute("use-virtual-threads")
?.let(String::toBoolean) ?: true ?.let(String::toBoolean) ?: true
eventExecutor = Configuration.EventExecutor(useVirtualThread) eventExecutor = Configuration.EventExecutor(useVirtualThread)
} }
@@ -181,10 +180,6 @@ object Parser {
} }
tls = Tls(keyStore, trustStore) tls = Tls(keyStore, trustStore)
} }
"transport" -> {
useNativeTransport = child.renderAttribute("use-native-transport")
?.let(String::toBoolean) ?: false
}
} }
} }
return Configuration.of( return Configuration.of(
@@ -199,7 +194,6 @@ object Parser {
cache!!, cache!!,
authentication, authentication,
tls, tls,
useNativeTransport
) )
} }
@@ -271,8 +265,7 @@ object Parser {
}.map { el -> }.map { el ->
val groupName = el.renderAttribute("name") ?: throw ConfigurationException("Group name is required") val groupName = el.renderAttribute("name") ?: throw ConfigurationException("Group name is required")
var roles = emptySet<Role>() var roles = emptySet<Role>()
var userQuota: Configuration.Quota? = null var quota: Configuration.Quota? = null
var groupQuota: Configuration.Quota? = null
for (child in el.asIterable()) { for (child in el.asIterable()) {
when (child.localName) { when (child.localName) {
"users" -> { "users" -> {
@@ -286,15 +279,12 @@ object Parser {
"roles" -> { "roles" -> {
roles = parseRoles(child) roles = parseRoles(child)
} }
"group-quota" -> { "quota" -> {
userQuota = parseQuota(child) quota = parseQuota(child)
}
"user-quota" -> {
groupQuota = parseQuota(child)
} }
} }
} }
groupName to Group(groupName, roles, userQuota, groupQuota) groupName to Group(groupName, roles, quota)
}.toMap() }.toMap()
val users = knownUsersMap.map { (name, user) -> val users = knownUsersMap.map { (name, user) ->
name to User(name, user.password, userGroups[name]?.mapNotNull { groups[it] }?.toSet() ?: emptySet(), user.quota) name to User(name, user.password, userGroups[name]?.mapNotNull { groups[it] }?.toSet() ?: emptySet(), user.quota)

View File

@@ -8,14 +8,8 @@ import org.w3c.dom.Document
object Serializer { object Serializer {
private fun Xml.serializeQuota(quota : Configuration.Quota) {
attr("calls", quota.calls.toString())
attr("period", quota.period.toString())
attr("max-available-calls", quota.maxAvailableCalls.toString())
attr("initial-available-calls", quota.initialAvailableCalls.toString())
}
fun serialize(conf : Configuration) : Document { fun serialize(conf : Configuration) : Document {
val schemaLocations = CacheSerializers.index.values.asSequence().map { val schemaLocations = CacheSerializers.index.values.asSequence().map {
it.xmlNamespace to it.xmlSchemaLocation it.xmlNamespace to it.xmlSchemaLocation
}.toMap() }.toMap()
@@ -44,12 +38,8 @@ object Serializer {
attr("max-request-size", connection.maxRequestSize.toString()) attr("max-request-size", connection.maxRequestSize.toString())
} }
} }
node("transport") {
attr("use-native-transport", conf.isUseNativeTransport.toString())
}
node("event-executor") { node("event-executor") {
attr("use-virtual-threads", conf.eventExecutor.isUseVirtualThreads.toString()) attr("use-virtual-threads", conf.eventExecutor.isUseVirtualThreads.toString())
attr("use-virtual-threads", conf.eventExecutor.isUseVirtualThreads.toString())
} }
val cache = conf.cache val cache = conf.cache
val serializer : CacheProvider<Configuration.Cache> = val serializer : CacheProvider<Configuration.Cache> =
@@ -66,7 +56,10 @@ object Serializer {
} }
user.quota?.let { quota -> user.quota?.let { quota ->
node("quota") { node("quota") {
serializeQuota(quota) attr("calls", quota.calls.toString())
attr("period", quota.period.toString())
attr("max-available-calls", quota.maxAvailableCalls.toString())
attr("initial-available-calls", quota.initialAvailableCalls.toString())
} }
} }
} }
@@ -77,7 +70,10 @@ object Serializer {
anonymousUser.quota?.let { quota -> anonymousUser.quota?.let { quota ->
node("anonymous") { node("anonymous") {
node("quota") { node("quota") {
serializeQuota(quota) attr("calls", quota.calls.toString())
attr("period", quota.period.toString())
attr("max-available-calls", quota.maxAvailableCalls.toString())
attr("initial-available-calls", quota.initialAvailableCalls.toString())
} }
} }
} }
@@ -117,14 +113,12 @@ object Serializer {
} }
} }
} }
group.userQuota?.let { quota -> group.quota?.let { quota ->
node("user-quota") { node("quota") {
serializeQuota(quota) attr("calls", quota.calls.toString())
} attr("period", quota.period.toString())
} attr("max-available-calls", quota.maxAvailableCalls.toString())
group.groupQuota?.let { quota -> attr("initial-available-calls", quota.initialAvailableCalls.toString())
node("group-quota") {
serializeQuota(quota)
} }
} }
} }

View File

@@ -1,35 +1,83 @@
package net.woggioni.gbcs.server.handler package net.woggioni.gbcs.server.handler
import io.netty.buffer.Unpooled import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.DefaultFileRegion
import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.DefaultFullHttpResponse import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.DefaultHttpResponse import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.FullHttpRequest import io.netty.handler.codec.http.DefaultLastHttpContent
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMessage
import io.netty.handler.codec.http.HttpMethod import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioStream
import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.CallHandle
import net.woggioni.gbcs.api.ResponseEventListener
import net.woggioni.gbcs.api.event.RequestEvent
import net.woggioni.gbcs.api.event.ResponseEvent
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.debug import net.woggioni.gbcs.server.debug
import net.woggioni.gbcs.server.warn import net.woggioni.gbcs.server.warn
import java.nio.channels.FileChannel
import java.nio.file.Path import java.nio.file.Path
import java.util.concurrent.CompletableFuture
@ChannelHandler.Sharable
class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
SimpleChannelInboundHandler<FullHttpRequest>() { SimpleChannelInboundHandler<HttpMessage>() {
private val log = contextLogger() companion object {
@JvmStatic
private val log = contextLogger()
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) { private data class TransientContext(
var key: String?,
var callHandle: CompletableFuture<CallHandle<Void>>
)
private var transientContext: TransientContext? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpMessage) {
when (msg) {
is HttpRequest -> {
handleRequest(ctx, msg)
}
is LastHttpContent -> {
transientContext?.run {
callHandle.thenCompose { callHandle ->
callHandle.postEvent(RequestEvent.LastChunkSent(msg.content()))
callHandle.call()
}.thenApply {
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
key?.let(String::toByteArray)
?.let(Unpooled::copiedBuffer)
)
// response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
}
}
}
is HttpContent -> {
transientContext?.run {
callHandle = callHandle.thenApply { it ->
it.postEvent(RequestEvent.ChunkSent(msg.content()))
it
}
}
}
}
}
private fun handleRequest(ctx: ChannelHandlerContext, msg: HttpRequest) {
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg) val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
val method = msg.method() val method = msg.method()
if (method === HttpMethod.GET) { if (method === HttpMethod.GET) {
@@ -42,54 +90,61 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
return return
} }
if (serverPrefix == prefix) { if (serverPrefix == prefix) {
cache.get(key).thenApply { channel -> cache.get(key, object : ResponseEventListener {
if(channel != null) { var first = false
log.debug(ctx) { override fun listen(evt: ResponseEvent) {
"Cache hit for key '$key'" when (evt) {
} is ResponseEvent.NoContent -> {
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) log.debug(ctx) {
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM "Cache miss for key '$key'"
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
when (channel) {
is FileChannel -> {
val content = DefaultFileRegion(channel, 0, channel.size())
if (keepAlive) {
ctx.write(content)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
} }
val response =
DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
} }
else -> {
val content = ChunkedNioStream(channel) is ResponseEvent.ChunkReceived, is ResponseEvent.LastChunkReceived -> {
if (keepAlive) { if (first) {
ctx.write(content).addListener { first = false
content.close() log.debug(ctx) {
"Cache hit for key '$key'"
} }
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate()) val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
} else { response.headers()[HttpHeaderNames.CONTENT_TYPE] =
ctx.writeAndFlush(content) HttpHeaderValues.APPLICATION_OCTET_STREAM
.addListener(ChannelFutureListener.CLOSE) if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers()
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers()
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers()
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
} }
if (evt is ResponseEvent.LastChunkReceived)
ctx.write(DefaultLastHttpContent(evt.chunk))
else if (evt is ResponseEvent.ChunkReceived)
ctx.write(DefaultHttpContent(evt.chunk))
ctx.flush()
}
is ResponseEvent.ExceptionCaught -> {
log.error(evt.cause.message, evt.cause)
val errorResponse = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR,
evt.cause.message
?.let(String::toByteArray)
?.let(Unpooled::copiedBuffer)
)
ctx.write(errorResponse)
} }
} }
} else {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
} }
}.whenComplete { _, ex -> ex?.let(ctx::fireExceptionCaught) } }).thenCompose(CallHandle<Void>::call)
} else { } else {
log.warn(ctx) { log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'" "Got request for unhandled path '${msg.uri()}'"
@@ -97,26 +152,23 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
ctx.channel().read()
} }
} else if (method === HttpMethod.PUT) { } else if (method === HttpMethod.PUT) {
val path = Path.of(msg.uri()) val path = Path.of(msg.uri())
val prefix = path.parent val prefix = path.parent
val key = path.fileName.toString() val key = path.fileName.toString()
if (serverPrefix == prefix) { if (serverPrefix == prefix) {
log.debug(ctx) { log.debug(ctx) {
"Added value for key '$key' to build cache" "Added value for key '$key' to build cache"
} }
cache.put(key, msg.content()).thenRun { transientContext = TransientContext(key, cache.put(key))
val response = DefaultFullHttpResponse( val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED, msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray()) Unpooled.copiedBuffer(key.toByteArray())
) )
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() // response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
}.whenComplete { _, ex ->
ctx.fireExceptionCaught(ex)
}
} else { } else {
log.warn(ctx) { log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'" "Got request for unhandled path '${msg.uri()}'"
@@ -125,9 +177,12 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
} }
} else if(method == HttpMethod.TRACE) { } else if (method == HttpMethod.TRACE) {
val replayedRequestHead = ctx.alloc().buffer() val replayedRequestHead = ctx.alloc().buffer()
replayedRequestHead.writeCharSequence("TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", Charsets.US_ASCII) replayedRequestHead.writeCharSequence(
"TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n",
Charsets.US_ASCII
)
msg.headers().forEach { (key, value) -> msg.headers().forEach { (key, value) ->
replayedRequestHead.apply { replayedRequestHead.apply {
writeCharSequence(key, Charsets.US_ASCII) writeCharSequence(key, Charsets.US_ASCII)
@@ -137,18 +192,30 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
} }
} }
replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII) replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII)
val requestBody = msg.content() val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
requestBody.retain()
val responseBody = ctx.alloc().compositeBuffer(2).apply {
addComponents(true, replayedRequestHead)
addComponents(true, requestBody)
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK, responseBody)
response.headers().apply { response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, "message/http") set(HttpHeaderNames.CONTENT_TYPE, "message/http")
set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes())
} }
ctx.writeAndFlush(response) ctx.write(response)
ctx.writeAndFlush(DefaultHttpContent(replayedRequestHead))
val callHandle = object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when (evt) {
is RequestEvent.ChunkSent -> {
ctx.writeAndFlush(DefaultHttpContent(evt.chunk))
}
is RequestEvent.LastChunkSent -> {
ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk))
}
}
}
override fun call(): CompletableFuture<Void> {
return CompletableFuture.completedFuture(null)
}
}
transientContext = TransientContext(null, CompletableFuture.completedFuture(callHandle))
} else { } else {
log.warn(ctx) { log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'" "Got request with unhandled method '${msg.method().name()}'"
@@ -157,5 +224,6 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
} }
} }
} }

View File

@@ -8,7 +8,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function import java.util.function.Function
class BucketManager private constructor( class BucketManager private constructor(
private val bucketsByUser: Map<Configuration.User, List<Bucket>> = HashMap(), private val bucketsByUser: Map<Configuration.User, Bucket> = HashMap(),
private val bucketsByGroup: Map<Configuration.Group, Bucket> = HashMap(), private val bucketsByGroup: Map<Configuration.Group, Bucket> = HashMap(),
loader: Function<InetSocketAddress, Bucket>? loader: Function<InetSocketAddress, Bucket>?
) { ) {
@@ -43,27 +43,22 @@ class BucketManager private constructor(
companion object { companion object {
fun from(cfg : Configuration) : BucketManager { fun from(cfg : Configuration) : BucketManager {
val bucketsByUser = cfg.users.values.asSequence().map { user -> val bucketsByUser = cfg.users.values.asSequence().filter {
val buckets = ( it.quota != null
user.quota }.map { user ->
?.let { quota -> val quota = user.quota
sequenceOf(quota) val bucket = Bucket.local(
} ?: user.groups.asSequence() quota.maxAvailableCalls,
.mapNotNull(Configuration.Group::getUserQuota) quota.calls,
).map { quota -> quota.period,
Bucket.local( quota.initialAvailableCalls
quota.maxAvailableCalls, )
quota.calls, user to bucket
quota.period,
quota.initialAvailableCalls
)
}.toList()
user to buckets
}.toMap() }.toMap()
val bucketsByGroup = cfg.groups.values.asSequence().filter { val bucketsByGroup = cfg.groups.values.asSequence().filter {
it.groupQuota != null it.quota != null
}.map { group -> }.map { group ->
val quota = group.groupQuota val quota = group.quota
val bucket = Bucket.local( val bucket = Bucket.local(
quota.maxAvailableCalls, quota.maxAvailableCalls,
quota.calls, quota.calls,

View File

@@ -42,7 +42,7 @@ class ThrottlingHandler(cfg: Configuration) :
val buckets = mutableListOf<Bucket>() val buckets = mutableListOf<Bucket>()
val user = ctx.channel().attr(GradleBuildCacheServer.userAttribute).get() val user = ctx.channel().attr(GradleBuildCacheServer.userAttribute).get()
if (user != null) { if (user != null) {
bucketManager.getBucketByUser(user)?.let(buckets::addAll) bucketManager.getBucketByUser(user)?.let(buckets::add)
} }
val groups = ctx.channel().attr(GradleBuildCacheServer.groupAttribute).get() ?: emptySet() val groups = ctx.channel().attr(GradleBuildCacheServer.groupAttribute).get() ?: emptySet()
if (groups.isNotEmpty()) { if (groups.isNotEmpty()) {

View File

@@ -9,7 +9,6 @@
<xs:sequence minOccurs="0"> <xs:sequence minOccurs="0">
<xs:element name="bind" type="gbcs:bindType" maxOccurs="1"/> <xs:element name="bind" type="gbcs:bindType" maxOccurs="1"/>
<xs:element name="connection" type="gbcs:connectionType" minOccurs="0" maxOccurs="1"/> <xs:element name="connection" type="gbcs:connectionType" minOccurs="0" maxOccurs="1"/>
<xs:element name="transport" type="gbcs:transportType" minOccurs="0" maxOccurs="1"/>
<xs:element name="event-executor" type="gbcs:eventExecutorType" minOccurs="0" maxOccurs="1"/> <xs:element name="event-executor" type="gbcs:eventExecutorType" minOccurs="0" maxOccurs="1"/>
<xs:element name="cache" type="gbcs:cacheType" maxOccurs="1"/> <xs:element name="cache" type="gbcs:cacheType" maxOccurs="1"/>
<xs:element name="authorization" type="gbcs:authorizationType" minOccurs="0"> <xs:element name="authorization" type="gbcs:authorizationType" minOccurs="0">
@@ -43,10 +42,6 @@
<xs:attribute name="max-request-size" type="xs:unsignedInt" use="optional" default="67108864"/> <xs:attribute name="max-request-size" type="xs:unsignedInt" use="optional" default="67108864"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="transportType">
<xs:attribute name="use-native-transport" type="xs:boolean" use="optional" default="false"/>
</xs:complexType>
<xs:complexType name="eventExecutorType"> <xs:complexType name="eventExecutorType">
<xs:attribute name="use-virtual-threads" type="xs:boolean" use="optional" default="true"/> <xs:attribute name="use-virtual-threads" type="xs:boolean" use="optional" default="true"/>
</xs:complexType> </xs:complexType>
@@ -57,7 +52,6 @@
<xs:complexContent> <xs:complexContent>
<xs:extension base="gbcs:cacheType"> <xs:extension base="gbcs:cacheType">
<xs:attribute name="max-age" type="xs:duration" default="P1D"/> <xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="max-size" type="xs:token" default="0x1000000"/>
<xs:attribute name="digest" type="xs:token" default="MD5"/> <xs:attribute name="digest" type="xs:token" default="MD5"/>
<xs:attribute name="enable-compression" type="xs:boolean" default="true"/> <xs:attribute name="enable-compression" type="xs:boolean" default="true"/>
<xs:attribute name="compression-level" type="xs:byte" default="-1"/> <xs:attribute name="compression-level" type="xs:byte" default="-1"/>
@@ -152,8 +146,7 @@
</xs:unique> </xs:unique>
</xs:element> </xs:element>
<xs:element name="roles" type="gbcs:rolesType" maxOccurs="1" minOccurs="0"/> <xs:element name="roles" type="gbcs:rolesType" maxOccurs="1" minOccurs="0"/>
<xs:element name="user-quota" type="gbcs:quotaType" minOccurs="0" maxOccurs="1"/> <xs:element name="quota" type="gbcs:quotaType" minOccurs="0" maxOccurs="1"/>
<xs:element name="group-quota" type="gbcs:quotaType" minOccurs="0" maxOccurs="1"/>
</xs:sequence> </xs:sequence>
<xs:attribute name="name" type="xs:token"/> <xs:attribute name="name" type="xs:token"/>
</xs:complexType> </xs:complexType>

View File

@@ -24,8 +24,8 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
protected val random = Random(101325) protected val random = Random(101325)
protected val keyValuePair = newEntry(random) protected val keyValuePair = newEntry(random)
protected val serverPath = "gbcs" protected val serverPath = "gbcs"
protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null, null) protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null)
protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null, null) protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null)
abstract protected val users : List<Configuration.User> abstract protected val users : List<Configuration.User>
@@ -55,7 +55,6 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
), ),
Configuration.BasicAuthentication(), Configuration.BasicAuthentication(),
null, null,
false,
) )
Xml.write(Serializer.serialize(cfg), System.out) Xml.write(Serializer.serialize(cfg), System.out)
} }

View File

@@ -1,7 +1,7 @@
package net.woggioni.gbcs.server.test package net.woggioni.gbcs.server.test
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.server.GradleBuildCacheServer import net.woggioni.gbcs.server.GradleBuildCacheServer
import net.woggioni.gbcs.api.Configuration
import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.MethodOrderer import org.junit.jupiter.api.MethodOrderer

View File

@@ -4,6 +4,7 @@ import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.api.Role import net.woggioni.gbcs.api.Role
import net.woggioni.gbcs.common.Xml import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.server.cache.FileSystemCacheConfiguration import net.woggioni.gbcs.server.cache.FileSystemCacheConfiguration
import net.woggioni.gbcs.server.cache.InMemoryCacheConfiguration
import net.woggioni.gbcs.server.configuration.Serializer import net.woggioni.gbcs.server.configuration.Serializer
import net.woggioni.gbcs.server.test.utils.CertificateUtils import net.woggioni.gbcs.server.test.utils.CertificateUtils
import net.woggioni.gbcs.server.test.utils.CertificateUtils.X509Credentials import net.woggioni.gbcs.server.test.utils.CertificateUtils.X509Credentials
@@ -45,8 +46,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
private lateinit var trustStore: KeyStore private lateinit var trustStore: KeyStore
protected lateinit var ca: X509Credentials protected lateinit var ca: X509Credentials
protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null, null) protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null)
protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null, null) protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null)
protected val random = Random(101325) protected val random = Random(101325)
protected val keyValuePair = newEntry(random) protected val keyValuePair = newEntry(random)
private val serverPath : String? = null private val serverPath : String? = null
@@ -171,8 +172,7 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
Configuration.Tls( Configuration.Tls(
Configuration.KeyStore(this.serverKeyStoreFile, null, SERVER_CERTIFICATE_ENTRY, PASSWORD), Configuration.KeyStore(this.serverKeyStoreFile, null, SERVER_CERTIFICATE_ENTRY, PASSWORD),
Configuration.TrustStore(this.trustStoreFile, null, false, false), Configuration.TrustStore(this.trustStoreFile, null, false, false),
), )
false
) )
Xml.write(Serializer.serialize(cfg), System.out) Xml.write(Serializer.serialize(cfg), System.out)
} }

View File

@@ -3,6 +3,7 @@ package net.woggioni.gbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.Xml import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.server.cache.FileSystemCacheConfiguration
import net.woggioni.gbcs.server.cache.InMemoryCacheConfiguration import net.woggioni.gbcs.server.cache.InMemoryCacheConfiguration
import net.woggioni.gbcs.server.configuration.Serializer import net.woggioni.gbcs.server.configuration.Serializer
import net.woggioni.gbcs.server.test.utils.NetworkUtils import net.woggioni.gbcs.server.test.utils.NetworkUtils
@@ -51,12 +52,10 @@ class NoAuthServerTest : AbstractServerTest() {
maxAge = Duration.ofSeconds(3600 * 24), maxAge = Duration.ofSeconds(3600 * 24),
compressionEnabled = true, compressionEnabled = true,
digestAlgorithm = "MD5", digestAlgorithm = "MD5",
compressionLevel = Deflater.DEFAULT_COMPRESSION, compressionLevel = Deflater.DEFAULT_COMPRESSION
maxSize = 0x1000000
), ),
null, null,
null, null,
false,
) )
Xml.write(Serializer.serialize(cfg), System.out) Xml.write(Serializer.serialize(cfg), System.out)
} }

View File

@@ -7,6 +7,7 @@ import org.bouncycastle.asn1.x500.X500Name
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.params.provider.ArgumentsSource
import java.net.http.HttpClient import java.net.http.HttpClient
import java.net.http.HttpRequest import java.net.http.HttpRequest
import java.net.http.HttpResponse import java.net.http.HttpResponse

View File

@@ -10,7 +10,6 @@
write-idle-timeout="PT11M" write-idle-timeout="PT11M"
idle-timeout="PT30M" idle-timeout="PT30M"
max-request-size="101325"/> max-request-size="101325"/>
<transport use-native-transport="true"/>
<event-executor use-virtual-threads="false"/> <event-executor use-virtual-threads="false"/>
<cache xs:type="gbcs:fileSystemCacheType" path="/tmp/gbcs" max-age="P7D"/> <cache xs:type="gbcs:fileSystemCacheType" path="/tmp/gbcs" max-age="P7D"/>
<authentication> <authentication>

View File

@@ -1,8 +1,8 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance" <gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server" xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:gbcs-memcache="urn:net.woggioni.gbcs.server.memcache" xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcache jpms://net.woggioni.gbcs.server.memcache/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd" xs:schemaLocation="urn:net.woggioni.gbcs.server.memcached jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd"
> >
<bind host="0.0.0.0" port="8443" incoming-connections-backlog-size="4096"/> <bind host="0.0.0.0" port="8443" incoming-connections-backlog-size="4096"/>
<connection <connection
@@ -13,7 +13,7 @@
read-timeout="PT5M" read-timeout="PT5M"
write-timeout="PT5M"/> write-timeout="PT5M"/>
<event-executor use-virtual-threads="true"/> <event-executor use-virtual-threads="true"/>
<cache xs:type="gbcs-memcache:memcacheCacheType" max-age="P7D" max-size="16777216" compression-mode="deflate"> <cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="16777216" compression-mode="zip">
<server host="memcached" port="11211"/> <server host="memcached" port="11211"/>
</cache> </cache>
<authorization> <authorization>

View File

@@ -1,8 +1,8 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance" <gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server" xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:gbcs-memcache="urn:net.woggioni.gbcs.server.memcache" xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcache jpms://net.woggioni.gbcs.server.memcache/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd"> xs:schemaLocation="urn:net.woggioni.gbcs.server.memcached jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd">
<bind host="127.0.0.1" port="11443" incoming-connections-backlog-size="50"/> <bind host="127.0.0.1" port="11443" incoming-connections-backlog-size="50"/>
<connection <connection
write-timeout="PT25M" write-timeout="PT25M"
@@ -12,8 +12,8 @@
idle-timeout="PT30M" idle-timeout="PT30M"
max-request-size="101325"/> max-request-size="101325"/>
<event-executor use-virtual-threads="false"/> <event-executor use-virtual-threads="false"/>
<cache xs:type="gbcs-memcache:memcacheCacheType" max-age="P7D" max-size="101325" digest="SHA-256"> <cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="101325" digest="SHA-256">
<server host="127.0.0.1" port="11211" max-connections="10" connection-timeout="PT20S"/> <server host="127.0.0.1" port="11211"/>
</cache> </cache>
<authentication> <authentication>
<none/> <none/>

View File

@@ -32,8 +32,7 @@
<roles> <roles>
<reader/> <reader/>
</roles> </roles>
<user-quota calls="30" period="PT1M"/> <quota calls="10" period="PT1S"/>
<group-quota calls="10" period="PT1S"/>
</group> </group>
<group name="writers"> <group name="writers">
<users> <users>
@@ -51,7 +50,7 @@
<reader/> <reader/>
<writer/> <writer/>
</roles> </roles>
<group-quota calls="1000" period="P1D"/> <quota calls="1000" period="P1D"/>
</group> </group>
</groups> </groups>
</authorization> </authorization>

View File

@@ -2,7 +2,7 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true org.gradle.parallel=true
org.gradle.caching=true org.gradle.caching=true
gbcs.version = 0.1.2 gbcs.version = 0.1.1
lys.version = 2025.01.31 lys.version = 2025.01.31

View File

@@ -27,7 +27,7 @@ rootProject.name = 'gbcs'
include 'gbcs-api' include 'gbcs-api'
include 'gbcs-common' include 'gbcs-common'
include 'gbcs-server-memcache' include 'gbcs-server-memcached'
include 'gbcs-cli' include 'gbcs-cli'
include 'docker' include 'docker'
include 'gbcs-client' include 'gbcs-client'