Compare commits

...

18 Commits

Author SHA1 Message Date
adf8a0cf24 added documentation for rbcs-servlet
All checks were successful
CI / build (push) Successful in 1m54s
switched Docker images to serial GC
2025-02-19 23:00:56 +08:00
42eb26a948 optimize imports 2025-02-19 22:40:14 +08:00
f048a60540 implemented streaming request/response streaming
added metadata to cache values

added cache servlet for comparison
2025-02-19 22:37:54 +08:00
0463038aaa first commit with streaming support (buggy and unreliable) 2025-02-13 23:02:08 +08:00
7eca8a270d 0.1.6 release
All checks were successful
CI / build (push) Successful in 3m29s
2025-02-08 00:54:25 +08:00
84d7c977f9 added randomizer to retries 2025-02-07 23:19:13 +08:00
317eadce07 used virtual thread for garbage colection in FileSystemCache
All checks were successful
CI / build (push) Successful in 2m32s
2025-02-07 20:45:29 +08:00
af79e74b95 fixed max message size for memcache backend 2025-02-06 23:09:22 +08:00
78ae21caa4 0.1.4 release
All checks were successful
CI / build (push) Successful in 8m14s
2025-02-06 15:24:00 +08:00
6c0eadb9fb renamed project to "Remote Cache Build Server" (RBCS) 2025-02-06 15:20:50 +08:00
5fef1b932e updated lys-catalog version
All checks were successful
CI / build (push) Successful in 2m32s
2025-02-05 21:49:08 +08:00
5e173dbf62 fixed unit tests 2025-02-05 21:24:10 +08:00
53b24e3d54 improved benchmark accuracy 2025-02-05 19:10:25 +08:00
7d0f24fa58 fixed memory leak in InMemoryCache 2025-02-05 19:09:51 +08:00
1b6cf1bd96 fixed memory leak in memcached plugin 2025-02-05 14:41:11 +08:00
4180df2352 added healthcheck command to client 2025-02-05 00:02:17 +08:00
c2e388b931 switched to ZGC in docker image
All checks were successful
CI / build (push) Successful in 3m28s
2025-02-04 22:46:34 +08:00
6c62ac85c0 implemented memcached client with Netty
All checks were successful
CI / build (push) Successful in 1m46s
2025-02-04 22:09:28 +08:00
159 changed files with 4352 additions and 1839 deletions

View File

@@ -31,7 +31,7 @@ jobs:
username: woggioni
password: ${{ secrets.PUBLISHER_TOKEN }}
-
name: Build gbcs Docker image
name: Build rbcs Docker image
uses: docker/build-push-action@v5.3.0
with:
context: "docker/build/docker"
@@ -39,12 +39,12 @@ jobs:
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/gbcs:latest
gitea.woggioni.net/woggioni/gbcs:${{ steps.retrieve-version.outputs.VERSION }}
gitea.woggioni.net/woggioni/rbcs:latest
gitea.woggioni.net/woggioni/rbcs:${{ steps.retrieve-version.outputs.VERSION }}
target: release
cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx
cache-from: type=registry,ref=gitea.woggioni.net/woggioni/rbcs:buildx
-
name: Build gbcs memcached Docker image
name: Build rbcs memcache Docker image
uses: docker/build-push-action@v5.3.0
with:
context: "docker/build/docker"
@@ -52,11 +52,11 @@ jobs:
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/gbcs:memcached
gitea.woggioni.net/woggioni/gbcs:memcached-${{ steps.retrieve-version.outputs.VERSION }}
target: release-memcached
cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx
cache-to: type=registry,mode=max,compression=zstd,image-manifest=true,oci-mediatypes=true,ref=gitea.woggioni.net/woggioni/gbcs:buildx
gitea.woggioni.net/woggioni/rbcs:memcache
gitea.woggioni.net/woggioni/rbcs:memcache-${{ steps.retrieve-version.outputs.VERSION }}
target: release-memcache
cache-from: type=registry,ref=gitea.woggioni.net/woggioni/rbcs:buildx
cache-to: type=registry,mode=max,compression=zstd,image-manifest=true,oci-mediatypes=true,ref=gitea.woggioni.net/woggioni/rbcs:buildx
- name: Publish artifacts
env:
PUBLISHER_TOKEN: ${{ secrets.PUBLISHER_TOKEN }}

2
.gitignore vendored
View File

@@ -4,4 +4,4 @@
# Ignore Gradle build output directory
build
gbcs-cli/native-image/*.json
rbcs-cli/native-image/*.json

View File

@@ -15,7 +15,7 @@ allprojects { subproject ->
version = project.currentTag.map { it[0] }.get()
} else {
version = project.gitRevision.map { gitRevision ->
"${getProperty('gbcs.version')}.${gitRevision[0..10]}"
"${getProperty('rbcs.version')}.${gitRevision[0..10]}"
}.get()
}

View File

@@ -4,13 +4,13 @@ USER luser
WORKDIR /home/luser
FROM base-release AS release
ADD gbcs-cli-envelope-*.jar gbcs.jar
ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"]
ADD rbcs-cli-envelope-*.jar rbcs.jar
ENTRYPOINT ["java", "-XX:+UseSerialGC", "-XX:GCTimeRatio=24", "-jar", "/home/luser/rbcs.jar", "server"]
FROM base-release AS release-memcached
ADD --chown=luser:luser gbcs-cli-envelope-*.jar gbcs.jar
FROM base-release AS release-memcache
ADD --chown=luser:luser rbcs-cli-envelope-*.jar rbcs.jar
RUN mkdir plugins
WORKDIR /home/luser/plugins
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/gbcs-server-memcached*.tar
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-server-memcache*.tar
WORKDIR /home/luser
ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"]
ENTRYPOINT ["java", "-XX:+UseSerialGC", "-XX:GCTimeRatio=24", "-jar", "/home/luser/rbcs.jar", "server"]

View File

@@ -18,8 +18,8 @@ configurations {
}
dependencies {
docker project(path: ':gbcs-cli', configuration: 'release')
docker project(path: ':gbcs-server-memcached', configuration: 'release')
docker project(path: ':rbcs-cli', configuration: 'release')
docker project(path: ':rbcs-server-memcache', configuration: 'release')
}
Provider<Task> cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {}
@@ -35,33 +35,33 @@ Provider<Copy> prepareDockerBuild = tasks.register('prepareDockerBuild', Copy) {
Provider<DockerBuildImage> dockerBuild = tasks.register('dockerBuildImage', DockerBuildImage) {
group = 'docker'
dependsOn prepareDockerBuild
images.add('gitea.woggioni.net/woggioni/gbcs:latest')
images.add("gitea.woggioni.net/woggioni/gbcs:${version}")
images.add('gitea.woggioni.net/woggioni/rbcs:latest')
images.add("gitea.woggioni.net/woggioni/rbcs:${version}")
}
Provider<DockerTagImage> dockerTag = tasks.register('dockerTagImage', DockerTagImage) {
group = 'docker'
repository = 'gitea.woggioni.net/woggioni/gbcs'
imageId = 'gitea.woggioni.net/woggioni/gbcs:latest'
repository = 'gitea.woggioni.net/woggioni/rbcs'
imageId = 'gitea.woggioni.net/woggioni/rbcs:latest'
tag = version
}
Provider<DockerTagImage> dockerTagMemcached = tasks.register('dockerTagMemcachedImage', DockerTagImage) {
Provider<DockerTagImage> dockerTagMemcache = tasks.register('dockerTagMemcacheImage', DockerTagImage) {
group = 'docker'
repository = 'gitea.woggioni.net/woggioni/gbcs'
imageId = 'gitea.woggioni.net/woggioni/gbcs:memcached'
tag = "${version}-memcached"
repository = 'gitea.woggioni.net/woggioni/rbcs'
imageId = 'gitea.woggioni.net/woggioni/rbcs:memcache'
tag = "${version}-memcache"
}
Provider<DockerPushImage> dockerPush = tasks.register('dockerPushImage', DockerPushImage) {
group = 'docker'
dependsOn dockerTag, dockerTagMemcached
dependsOn dockerTag, dockerTagMemcache
registryCredentials {
url = getProperty('docker.registry.url')
username = 'woggioni'
password = System.getenv().get("PUBLISHER_TOKEN")
}
images = [dockerTag.flatMap{ it.tag }, dockerTagMemcached.flatMap{ it.tag }]
images = [dockerTag.flatMap{ it.tag }, dockerTagMemcache.flatMap{ it.tag }]
}

View File

@@ -1,6 +0,0 @@
module net.woggioni.gbcs.api {
requires static lombok;
requires java.xml;
exports net.woggioni.gbcs.api;
exports net.woggioni.gbcs.api.exception;
}

View File

@@ -1,12 +0,0 @@
package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.exception.ContentTooLargeException;
import java.nio.channels.ReadableByteChannel;
public interface Cache extends AutoCloseable {
ReadableByteChannel get(String key);
void put(String key, byte[] content) throws ContentTooLargeException;
}

View File

@@ -1,7 +0,0 @@
package net.woggioni.gbcs.api.exception;
public class GbcsException extends RuntimeException {
public GbcsException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -1,17 +0,0 @@
module net.woggioni.gbcs.cli {
requires org.slf4j;
requires net.woggioni.gbcs.server;
requires info.picocli;
requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.client;
requires kotlin.stdlib;
requires net.woggioni.jwo;
requires net.woggioni.gbcs.api;
exports net.woggioni.gbcs.cli.impl.converters to info.picocli;
opens net.woggioni.gbcs.cli.impl.commands to info.picocli;
opens net.woggioni.gbcs.cli.impl to info.picocli;
opens net.woggioni.gbcs.cli to info.picocli, net.woggioni.gbcs.common;
exports net.woggioni.gbcs.cli;
}

View File

@@ -1,64 +0,0 @@
package net.woggioni.gbcs.cli
import net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.cli.impl.AbstractVersionProvider
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand
import net.woggioni.gbcs.cli.impl.commands.ClientCommand
import net.woggioni.gbcs.cli.impl.commands.GetCommand
import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand
import net.woggioni.gbcs.cli.impl.commands.PutCommand
import net.woggioni.gbcs.cli.impl.commands.ServerCommand
import net.woggioni.jwo.Application
import picocli.CommandLine
import picocli.CommandLine.Model.CommandSpec
import java.net.URI
@CommandLine.Command(
name = "gbcs", versionProvider = GradleBuildCacheServerCli.VersionProvider::class
)
class GradleBuildCacheServerCli : GbcsCommand() {
class VersionProvider : AbstractVersionProvider()
companion object {
@JvmStatic
fun main(vararg args: String) {
Thread.currentThread().contextClassLoader = GradleBuildCacheServerCli::class.java.classLoader
GbcsUrlStreamHandlerFactory.install()
val log = contextLogger()
val app = Application.builder("gbcs")
.configurationDirectoryEnvVar("GBCS_CONFIGURATION_DIR")
.configurationDirectoryPropertyKey("net.woggioni.gbcs.conf.dir")
.build()
val gbcsCli = GradleBuildCacheServerCli()
val commandLine = CommandLine(gbcsCli)
commandLine.setExecutionExceptionHandler { ex, cl, parseResult ->
log.error(ex.message, ex)
CommandLine.ExitCode.SOFTWARE
}
commandLine.addSubcommand(ServerCommand(app))
commandLine.addSubcommand(PasswordHashCommand())
commandLine.addSubcommand(
CommandLine(ClientCommand(app)).apply {
addSubcommand(BenchmarkCommand())
addSubcommand(PutCommand())
addSubcommand(GetCommand())
})
System.exit(commandLine.execute(*args))
}
}
@CommandLine.Option(names = ["-V", "--version"], versionHelp = true)
var versionHelp = false
private set
@CommandLine.Spec
private lateinit var spec: CommandSpec
override fun run() {
spec.commandLine().usage(System.out);
}
}

View File

@@ -1,128 +0,0 @@
package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.error
import net.woggioni.gbcs.common.info
import net.woggioni.jwo.JWO
import picocli.CommandLine
import java.security.SecureRandom
import java.time.Duration
import java.time.Instant
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random
@CommandLine.Command(
name = "benchmark",
description = ["Run a load test against the server"],
showDefaultValues = true
)
class BenchmarkCommand : GbcsCommand() {
private val log = contextLogger()
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
@CommandLine.Option(
names = ["-e", "--entries"],
description = ["Total number of elements to be added to the cache"],
paramLabel = "NUMBER_OF_ENTRIES"
)
private var numberOfEntries = 1000
override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
val client = GradleBuildCacheClient(profile)
val entryGenerator = sequence {
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
while (true) {
val key = JWO.bytesToHex(random.nextBytes(16))
val content = random.nextInt().toByte()
val value = ByteArray(0x1000, { _ -> content })
yield(key to value)
}
}
log.info {
"Starting insertion"
}
val entries = let {
val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now()
val semaphore = Semaphore(profile.maxConnections * 3)
val iterator = entryGenerator.take(numberOfEntries).iterator()
while(completionCounter.get() < numberOfEntries) {
if(iterator.hasNext()) {
val entry = iterator.next()
semaphore.acquire()
val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { result, ex ->
if (ex != null) {
log.error(ex.message, ex)
} else {
completionQueue.put(result)
}
semaphore.release()
completionCounter.incrementAndGet()
}
}
}
val inserted = completionQueue.toList()
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
}
inserted
}
log.info {
"Inserted ${entries.size} entries"
}
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) {
val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now()
entries.forEach { entry ->
semaphore.acquire()
val future = client.get(entry.first).thenApply {
if (it == null) {
log.error {
"Missing entry for key '${entry.first}'"
}
} else if (!entry.second.contentEquals(it)) {
log.error {
"Retrieved a value different from what was inserted for key '${entry.first}'"
}
}
}
future.whenComplete { _, _ ->
completionCounter.incrementAndGet()
semaphore.release()
}
}
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
"Retrieval rate: $opsPerSecond ops/s"
}
} else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")
}
}
}

View File

@@ -1,48 +0,0 @@
package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.converters.InputStreamConverter
import net.woggioni.gbcs.client.GradleBuildCacheClient
import picocli.CommandLine
import java.io.InputStream
@CommandLine.Command(
name = "put",
description = ["Add or replace a value to the cache with the specified key"],
showDefaultValues = true
)
class PutCommand : GbcsCommand() {
private val log = contextLogger()
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
@CommandLine.Option(
names = ["-k", "--key"],
description = ["The key for the new value"],
paramLabel = "KEY"
)
private var key : String = ""
@CommandLine.Option(
names = ["-v", "--value"],
description = ["Path to a file containing the value to be added (defaults to stdin)"],
paramLabel = "VALUE_FILE",
converter = [InputStreamConverter::class]
)
private var value : InputStream = System.`in`
override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
GradleBuildCacheClient(profile).use { client ->
value.use {
client.put(key, it.readAllBytes())
}.get()
}
}
}

View File

@@ -1,67 +0,0 @@
package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.server.GradleBuildCacheServer
import net.woggioni.gbcs.server.GradleBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.debug
import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.jwo.Application
import net.woggioni.jwo.JWO
import picocli.CommandLine
import java.io.ByteArrayOutputStream
import java.nio.file.Files
import java.nio.file.Path
@CommandLine.Command(
name = "server",
description = ["GBCS server"],
showDefaultValues = true
)
class ServerCommand(app : Application) : GbcsCommand() {
private val log = contextLogger()
private fun createDefaultConfigurationFile(configurationFile: Path) {
log.info {
"Creating default configuration file at '$configurationFile'"
}
val defaultConfigurationFileResource = DEFAULT_CONFIGURATION_URL
Files.newOutputStream(configurationFile).use { outputStream ->
defaultConfigurationFileResource.openStream().use { inputStream ->
JWO.copy(inputStream, outputStream)
}
}
}
@CommandLine.Option(
names = ["-c", "--config-file"],
description = ["Read the application configuration from this file"],
paramLabel = "CONFIG_FILE"
)
private var configurationFile: Path = findConfigurationFile(app, "gbcs-server.xml")
val configuration : Configuration by lazy {
GradleBuildCacheServer.loadConfiguration(configurationFile)
}
override fun run() {
if (!Files.exists(configurationFile)) {
Files.createDirectories(configurationFile.parent)
createDefaultConfigurationFile(configurationFile)
}
val configuration = GradleBuildCacheServer.loadConfiguration(configurationFile)
log.debug {
ByteArrayOutputStream().also {
GradleBuildCacheServer.dumpConfiguration(configuration, it)
}.let {
"Server configuration:\n${String(it.toByteArray())}"
}
}
val server = GradleBuildCacheServer(configuration)
server.run().use {
}
}
}

View File

@@ -1,16 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs-client:profiles xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs-client="urn:net.woggioni.gbcs.client"
xs:schemaLocation="urn:net.woggioni.gbcs.client jms://net.woggioni.gbcs.client/net/woggioni/gbcs/client/schema/gbcs-client.xsd"
>
<profile name="profile1" base-url="https://gbcs1.example.com/">
<tls-client-auth
key-store-file="keystore.pfx"
key-store-password="password"
key-alias="woggioni@c962475fa38"
key-password="key-password"/>
</profile>
<profile name="profile2" base-url="https://gbcs2.example.com/">
<basic-auth user="user" password="password"/>
</profile>
</gbcs-client:profiles>

View File

@@ -1,29 +0,0 @@
package net.woggioni.gbcs.common
import net.woggioni.jwo.JWO
import java.net.URI
import java.net.URL
import java.security.MessageDigest
object GBCS {
fun String.toUrl() : URL = URL.of(URI(this), null)
const val GBCS_NAMESPACE_URI: String = "urn:net.woggioni.gbcs.server"
const val GBCS_PREFIX: String = "gbcs"
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
fun digest(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): ByteArray {
md.update(data)
return md.digest()
}
fun digestString(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): String {
return JWO.bytesToHex(digest(data, md))
}
}

View File

@@ -1,111 +0,0 @@
package net.woggioni.gbcs.common
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.event.Level
import java.nio.file.Files
import java.nio.file.Path
import java.util.logging.LogManager
inline fun <reified T> T.contextLogger() = LoggerFactory.getLogger(T::class.java)
inline fun Logger.traceParam(messageBuilder : () -> Pair<String, Array<Any>>) {
if(isTraceEnabled) {
val (format, params) = messageBuilder()
trace(format, params)
}
}
inline fun Logger.debugParam(messageBuilder : () -> Pair<String, Array<Any>>) {
if(isDebugEnabled) {
val (format, params) = messageBuilder()
info(format, params)
}
}
inline fun Logger.infoParam(messageBuilder : () -> Pair<String, Array<Any>>) {
if(isInfoEnabled) {
val (format, params) = messageBuilder()
info(format, params)
}
}
inline fun Logger.warnParam(messageBuilder : () -> Pair<String, Array<Any>>) {
if(isWarnEnabled) {
val (format, params) = messageBuilder()
warn(format, params)
}
}
inline fun Logger.errorParam(messageBuilder : () -> Pair<String, Array<Any>>) {
if(isErrorEnabled) {
val (format, params) = messageBuilder()
error(format, params)
}
}
inline fun log(log : Logger,
filter : Logger.() -> Boolean,
loggerMethod : Logger.(String) -> Unit, messageBuilder : () -> String) {
if(log.filter()) {
log.loggerMethod(messageBuilder())
}
}
inline fun Logger.log(level : Level, messageBuilder : () -> String) {
if(isEnabledForLevel(level)) {
makeLoggingEventBuilder(level).log(messageBuilder())
}
}
inline fun Logger.trace(messageBuilder : () -> String) {
if(isTraceEnabled) {
trace(messageBuilder())
}
}
inline fun Logger.debug(messageBuilder : () -> String) {
if(isDebugEnabled) {
debug(messageBuilder())
}
}
inline fun Logger.info(messageBuilder : () -> String) {
if(isInfoEnabled) {
info(messageBuilder())
}
}
inline fun Logger.warn(messageBuilder : () -> String) {
if(isWarnEnabled) {
warn(messageBuilder())
}
}
inline fun Logger.error(messageBuilder : () -> String) {
if(isErrorEnabled) {
error(messageBuilder())
}
}
class LoggingConfig {
init {
val logManager = LogManager.getLogManager()
System.getProperty("log.config.source")?.let withSource@ { source ->
val urls = LoggingConfig::class.java.classLoader.getResources(source)
while(urls.hasMoreElements()) {
val url = urls.nextElement()
url.openStream().use { inputStream ->
logManager.readConfiguration(inputStream)
return@withSource
}
}
Path.of(source).takeIf(Files::exists)
?.let(Files::newInputStream)
?.use(logManager::readConfiguration)
}
}
}

View File

@@ -1 +0,0 @@
net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory

View File

@@ -1,14 +0,0 @@
import net.woggioni.gbcs.api.CacheProvider;
module net.woggioni.gbcs.server.memcached {
requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.api;
requires com.googlecode.xmemcached;
requires net.woggioni.jwo;
requires java.xml;
requires kotlin.stdlib;
provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider;
opens net.woggioni.gbcs.server.memcached.schema;
}

View File

@@ -1,59 +0,0 @@
package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.XMemcachedClientBuilder
import net.rubyeye.xmemcached.command.BinaryCommandFactory
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.ContentTooLargeException
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.jwo.JWO
import java.io.ByteArrayInputStream
import java.net.InetSocketAddress
import java.nio.channels.Channels
import java.nio.channels.ReadableByteChannel
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.time.Duration
class MemcachedCache(
servers: List<HostAndPort>,
private val maxAge: Duration,
maxSize : Int,
digestAlgorithm: String?,
compressionMode: CompressionMode,
) : Cache {
private val memcachedClient = XMemcachedClientBuilder(
servers.stream().map { addr: HostAndPort -> InetSocketAddress(addr.host, addr.port) }.toList()
).apply {
commandFactory = BinaryCommandFactory()
digestAlgorithm?.let { dAlg ->
setKeyProvider { key ->
val md = MessageDigest.getInstance(dAlg)
md.update(key.toByteArray(StandardCharsets.UTF_8))
JWO.bytesToHex(md.digest())
}
}
transcoder = SerializingTranscoder(maxSize).apply {
setCompressionMode(compressionMode)
}
}.build()
override fun get(key: String): ReadableByteChannel? {
return memcachedClient.get<ByteArray>(key)
?.let(::ByteArrayInputStream)
?.let(Channels::newChannel)
}
override fun put(key: String, content: ByteArray) {
try {
memcachedClient[key, maxAge.toSeconds().toInt()] = content
} catch (e: IllegalArgumentException) {
throw ContentTooLargeException(e.message, e)
}
}
override fun close() {
memcachedClient.shutdown()
}
}

View File

@@ -1,26 +0,0 @@
package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.HostAndPort
import java.time.Duration
data class MemcachedCacheConfiguration(
var servers: List<HostAndPort>,
var maxAge: Duration = Duration.ofDays(1),
var maxSize: Int = 0x100000,
var digestAlgorithm: String? = null,
var compressionMode: CompressionMode = CompressionMode.ZIP,
) : Configuration.Cache {
override fun materialize() = MemcachedCache(
servers,
maxAge,
maxSize,
digestAlgorithm,
compressionMode
)
override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached"
override fun getTypeName() = "memcachedCacheType"
}

View File

@@ -1,88 +0,0 @@
package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.woggioni.gbcs.api.CacheProvider
import net.woggioni.gbcs.api.exception.ConfigurationException
import net.woggioni.gbcs.common.GBCS
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.common.Xml.Companion.asIterable
import net.woggioni.gbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import org.w3c.dom.Element
import java.time.Duration
class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
override fun getXmlSchemaLocation() = "jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd"
override fun getXmlType() = "memcachedCacheType"
override fun getXmlNamespace() = "urn:net.woggioni.gbcs.server.memcached"
val xmlNamespacePrefix : String
get() = "gbcs-memcached"
override fun deserialize(el: Element): MemcachedCacheConfiguration {
val servers = mutableListOf<HostAndPort>()
val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse)
?: Duration.ofDays(1)
val maxSize = el.renderAttribute("max-size")
?.let(String::toInt)
?: 0x100000
val compressionMode = el.renderAttribute("compression-mode")
?.let {
when (it) {
"gzip" -> CompressionMode.GZIP
"zip" -> CompressionMode.ZIP
else -> CompressionMode.ZIP
}
}
?: CompressionMode.ZIP
val digestAlgorithm = el.renderAttribute("digest")
for (child in el.asIterable()) {
when (child.nodeName) {
"server" -> {
val host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required")
val port = child.renderAttribute("port")?.toInt() ?: throw ConfigurationException("port attribute is required")
servers.add(HostAndPort(host, port))
}
}
}
return MemcachedCacheConfiguration(
servers,
maxAge,
maxSize,
digestAlgorithm,
compressionMode,
)
}
override fun serialize(doc: Document, cache: MemcachedCacheConfiguration) = cache.run {
val result = doc.createElement("cache")
Xml.of(doc, result) {
attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/")
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI)
for (server in servers) {
node("server") {
attr("host", server.host)
attr("port", server.port.toString())
}
}
attr("max-age", maxAge.toString())
attr("max-size", maxSize.toString())
digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm)
}
attr(
"compression-mode", when (compressionMode) {
CompressionMode.GZIP -> "gzip"
CompressionMode.ZIP -> "zip"
}
)
}
result
}
}

View File

@@ -1 +0,0 @@
net.woggioni.gbcs.server.memcached.MemcachedCacheProvider

View File

@@ -1,35 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<xs:schema targetNamespace="urn:net.woggioni.gbcs.server.memcached"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:import schemaLocation="jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd" namespace="urn:net.woggioni.gbcs.server"/>
<xs:complexType name="memcachedServerType">
<xs:attribute name="host" type="xs:token" use="required"/>
<xs:attribute name="port" type="xs:positiveInteger" use="required"/>
</xs:complexType>
<xs:complexType name="memcachedCacheType">
<xs:complexContent>
<xs:extension base="gbcs:cacheType">
<xs:sequence maxOccurs="unbounded">
<xs:element name="server" type="gbcs-memcached:memcachedServerType"/>
</xs:sequence>
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="max-size" type="xs:unsignedInt" default="1048576"/>
<xs:attribute name="digest" type="xs:token" />
<xs:attribute name="compression-mode" type="gbcs-memcached:compressionType" default="zip"/>
</xs:extension>
</xs:complexContent>
</xs:complexType>
<xs:simpleType name="compressionType">
<xs:restriction base="xs:token">
<xs:enumeration value="zip"/>
<xs:enumeration value="gzip"/>
</xs:restriction>
</xs:simpleType>
</xs:schema>

View File

@@ -1,30 +0,0 @@
package net.woggioni.gbcs.server
import io.netty.channel.ChannelHandlerContext
import org.slf4j.Logger
import java.net.InetSocketAddress
inline fun Logger.trace(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isTraceEnabled }, { trace(it) } , messageBuilder)
}
inline fun Logger.debug(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isDebugEnabled }, { debug(it) } , messageBuilder)
}
inline fun Logger.info(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isInfoEnabled }, { info(it) } , messageBuilder)
}
inline fun Logger.warn(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isWarnEnabled }, { warn(it) } , messageBuilder)
}
inline fun Logger.error(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isErrorEnabled }, { error(it) } , messageBuilder)
}
inline fun log(log : Logger, ctx : ChannelHandlerContext,
filter : Logger.() -> Boolean,
loggerMethod : Logger.(String) -> Unit, messageBuilder : () -> String) {
if(log.filter()) {
val clientAddress = (ctx.channel().remoteAddress() as InetSocketAddress).address.hostAddress
log.loggerMethod(clientAddress + " - " + messageBuilder())
}
}

View File

@@ -1,120 +0,0 @@
package net.woggioni.gbcs.server.cache
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.GBCS.digestString
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.jwo.LockFile
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.nio.file.StandardOpenOption
import java.nio.file.attribute.BasicFileAttributes
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterInputStream
class FileSystemCache(
val root: Path,
val maxAge: Duration,
val digestAlgorithm: String?,
val compressionEnabled: Boolean,
val compressionLevel: Int
) : Cache {
private val log = contextLogger()
init {
Files.createDirectories(root)
}
private var nextGc = AtomicReference(Instant.now().plus(maxAge))
override fun get(key: String) = (digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
root.resolve(digest).takeIf(Files::exists)
?.let { file ->
file.takeIf(Files::exists)?.let { file ->
if (compressionEnabled) {
val inflater = Inflater()
Channels.newChannel(
InflaterInputStream(
Channels.newInputStream(
FileChannel.open(
file,
StandardOpenOption.READ
)
), inflater
)
)
} else {
FileChannel.open(file, StandardOpenOption.READ)
}
}
}.also {
gc()
}
}
override fun put(key: String, content: ByteArray) {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
val file = root.resolve(digest)
val tmpFile = Files.createTempFile(root, null, ".tmp")
try {
Files.newOutputStream(tmpFile).let {
if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
DeflaterOutputStream(it, deflater)
} else {
it
}
}.use {
it.write(content)
}
Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE)
} catch (t: Throwable) {
Files.delete(tmpFile)
throw t
}
}.also {
gc()
}
}
private fun gc() {
val now = Instant.now()
val oldValue = nextGc.getAndSet(now.plus(maxAge))
if (oldValue < now) {
actualGc(now)
}
}
@Synchronized
private fun actualGc(now: Instant) {
Files.list(root).filter {
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
.creationTime()
.toInstant()
now > creationTimeStamp.plus(maxAge)
}.forEach { file ->
LockFile.acquire(file, false).use {
Files.delete(file)
}
}
}
override fun close() {}
}

View File

@@ -1,27 +0,0 @@
package net.woggioni.gbcs.server.cache
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.GBCS
import net.woggioni.jwo.Application
import java.nio.file.Path
import java.time.Duration
data class FileSystemCacheConfiguration(
val root: Path?,
val maxAge: Duration,
val digestAlgorithm : String?,
val compressionEnabled: Boolean,
val compressionLevel: Int,
) : Configuration.Cache {
override fun materialize() = FileSystemCache(
root ?: Application.builder("gbcs").build().computeCacheDirectory(),
maxAge,
digestAlgorithm,
compressionEnabled,
compressionLevel
)
override fun getNamespaceURI() = GBCS.GBCS_NAMESPACE_URI
override fun getTypeName() = "fileSystemCacheType"
}

View File

@@ -1,92 +0,0 @@
package net.woggioni.gbcs.server.cache
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.GBCS.digestString
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.nio.channels.Channels
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterInputStream
class InMemoryCache(
val maxAge: Duration,
val digestAlgorithm: String?,
val compressionEnabled: Boolean,
val compressionLevel: Int
) : Cache {
private val map = ConcurrentHashMap<String, ByteArray>()
private class RemovalQueueElement(val key: String, val value : ByteArray, val expiry : Instant) : Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
}
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
private var running = true
private val garbageCollector = Thread {
while(true) {
val el = removalQueue.take()
val now = Instant.now()
if(now > el.expiry) {
map.remove(el.key, el.value)
} else {
removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
}
}
}.apply {
start()
}
override fun close() {
running = false
garbageCollector.join()
}
override fun get(key: String) =
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key
).let { digest ->
map[digest]
?.let { value ->
if (compressionEnabled) {
val inflater = Inflater()
Channels.newChannel(InflaterInputStream(ByteArrayInputStream(value), inflater))
} else {
Channels.newChannel(ByteArrayInputStream(value))
}
}
}
override fun put(key: String, content: ByteArray) {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
val value = if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
val baos = ByteArrayOutputStream()
DeflaterOutputStream(baos, deflater).use { stream ->
stream.write(content)
}
baos.toByteArray()
} else {
content
}
map[digest] = value
removalQueue.put(RemovalQueueElement(digest, value, Instant.now().plus(maxAge)))
}
}
}

View File

@@ -1,23 +0,0 @@
package net.woggioni.gbcs.server.cache
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.GBCS
import java.time.Duration
data class InMemoryCacheConfiguration(
val maxAge: Duration,
val digestAlgorithm : String?,
val compressionEnabled: Boolean,
val compressionLevel: Int,
) : Configuration.Cache {
override fun materialize() = InMemoryCache(
maxAge,
digestAlgorithm,
compressionEnabled,
compressionLevel
)
override fun getNamespaceURI() = GBCS.GBCS_NAMESPACE_URI
override fun getTypeName() = "inMemoryCacheType"
}

View File

@@ -1,167 +0,0 @@
package net.woggioni.gbcs.server.handler
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.DefaultFileRegion
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioStream
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.CacheException
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.debug
import net.woggioni.gbcs.server.warn
import java.nio.channels.FileChannel
import java.nio.file.Path
@ChannelHandler.Sharable
class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
SimpleChannelInboundHandler<FullHttpRequest>() {
private val log = contextLogger()
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
val method = msg.method()
if (method === HttpMethod.GET) {
val path = Path.of(msg.uri())
val prefix = path.parent
val key = path.fileName?.toString() ?: let {
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
return
}
if (serverPrefix == prefix) {
try {
cache.get(key)
} catch(ex : Throwable) {
throw CacheException("Error accessing the cache backend", ex)
}?.let { channel ->
log.debug(ctx) {
"Cache hit for key '$key'"
}
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
when (channel) {
is FileChannel -> {
if (keepAlive) {
ctx.write(DefaultFileRegion(channel, 0, channel.size()))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size()))
.addListener(ChannelFutureListener.CLOSE)
}
}
else -> {
ctx.write(ChunkedNioStream(channel)).addListener { evt ->
channel.close()
}
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
}
}
} ?: let {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
} else if (method === HttpMethod.PUT) {
val path = Path.of(msg.uri())
val prefix = path.parent
val key = path.fileName.toString()
if (serverPrefix == prefix) {
log.debug(ctx) {
"Added value for key '$key' to build cache"
}
val bodyBytes = msg.content().run {
if (isDirect) {
ByteArray(readableBytes()).also {
readBytes(it)
}
} else {
array()
}
}
try {
cache.put(key, bodyBytes)
} catch(ex : Throwable) {
throw CacheException("Error accessing the cache backend", ex)
}
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
} else if(method == HttpMethod.TRACE) {
val replayedRequestHead = ctx.alloc().buffer()
replayedRequestHead.writeCharSequence("TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", Charsets.US_ASCII)
msg.headers().forEach { (key, value) ->
replayedRequestHead.apply {
writeCharSequence(key, Charsets.US_ASCII)
writeCharSequence(": ", Charsets.US_ASCII)
writeCharSequence(value, Charsets.UTF_8)
writeCharSequence("\r\n", Charsets.US_ASCII)
}
}
replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII)
val requestBody = msg.content()
requestBody.retain()
val responseBody = ctx.alloc().compositeBuffer(2).apply {
addComponents(true, replayedRequestHead)
addComponents(true, requestBody)
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK, responseBody)
response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, "message/http")
set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes())
}
ctx.writeAndFlush(response)
} else {
log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.METHOD_NOT_ALLOWED)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
}
}

View File

@@ -1,99 +0,0 @@
package net.woggioni.gbcs.server.throttling
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.GradleBuildCacheServer
import net.woggioni.jwo.Bucket
import net.woggioni.jwo.LongMath
import java.net.InetSocketAddress
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
@Sharable
class ThrottlingHandler(cfg: Configuration) :
ChannelInboundHandlerAdapter() {
private val log = contextLogger()
private val bucketManager = BucketManager.from(cfg)
private val connectionConfiguration = cfg.connection
/**
* If the suggested waiting time from the bucket is lower than this
* amount, then the server will simply wait by itself before sending a response
* instead of replying with 429
*/
private val waitThreshold = minOf(
connectionConfiguration.idleTimeout,
connectionConfiguration.readIdleTimeout,
connectionConfiguration.writeIdleTimeout
).dividedBy(2)
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
val buckets = mutableListOf<Bucket>()
val user = ctx.channel().attr(GradleBuildCacheServer.userAttribute).get()
if (user != null) {
bucketManager.getBucketByUser(user)?.let(buckets::add)
}
val groups = ctx.channel().attr(GradleBuildCacheServer.groupAttribute).get() ?: emptySet()
if (groups.isNotEmpty()) {
groups.forEach { group ->
bucketManager.getBucketByGroup(group)?.let(buckets::add)
}
}
if (user == null && groups.isEmpty()) {
bucketManager.getBucketByAddress(ctx.channel().remoteAddress() as InetSocketAddress)?.let(buckets::add)
}
if (buckets.isEmpty()) {
return super.channelRead(ctx, msg)
} else {
handleBuckets(buckets, ctx, msg, true)
}
}
private fun handleBuckets(buckets : List<Bucket>, ctx : ChannelHandlerContext, msg : Any, delayResponse : Boolean) {
var nextAttempt = -1L
for (bucket in buckets) {
val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
if (bucketNextAttempt > nextAttempt) {
nextAttempt = bucketNextAttempt
}
}
if(nextAttempt < 0) {
super.channelRead(ctx, msg)
return
}
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
if (delayResponse && waitDuration < waitThreshold) {
ctx.executor().schedule({
handleBuckets(buckets, ctx, msg, false)
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
} else {
sendThrottledResponse(ctx, waitDuration)
}
}
private fun sendThrottledResponse(ctx: ChannelHandlerContext, retryAfter: Duration) {
val response = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.TOO_MANY_REQUESTS
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
retryAfter.seconds.takeIf {
it > 0
}?.let {
response.headers()[HttpHeaderNames.RETRY_AFTER] = retryAfter.seconds
}
ctx.writeAndFlush(response)
}
}

View File

@@ -1,2 +0,0 @@
net.woggioni.gbcs.server.cache.FileSystemCacheProvider
net.woggioni.gbcs.server.cache.InMemoryCacheProvider

View File

@@ -1,21 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcached jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd">
<bind host="127.0.0.1" port="11443" incoming-connections-backlog-size="50"/>
<connection
write-timeout="PT25M"
read-timeout="PT20M"
read-idle-timeout="PT10M"
write-idle-timeout="PT11M"
idle-timeout="PT30M"
max-request-size="101325"/>
<event-executor use-virtual-threads="false"/>
<cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="101325" digest="SHA-256">
<server host="127.0.0.1" port="11211"/>
</cache>
<authentication>
<none/>
</authentication>
</gbcs:server>

View File

@@ -2,11 +2,10 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true
org.gradle.caching=true
gbcs.version = 0.1.1
rbcs.version = 0.2.0
lys.version = 2025.01.31
lys.version = 2025.02.08
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
docker.registry.url=gitea.woggioni.net
jpms-check.configurationName = runtimeClasspath

View File

@@ -5,6 +5,8 @@ plugins {
}
dependencies {
api catalog.netty.buffer
api catalog.netty.handler
}
publishing {

View File

@@ -0,0 +1,10 @@
module net.woggioni.rbcs.api {
requires static lombok;
requires java.xml;
requires io.netty.buffer;
requires io.netty.handler;
requires io.netty.transport;
exports net.woggioni.rbcs.api;
exports net.woggioni.rbcs.api.exception;
exports net.woggioni.rbcs.api.message;
}

View File

@@ -0,0 +1,7 @@
package net.woggioni.rbcs.api;
import io.netty.channel.ChannelHandler;
public interface CacheHandlerFactory extends AutoCloseable {
ChannelHandler newHandler();
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.api;
package net.woggioni.rbcs.api;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

View File

@@ -0,0 +1,14 @@
package net.woggioni.rbcs.api;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.io.Serializable;
@Getter
@RequiredArgsConstructor
public class CacheValueMetadata implements Serializable {
private final String contentDisposition;
private final String mimeType;
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.api;
package net.woggioni.rbcs.api;
import lombok.EqualsAndHashCode;
@@ -56,7 +56,8 @@ public class Configuration {
@EqualsAndHashCode.Include
String name;
Set<Role> roles;
Quota quota;
Quota groupQuota;
Quota userQuota;
}
@Value
@@ -134,7 +135,7 @@ public class Configuration {
}
public interface Cache {
net.woggioni.gbcs.api.Cache materialize();
CacheHandlerFactory materialize();
String getNamespaceURI();
String getTypeName();
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.api;
package net.woggioni.rbcs.api;
public enum Role {
Reader, Writer

View File

@@ -1,6 +1,6 @@
package net.woggioni.gbcs.api.exception;
package net.woggioni.rbcs.api.exception;
public class CacheException extends GbcsException {
public class CacheException extends RbcsException {
public CacheException(String message, Throwable cause) {
super(message, cause);
}

View File

@@ -1,6 +1,6 @@
package net.woggioni.gbcs.api.exception;
package net.woggioni.rbcs.api.exception;
public class ConfigurationException extends GbcsException {
public class ConfigurationException extends RbcsException {
public ConfigurationException(String message, Throwable cause) {
super(message, cause);
}

View File

@@ -1,6 +1,6 @@
package net.woggioni.gbcs.api.exception;
package net.woggioni.rbcs.api.exception;
public class ContentTooLargeException extends GbcsException {
public class ContentTooLargeException extends RbcsException {
public ContentTooLargeException(String message, Throwable cause) {
super(message, cause);
}

View File

@@ -0,0 +1,7 @@
package net.woggioni.rbcs.api.exception;
public class RbcsException extends RuntimeException {
public RbcsException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,161 @@
package net.woggioni.rbcs.api.message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import net.woggioni.rbcs.api.CacheValueMetadata;
public sealed interface CacheMessage {
@Getter
@RequiredArgsConstructor
final class CacheGetRequest implements CacheMessage {
private final String key;
}
abstract sealed class CacheGetResponse implements CacheMessage {
}
@Getter
@RequiredArgsConstructor
final class CacheValueFoundResponse extends CacheGetResponse {
private final String key;
private final CacheValueMetadata metadata;
}
final class CacheValueNotFoundResponse extends CacheGetResponse {
}
@Getter
@RequiredArgsConstructor
final class CachePutRequest implements CacheMessage {
private final String key;
private final CacheValueMetadata metadata;
}
@Getter
@RequiredArgsConstructor
final class CachePutResponse implements CacheMessage {
private final String key;
}
@RequiredArgsConstructor
non-sealed class CacheContent implements CacheMessage, ByteBufHolder {
protected final ByteBuf chunk;
@Override
public ByteBuf content() {
return chunk;
}
@Override
public CacheContent copy() {
return replace(chunk.copy());
}
@Override
public CacheContent duplicate() {
return new CacheContent(chunk.duplicate());
}
@Override
public CacheContent retainedDuplicate() {
return new CacheContent(chunk.retainedDuplicate());
}
@Override
public CacheContent replace(ByteBuf content) {
return new CacheContent(content);
}
@Override
public CacheContent retain() {
chunk.retain();
return this;
}
@Override
public CacheContent retain(int increment) {
chunk.retain(increment);
return this;
}
@Override
public CacheContent touch() {
chunk.touch();
return this;
}
@Override
public CacheContent touch(Object hint) {
chunk.touch(hint);
return this;
}
@Override
public int refCnt() {
return chunk.refCnt();
}
@Override
public boolean release() {
return chunk.release();
}
@Override
public boolean release(int decrement) {
return chunk.release(decrement);
}
}
final class LastCacheContent extends CacheContent {
public LastCacheContent(ByteBuf chunk) {
super(chunk);
}
@Override
public LastCacheContent copy() {
return replace(chunk.copy());
}
@Override
public LastCacheContent duplicate() {
return new LastCacheContent(chunk.duplicate());
}
@Override
public LastCacheContent retainedDuplicate() {
return new LastCacheContent(chunk.retainedDuplicate());
}
@Override
public LastCacheContent replace(ByteBuf content) {
return new LastCacheContent(chunk);
}
@Override
public LastCacheContent retain() {
super.retain();
return this;
}
@Override
public LastCacheContent retain(int increment) {
super.retain(increment);
return this;
}
@Override
public LastCacheContent touch() {
super.touch();
return this;
}
@Override
public LastCacheContent touch(Object hint) {
super.touch(hint);
return this;
}
}
}

View File

@@ -17,9 +17,9 @@ import net.woggioni.gradle.graalvm.JlinkPlugin
import net.woggioni.gradle.graalvm.JlinkTask
Property<String> mainModuleName = objects.property(String.class)
mainModuleName.set('net.woggioni.gbcs.cli')
mainModuleName.set('net.woggioni.rbcs.cli')
Property<String> mainClassName = objects.property(String.class)
mainClassName.set('net.woggioni.gbcs.cli.GradleBuildCacheServerCli')
mainClassName.set('net.woggioni.rbcs.cli.RemoteBuildCacheServerCli')
tasks.named(JavaPlugin.COMPILE_JAVA_TASK_NAME, JavaCompile) {
options.javaModuleMainClass = mainClassName
@@ -44,11 +44,10 @@ envelopeJar {
dependencies {
implementation catalog.jwo
implementation catalog.slf4j.api
implementation catalog.netty.codec.http
implementation catalog.picocli
implementation project(':gbcs-client')
implementation project(':gbcs-server')
implementation project(':rbcs-client')
implementation project(':rbcs-server')
// runtimeOnly catalog.slf4j.jdk14
runtimeOnly catalog.logback.classic
@@ -56,10 +55,10 @@ dependencies {
}
Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named('envelopeJar', EnvelopeJarTask.class) {
// systemProperties['java.util.logging.config.class'] = 'net.woggioni.gbcs.LoggingConfig'
// systemProperties['log.config.source'] = 'net/woggioni/gbcs/cli/logging.properties'
// systemProperties['java.util.logging.config.file'] = 'classpath:net/woggioni/gbcs/cli/logging.properties'
systemProperties['logback.configurationFile'] = 'classpath:net/woggioni/gbcs/cli/logback.xml'
// systemProperties['java.util.logging.config.class'] = 'net.woggioni.rbcs.LoggingConfig'
// systemProperties['log.config.source'] = 'net/woggioni/rbcs/cli/logging.properties'
// systemProperties['java.util.logging.config.file'] = 'classpath:net/woggioni/rbcs/cli/logging.properties'
systemProperties['logback.configurationFile'] = 'classpath:net/woggioni/rbcs/cli/logback.xml'
systemProperties['io.netty.leakDetectionLevel'] = 'DISABLED'
// systemProperties['org.slf4j.simpleLogger.showDateTime'] = 'true'
@@ -83,7 +82,7 @@ tasks.named(NativeImagePlugin.NATIVE_IMAGE_TASK_NAME, NativeImageTask) {
tasks.named(JlinkPlugin.JLINK_TASK_NAME, JlinkTask) {
mainClass = mainClassName
mainModule = 'net.woggioni.gbcs.cli'
mainModule = 'net.woggioni.rbcs.cli'
}
artifacts {

View File

@@ -0,0 +1,17 @@
module net.woggioni.rbcs.cli {
requires org.slf4j;
requires net.woggioni.rbcs.server;
requires info.picocli;
requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.client;
requires kotlin.stdlib;
requires net.woggioni.jwo;
requires net.woggioni.rbcs.api;
exports net.woggioni.rbcs.cli.impl.converters to info.picocli;
opens net.woggioni.rbcs.cli.impl.commands to info.picocli;
opens net.woggioni.rbcs.cli.impl to info.picocli;
opens net.woggioni.rbcs.cli to info.picocli, net.woggioni.rbcs.common;
exports net.woggioni.rbcs.cli;
}

View File

@@ -0,0 +1,69 @@
package net.woggioni.rbcs.cli
import net.woggioni.jwo.Application
import net.woggioni.rbcs.cli.impl.AbstractVersionProvider
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.cli.impl.commands.BenchmarkCommand
import net.woggioni.rbcs.cli.impl.commands.ClientCommand
import net.woggioni.rbcs.cli.impl.commands.GetCommand
import net.woggioni.rbcs.cli.impl.commands.HealthCheckCommand
import net.woggioni.rbcs.cli.impl.commands.PasswordHashCommand
import net.woggioni.rbcs.cli.impl.commands.PutCommand
import net.woggioni.rbcs.cli.impl.commands.ServerCommand
import net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory
import net.woggioni.rbcs.common.contextLogger
import picocli.CommandLine
import picocli.CommandLine.Model.CommandSpec
@CommandLine.Command(
name = "rbcs", versionProvider = RemoteBuildCacheServerCli.VersionProvider::class
)
class RemoteBuildCacheServerCli : RbcsCommand() {
class VersionProvider : AbstractVersionProvider()
companion object {
@JvmStatic
fun main(vararg args: String) {
val currentClassLoader = RemoteBuildCacheServerCli::class.java.classLoader
Thread.currentThread().contextClassLoader = currentClassLoader
if(currentClassLoader.javaClass.name == "net.woggioni.envelope.loader.ModuleClassLoader") {
//We're running in an envelope jar and custom URL protocols won't work
RbcsUrlStreamHandlerFactory.install()
}
val log = contextLogger()
val app = Application.builder("rbcs")
.configurationDirectoryEnvVar("RBCS_CONFIGURATION_DIR")
.configurationDirectoryPropertyKey("net.woggioni.rbcs.conf.dir")
.build()
val rbcsCli = RemoteBuildCacheServerCli()
val commandLine = CommandLine(rbcsCli)
commandLine.setExecutionExceptionHandler { ex, cl, parseResult ->
log.error(ex.message, ex)
CommandLine.ExitCode.SOFTWARE
}
commandLine.addSubcommand(ServerCommand(app))
commandLine.addSubcommand(PasswordHashCommand())
commandLine.addSubcommand(
CommandLine(ClientCommand(app)).apply {
addSubcommand(BenchmarkCommand())
addSubcommand(PutCommand())
addSubcommand(GetCommand())
addSubcommand(HealthCheckCommand())
})
System.exit(commandLine.execute(*args))
}
}
@CommandLine.Option(names = ["-V", "--version"], versionHelp = true)
var versionHelp = false
private set
@CommandLine.Spec
private lateinit var spec: CommandSpec
override fun run() {
spec.commandLine().usage(System.out);
}
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.cli.impl
package net.woggioni.rbcs.cli.impl
import picocli.CommandLine
import java.util.jar.Attributes

View File

@@ -1,11 +1,11 @@
package net.woggioni.gbcs.cli.impl
package net.woggioni.rbcs.cli.impl
import net.woggioni.jwo.Application
import picocli.CommandLine
import java.nio.file.Path
abstract class GbcsCommand : Runnable {
abstract class RbcsCommand : Runnable {
@CommandLine.Option(names = ["-h", "--help"], usageHelp = true)
var usageHelp = false

View File

@@ -0,0 +1,172 @@
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.jwo.JWO
import net.woggioni.jwo.LongMath
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.cli.impl.converters.ByteSizeConverter
import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.error
import net.woggioni.rbcs.common.info
import picocli.CommandLine
import java.security.SecureRandom
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random
@CommandLine.Command(
name = "benchmark",
description = ["Run a load test against the server"],
showDefaultValues = true
)
class BenchmarkCommand : RbcsCommand() {
companion object{
private val log = createLogger<BenchmarkCommand>()
}
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
@CommandLine.Option(
names = ["-e", "--entries"],
description = ["Total number of elements to be added to the cache"],
paramLabel = "NUMBER_OF_ENTRIES"
)
private var numberOfEntries = 1000
@CommandLine.Option(
names = ["-s", "--size"],
description = ["Size of a cache value in bytes"],
paramLabel = "SIZE",
converter = [ByteSizeConverter::class]
)
private var size = 0x1000
@CommandLine.Option(
names = ["-r", "--random"],
description = ["Insert completely random byte values"]
)
private var randomValues = false
override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
val progressThreshold = LongMath.ceilDiv(numberOfEntries.toLong(), 20)
RemoteBuildCacheClient(profile).use { client ->
val entryGenerator = sequence {
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
while (true) {
val key = JWO.bytesToHex(random.nextBytes(16))
val value = if(randomValues) {
random.nextBytes(size)
} else {
val byteValue = random.nextInt().toByte()
ByteArray(size) {_ -> byteValue}
}
yield(key to value)
}
}
log.info {
"Starting insertion"
}
val entries = let {
val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now()
val semaphore = Semaphore(profile.maxConnections * 5)
val iterator = entryGenerator.take(numberOfEntries).iterator()
while (completionCounter.get() < numberOfEntries) {
if (iterator.hasNext()) {
val entry = iterator.next()
semaphore.acquire()
val future = client.put(entry.first, entry.second, CacheValueMetadata(null, null)).thenApply { entry }
future.whenComplete { result, ex ->
if (ex != null) {
log.error(ex.message, ex)
} else {
completionQueue.put(result)
}
semaphore.release()
val completed = completionCounter.incrementAndGet()
if(completed.mod(progressThreshold) == 0L) {
log.debug {
"Inserted $completed / $numberOfEntries"
}
}
}
} else {
Thread.sleep(Duration.of(500, ChronoUnit.MILLIS))
}
}
val inserted = completionQueue.toList()
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
}
inserted
}
log.info {
"Inserted ${entries.size} entries"
}
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) {
val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 5)
val start = Instant.now()
val it = entries.iterator()
while (completionCounter.get() < entries.size) {
if (it.hasNext()) {
val entry = it.next()
semaphore.acquire()
val future = client.get(entry.first).thenApply {
if (it == null) {
log.error {
"Missing entry for key '${entry.first}'"
}
} else if (!entry.second.contentEquals(it)) {
log.error {
"Retrieved a value different from what was inserted for key '${entry.first}'"
}
}
}
future.whenComplete { _, _ ->
val completed = completionCounter.incrementAndGet()
if(completed.mod(progressThreshold) == 0L) {
log.debug {
"Retrieved $completed / ${entries.size}"
}
}
semaphore.release()
}
} else {
Thread.sleep(Duration.of(500, ChronoUnit.MILLIS))
}
}
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
"Retrieval rate: $opsPerSecond ops/s"
}
} else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")
}
}
}
}

View File

@@ -1,24 +1,24 @@
package net.woggioni.gbcs.cli.impl.commands
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.jwo.Application
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.client.RemoteBuildCacheClient
import picocli.CommandLine
import java.nio.file.Path
@CommandLine.Command(
name = "client",
description = ["GBCS client"],
description = ["RBCS client"],
showDefaultValues = true
)
class ClientCommand(app : Application) : GbcsCommand() {
class ClientCommand(app : Application) : RbcsCommand() {
@CommandLine.Option(
names = ["-c", "--configuration"],
description = ["Path to the client configuration file"],
paramLabel = "CONFIGURATION_FILE"
)
private var configurationFile : Path = findConfigurationFile(app, "gbcs-client.xml")
private var configurationFile : Path = findConfigurationFile(app, "rbcs-client.xml")
@CommandLine.Option(
names = ["-p", "--profile"],
@@ -28,8 +28,8 @@ class ClientCommand(app : Application) : GbcsCommand() {
)
var profileName : String? = null
val configuration : GradleBuildCacheClient.Configuration by lazy {
GradleBuildCacheClient.Configuration.parse(configurationFile)
val configuration : RemoteBuildCacheClient.Configuration by lazy {
RemoteBuildCacheClient.Configuration.parse(configurationFile)
}
override fun run() {

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.cli.impl.commands
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.createLogger
import picocli.CommandLine
import java.nio.file.Files
import java.nio.file.Path
@@ -12,8 +12,10 @@ import java.nio.file.Path
description = ["Fetch a value from the cache with the specified key"],
showDefaultValues = true
)
class GetCommand : GbcsCommand() {
private val log = contextLogger()
class GetCommand : RbcsCommand() {
companion object{
private val log = createLogger<GetCommand>()
}
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
@@ -38,7 +40,7 @@ class GetCommand : GbcsCommand() {
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
GradleBuildCacheClient(profile).use { client ->
RemoteBuildCacheClient(profile).use { client ->
client.get(key).thenApply { value ->
value?.let {
(output?.let(Files::newOutputStream) ?: System.out).use {

View File

@@ -0,0 +1,48 @@
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.createLogger
import picocli.CommandLine
import java.security.SecureRandom
import kotlin.random.Random
@CommandLine.Command(
name = "health",
description = ["Check server health"],
showDefaultValues = true
)
class HealthCheckCommand : RbcsCommand() {
companion object{
private val log = createLogger<HealthCheckCommand>()
}
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
RemoteBuildCacheClient(profile).use { client ->
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
val nonce = ByteArray(0xa0)
random.nextBytes(nonce)
client.healthCheck(nonce).thenApply { value ->
if(value == null) {
throw IllegalStateException("Empty response from server")
}
val offset = value.size - nonce.size
for(i in 0 until nonce.size) {
val a = nonce[i]
val b = value[offset + i]
if(a != b) {
throw IllegalStateException("Server nonce does not match")
}
}
}.get()
}
}
}

View File

@@ -1,9 +1,9 @@
package net.woggioni.gbcs.cli.impl.commands
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.common.PasswordSecurity.hashPassword
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.converters.OutputStreamConverter
import net.woggioni.jwo.UncloseableOutputStream
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.cli.impl.converters.OutputStreamConverter
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import picocli.CommandLine
import java.io.OutputStream
import java.io.OutputStreamWriter
@@ -12,10 +12,10 @@ import java.io.PrintWriter
@CommandLine.Command(
name = "password",
description = ["Generate a password hash to add to GBCS configuration file"],
description = ["Generate a password hash to add to RBCS configuration file"],
showDefaultValues = true
)
class PasswordHashCommand : GbcsCommand() {
class PasswordHashCommand : RbcsCommand() {
@CommandLine.Option(
names = ["-o", "--output-file"],
description = ["Write the output to a file instead of stdout"],

View File

@@ -0,0 +1,101 @@
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.jwo.Hash
import net.woggioni.jwo.JWO
import net.woggioni.jwo.NullOutputStream
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.createLogger
import picocli.CommandLine
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Path
import java.util.UUID
@CommandLine.Command(
name = "put",
description = ["Add or replace a value to the cache with the specified key"],
showDefaultValues = true
)
class PutCommand : RbcsCommand() {
companion object{
private val log = createLogger<PutCommand>()
}
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
@CommandLine.Option(
names = ["-k", "--key"],
description = ["The key for the new value, randomly generated if omitted"],
paramLabel = "KEY"
)
private var key : String? = null
@CommandLine.Option(
names = ["-i", "--inline"],
description = ["File is to be displayed in the browser"],
paramLabel = "INLINE",
)
private var inline : Boolean = false
@CommandLine.Option(
names = ["-t", "--type"],
description = ["File mime type"],
paramLabel = "MIME_TYPE",
)
private var mimeType : String? = null
@CommandLine.Option(
names = ["-v", "--value"],
description = ["Path to a file containing the value to be added (defaults to stdin)"],
paramLabel = "VALUE_FILE",
)
private var value : Path? = null
override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
RemoteBuildCacheClient(profile).use { client ->
val inputStream : InputStream
val mimeType : String?
val contentDisposition : String?
val valuePath = value
val actualKey : String?
if(valuePath != null) {
inputStream = Files.newInputStream(valuePath)
mimeType = this.mimeType ?: Files.probeContentType(valuePath)
contentDisposition = if(inline) {
"inline"
} else {
"attachment; filename=\"${valuePath.fileName}\""
}
actualKey = key ?: let {
val md = Hash.Algorithm.SHA512.newInputStream(Files.newInputStream(valuePath)).use {
JWO.copy(it, NullOutputStream())
it.messageDigest
}
UUID.nameUUIDFromBytes(md.digest()).toString()
}
} else {
inputStream = System.`in`
mimeType = this.mimeType
contentDisposition = if(inline) {
"inline"
} else {
null
}
actualKey = key ?: UUID.randomUUID().toString()
}
inputStream.use {
client.put(actualKey, it.readAllBytes(), CacheValueMetadata(contentDisposition, mimeType))
}.get()
println(profile.serverURI.resolve(actualKey))
}
}
}

View File

@@ -0,0 +1,87 @@
package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.jwo.Application
import net.woggioni.jwo.JWO
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.cli.impl.converters.DurationConverter
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.info
import net.woggioni.rbcs.server.RemoteBuildCacheServer
import net.woggioni.rbcs.server.RemoteBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL
import picocli.CommandLine
import java.io.ByteArrayOutputStream
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration
import java.util.concurrent.TimeUnit
@CommandLine.Command(
name = "server",
description = ["RBCS server"],
showDefaultValues = true
)
class ServerCommand(app : Application) : RbcsCommand() {
companion object {
private val log = createLogger<ServerCommand>()
}
private fun createDefaultConfigurationFile(configurationFile: Path) {
log.info {
"Creating default configuration file at '$configurationFile'"
}
val defaultConfigurationFileResource = DEFAULT_CONFIGURATION_URL
Files.newOutputStream(configurationFile).use { outputStream ->
defaultConfigurationFileResource.openStream().use { inputStream ->
JWO.copy(inputStream, outputStream)
}
}
}
@CommandLine.Option(
names = ["-t", "--timeout"],
description = ["Exit after the specified time"],
paramLabel = "TIMEOUT",
converter = [DurationConverter::class]
)
private var timeout: Duration? = null
@CommandLine.Option(
names = ["-c", "--config-file"],
description = ["Read the application configuration from this file"],
paramLabel = "CONFIG_FILE"
)
private var configurationFile: Path = findConfigurationFile(app, "rbcs-server.xml")
override fun run() {
if (!Files.exists(configurationFile)) {
Files.createDirectories(configurationFile.parent)
createDefaultConfigurationFile(configurationFile)
}
val configuration = RemoteBuildCacheServer.loadConfiguration(configurationFile)
log.debug {
ByteArrayOutputStream().also {
RemoteBuildCacheServer.dumpConfiguration(configuration, it)
}.let {
"Server configuration:\n${String(it.toByteArray())}"
}
}
val server = RemoteBuildCacheServer(configuration)
val handle = server.run()
val shutdownHook = Thread.ofPlatform().unstarted {
handle.sendShutdownSignal()
try {
handle.get(60, TimeUnit.SECONDS)
} catch (ex : Throwable) {
log.warn(ex.message, ex)
}
}
Runtime.getRuntime().addShutdownHook(shutdownHook)
if(timeout != null) {
Thread.sleep(timeout)
handle.sendShutdownSignal()
}
handle.get()
}
}

View File

@@ -0,0 +1,10 @@
package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine
class ByteSizeConverter : CommandLine.ITypeConverter<Int> {
override fun convert(value: String): Int {
return Integer.decode(value)
}
}

View File

@@ -0,0 +1,11 @@
package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine
import java.time.Duration
class DurationConverter : CommandLine.ITypeConverter<Duration> {
override fun convert(value: String): Duration {
return Duration.parse(value)
}
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.cli.impl.converters
package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine
import java.io.InputStream

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.cli.impl.converters
package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine
import java.io.OutputStream

View File

@@ -15,6 +15,4 @@
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -4,11 +4,13 @@ plugins {
}
dependencies {
implementation project(':gbcs-api')
implementation project(':gbcs-common')
implementation catalog.picocli
implementation project(':rbcs-api')
implementation project(':rbcs-common')
implementation catalog.slf4j.api
implementation catalog.netty.buffer
implementation catalog.netty.handler
implementation catalog.netty.transport
implementation catalog.netty.common
implementation catalog.netty.codec.http
testRuntimeOnly catalog.logback.classic

View File

@@ -1,4 +1,4 @@
module net.woggioni.gbcs.client {
module net.woggioni.rbcs.client {
requires io.netty.handler;
requires io.netty.codec.http;
requires io.netty.transport;
@@ -6,12 +6,12 @@ module net.woggioni.gbcs.client {
requires io.netty.common;
requires io.netty.buffer;
requires java.xml;
requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.api;
requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.api;
requires io.netty.codec;
requires org.slf4j;
exports net.woggioni.gbcs.client;
exports net.woggioni.rbcs.client;
opens net.woggioni.gbcs.client.schema;
opens net.woggioni.rbcs.client.schema;
}

View File

@@ -1,10 +1,12 @@
package net.woggioni.gbcs.client
package net.woggioni.rbcs.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
@@ -28,13 +30,18 @@ import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.handler.timeout.IdleState
import io.netty.handler.timeout.IdleStateEvent
import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.concurrent.Future
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.client.impl.Parser
import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.debug
import net.woggioni.gbcs.common.trace
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.client.impl.Parser
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.trace
import java.io.IOException
import java.net.InetSocketAddress
import java.net.URI
import java.nio.file.Files
@@ -44,14 +51,19 @@ import java.security.cert.X509Certificate
import java.time.Duration
import java.util.Base64
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import kotlin.random.Random
import io.netty.util.concurrent.Future as NettyFuture
class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable {
companion object{
private val log = createLogger<RemoteBuildCacheClient>()
}
class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable {
private val group: NioEventLoopGroup
private var sslContext: SslContext
private val log = contextLogger()
private val pool: ChannelPool
data class Configuration(
@@ -72,11 +84,21 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
val exp: Double
)
class Connection(
val readTimeout: Duration,
val writeTimeout: Duration,
val idleTimeout: Duration,
val readIdleTimeout: Duration,
val writeIdleTimeout: Duration
)
data class Profile(
val serverURI: URI,
val connection: Connection?,
val authentication: Authentication?,
val connectionTimeout: Duration?,
val maxConnections: Int,
val compressionEnabled: Boolean,
val retryPolicy: RetryPolicy?,
)
@@ -141,18 +163,50 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
}
override fun channelCreated(ch: Channel) {
val connectionId = connectionCount.getAndIncrement()
val connectionId = connectionCount.incrementAndGet()
log.debug {
"Created connection $connectionId, total number of active connections: $connectionId"
"Created connection ${ch.id().asShortText()}, total number of active connections: $connectionId"
}
ch.closeFuture().addListener {
val activeConnections = connectionCount.decrementAndGet()
log.debug {
"Closed connection $connectionId, total number of active connections: $activeConnections"
"Closed connection ${
ch.id().asShortText()
}, total number of active connections: $activeConnections"
}
}
val pipeline: ChannelPipeline = ch.pipeline()
profile.connection?.also { conn ->
val readTimeout = conn.readTimeout.toMillis()
val writeTimeout = conn.writeTimeout.toMillis()
if (readTimeout > 0 || writeTimeout > 0) {
pipeline.addLast(
IdleStateHandler(
false,
readTimeout,
writeTimeout,
0,
TimeUnit.MILLISECONDS
)
)
}
val readIdleTimeout = conn.readIdleTimeout.toMillis()
val writeIdleTimeout = conn.writeIdleTimeout.toMillis()
val idleTimeout = conn.idleTimeout.toMillis()
if (readIdleTimeout > 0 || writeIdleTimeout > 0 || idleTimeout > 0) {
pipeline.addLast(
IdleStateHandler(
true,
readIdleTimeout,
writeIdleTimeout,
idleTimeout,
TimeUnit.MILLISECONDS
)
)
}
}
// Add SSL handler if needed
if ("https".equals(scheme, ignoreCase = true)) {
pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), host, port))
@@ -160,7 +214,9 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
// HTTP handlers
pipeline.addLast("codec", HttpClientCodec())
pipeline.addLast("decompressor", HttpContentDecompressor())
if(profile.compressionEnabled) {
pipeline.addLast("decompressor", HttpContentDecompressor())
}
pipeline.addLast("aggregator", HttpObjectAggregator(134217728))
pipeline.addLast("chunked", ChunkedWriteHandler())
}
@@ -206,6 +262,7 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
retryPolicy.initialDelayMillis.toDouble(),
retryPolicy.exp,
outcomeHandler,
Random.Default,
operation
)
} else {
@@ -213,6 +270,25 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
}
}
fun healthCheck(nonce: ByteArray): CompletableFuture<ByteArray?> {
return executeWithRetry {
sendRequest(profile.serverURI, HttpMethod.TRACE, nonce)
}.thenApply {
val status = it.status()
if (it.status() != HttpResponseStatus.OK) {
throw HttpException(status)
} else {
it.content()
}
}.thenApply { maybeByteBuf ->
maybeByteBuf?.let {
val result = ByteArray(it.readableBytes())
it.getBytes(0, result)
result
}
}
}
fun get(key: String): CompletableFuture<ByteArray?> {
return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
@@ -234,9 +310,13 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
}
}
fun put(key: String, content: ByteArray): CompletableFuture<Unit> {
fun put(key: String, content: ByteArray, metadata: CacheValueMetadata): CompletableFuture<Unit> {
return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content)
val extraHeaders = sequenceOf(
metadata.mimeType?.let { HttpHeaderNames.CONTENT_TYPE to it },
metadata.contentDisposition?.let { HttpHeaderNames.CONTENT_DISPOSITION to it }
).filterNotNull()
sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content, extraHeaders.asIterable())
}.thenApply {
val status = it.status()
if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) {
@@ -245,35 +325,83 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
}
}
private fun sendRequest(uri: URI, method: HttpMethod, body: ByteArray?): CompletableFuture<FullHttpResponse> {
private fun sendRequest(
uri: URI,
method: HttpMethod,
body: ByteArray?,
extraHeaders: Iterable<Pair<CharSequence, CharSequence>>? = null
): CompletableFuture<FullHttpResponse> {
val responseFuture = CompletableFuture<FullHttpResponse>()
// Custom handler for processing responses
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
private val handlers = mutableListOf<ChannelHandler>()
fun cleanup(channel: Channel, pipeline: ChannelPipeline) {
handlers.forEach(pipeline::remove)
pool.release(channel)
}
override fun operationComplete(channelFuture: Future<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline().addLast("handler", object : SimpleChannelInboundHandler<FullHttpResponse>() {
val timeoutHandler = object : ChannelInboundHandlerAdapter() {
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is IdleStateEvent) {
val te = when (evt.state()) {
IdleState.READER_IDLE -> TimeoutException(
"Read timeout",
)
IdleState.WRITER_IDLE -> TimeoutException("Write timeout")
IdleState.ALL_IDLE -> TimeoutException("Idle timeout")
null -> throw IllegalStateException("This should never happen")
}
responseFuture.completeExceptionally(te)
ctx.close()
}
}
}
val closeListener = GenericFutureListener<Future<Void>> {
responseFuture.completeExceptionally(IOException("The remote server closed the connection"))
pool.release(channel)
}
val responseHandler = object : SimpleChannelInboundHandler<FullHttpResponse>() {
override fun channelRead0(
ctx: ChannelHandlerContext,
response: FullHttpResponse
) {
pipeline.removeLast()
pool.release(channel)
channel.closeFuture().removeListener(closeListener)
cleanup(channel, pipeline)
responseFuture.complete(response)
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
ctx.newPromise()
val ex = when (cause) {
is DecoderException -> cause.cause
else -> cause
}
responseFuture.completeExceptionally(ex)
ctx.close()
pipeline.removeLast()
pool.release(channel)
}
})
override fun channelInactive(ctx: ChannelHandlerContext) {
pool.release(channel)
responseFuture.completeExceptionally(IOException("The remote server closed the connection"))
super.channelInactive(ctx)
}
}
for (handler in arrayOf(timeoutHandler, responseHandler)) {
handlers.add(handler)
}
pipeline.addLast(timeoutHandler, responseHandler)
channel.closeFuture().addListener(closeListener)
// Prepare the HTTP request
val request: FullHttpRequest = let {
val content: ByteBuf? = body?.takeIf(ByteArray::isNotEmpty)?.let(Unpooled::wrappedBuffer)
@@ -285,15 +413,19 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
).apply {
headers().apply {
if (content != null) {
set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM)
set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes())
}
set(HttpHeaderNames.HOST, profile.serverURI.host)
set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
set(
HttpHeaderNames.ACCEPT_ENCODING,
HttpHeaderValues.GZIP.toString() + "," + HttpHeaderValues.DEFLATE.toString()
)
if(profile.compressionEnabled) {
set(
HttpHeaderNames.ACCEPT_ENCODING,
HttpHeaderValues.GZIP.toString() + "," + HttpHeaderValues.DEFLATE.toString()
)
}
extraHeaders?.forEach { (k, v) ->
add(k, v)
}
// Add basic auth if configured
(profile.authentication as? Configuration.Authentication.BasicAuthenticationCredentials)?.let { credentials ->
val auth = "${credentials.username}:${credentials.password}"

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.client
package net.woggioni.rbcs.client
import io.netty.handler.codec.http.HttpResponseStatus

View File

@@ -1,9 +1,9 @@
package net.woggioni.gbcs.client.impl
package net.woggioni.rbcs.client.impl
import net.woggioni.gbcs.api.exception.ConfigurationException
import net.woggioni.gbcs.common.Xml.Companion.asIterable
import net.woggioni.gbcs.common.Xml.Companion.renderAttribute
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.Xml.Companion.asIterable
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import java.net.URI
import java.nio.file.Files
@@ -12,12 +12,13 @@ import java.security.KeyStore
import java.security.PrivateKey
import java.security.cert.X509Certificate
import java.time.Duration
import java.time.temporal.ChronoUnit
object Parser {
fun parse(document: Document): GradleBuildCacheClient.Configuration {
fun parse(document: Document): RemoteBuildCacheClient.Configuration {
val root = document.documentElement
val profiles = mutableMapOf<String, GradleBuildCacheClient.Configuration.Profile>()
val profiles = mutableMapOf<String, RemoteBuildCacheClient.Configuration.Profile>()
for (child in root.asIterable()) {
val tagName = child.localName
@@ -27,8 +28,9 @@ object Parser {
child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required")
val uri = child.renderAttribute("base-url")?.let(::URI)
?: throw ConfigurationException("base-url attribute is required")
var authentication: GradleBuildCacheClient.Configuration.Authentication? = null
var retryPolicy: GradleBuildCacheClient.Configuration.RetryPolicy? = null
var authentication: RemoteBuildCacheClient.Configuration.Authentication? = null
var retryPolicy: RemoteBuildCacheClient.Configuration.RetryPolicy? = null
var connection : RemoteBuildCacheClient.Configuration.Connection? = null
for (gchild in child.asIterable()) {
when (gchild.localName) {
"tls-client-auth" -> {
@@ -49,7 +51,7 @@ object Parser {
.toList()
.toTypedArray()
authentication =
GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(
RemoteBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(
key,
certChain
)
@@ -61,7 +63,7 @@ object Parser {
val password = gchild.renderAttribute("password")
?: throw ConfigurationException("password attribute is required")
authentication =
GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(
RemoteBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(
username,
password
)
@@ -80,12 +82,32 @@ object Parser {
gchild.renderAttribute("exp")
?.let(String::toDouble)
?: 2.0f
retryPolicy = GradleBuildCacheClient.Configuration.RetryPolicy(
retryPolicy = RemoteBuildCacheClient.Configuration.RetryPolicy(
maxAttempts,
initialDelay.toMillis(),
exp.toDouble()
)
}
"connection" -> {
val writeTimeout = gchild.renderAttribute("write-timeout")
?.let(Duration::parse) ?: Duration.of(0, ChronoUnit.SECONDS)
val readTimeout = gchild.renderAttribute("read-timeout")
?.let(Duration::parse) ?: Duration.of(0, ChronoUnit.SECONDS)
val idleTimeout = gchild.renderAttribute("idle-timeout")
?.let(Duration::parse) ?: Duration.of(30, ChronoUnit.SECONDS)
val readIdleTimeout = gchild.renderAttribute("read-idle-timeout")
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
val writeIdleTimeout = gchild.renderAttribute("write-idle-timeout")
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
connection = RemoteBuildCacheClient.Configuration.Connection(
readTimeout,
writeTimeout,
idleTimeout,
readIdleTimeout,
writeIdleTimeout,
)
}
}
}
val maxConnections = child.renderAttribute("max-connections")
@@ -93,16 +115,22 @@ object Parser {
?: 50
val connectionTimeout = child.renderAttribute("connection-timeout")
?.let(Duration::parse)
profiles[name] = GradleBuildCacheClient.Configuration.Profile(
val compressionEnabled = child.renderAttribute("enable-compression")
?.let(String::toBoolean)
?: true
profiles[name] = RemoteBuildCacheClient.Configuration.Profile(
uri,
connection,
authentication,
connectionTimeout,
maxConnections,
compressionEnabled,
retryPolicy
)
}
}
}
return GradleBuildCacheClient.Configuration(profiles)
return RemoteBuildCacheClient.Configuration(profiles)
}
}

View File

@@ -1,8 +1,10 @@
package net.woggioni.gbcs.client
package net.woggioni.rbcs.client
import io.netty.util.concurrent.EventExecutorGroup
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import kotlin.math.pow
import kotlin.random.Random
sealed class OperationOutcome<T> {
class Success<T>(val result: T) : OperationOutcome<T>()
@@ -24,8 +26,10 @@ fun <T> executeWithRetry(
initialDelay: Double,
exp: Double,
outcomeHandler: OutcomeHandler<T>,
randomizer : Random?,
cb: () -> CompletableFuture<T>
): CompletableFuture<T> {
val finalResult = cb()
var future = finalResult
var shortCircuit = false
@@ -46,7 +50,7 @@ fun <T> executeWithRetry(
is OutcomeHandlerResult.Retry -> {
val res = CompletableFuture<T>()
val delay = run {
val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong()
val scheduledDelay = (initialDelay * exp.pow(i.toDouble()) * (1.0 + (randomizer?.nextDouble(-0.5, 0.5) ?: 0.0))).toLong()
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
}
eventExecutorGroup.schedule({

View File

@@ -1,30 +1,40 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<xs:schema targetNamespace="urn:net.woggioni.gbcs.client"
<xs:schema targetNamespace="urn:net.woggioni.rbcs.client"
xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:gbcs-client="urn:net.woggioni.gbcs.client"
xmlns:rbcs-client="urn:net.woggioni.rbcs.client"
elementFormDefault="unqualified"
>
<xs:element name="profiles" type="gbcs-client:profilesType"/>
<xs:element name="profiles" type="rbcs-client:profilesType"/>
<xs:complexType name="profilesType">
<xs:sequence minOccurs="0">
<xs:element name="profile" type="gbcs-client:profileType" maxOccurs="unbounded"/>
<xs:element name="profile" type="rbcs-client:profileType" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="profileType">
<xs:sequence>
<xs:choice>
<xs:element name="no-auth" type="gbcs-client:noAuthType"/>
<xs:element name="basic-auth" type="gbcs-client:basicAuthType"/>
<xs:element name="tls-client-auth" type="gbcs-client:tlsClientAuthType"/>
<xs:element name="no-auth" type="rbcs-client:noAuthType"/>
<xs:element name="basic-auth" type="rbcs-client:basicAuthType"/>
<xs:element name="tls-client-auth" type="rbcs-client:tlsClientAuthType"/>
</xs:choice>
<xs:element name="retry-policy" type="gbcs-client:retryType" minOccurs="0"/>
<xs:element name="connection" type="rbcs-client:connectionType" minOccurs="0" />
<xs:element name="retry-policy" type="rbcs-client:retryType" minOccurs="0"/>
</xs:sequence>
<xs:attribute name="name" type="xs:token" use="required"/>
<xs:attribute name="base-url" type="xs:anyURI" use="required"/>
<xs:attribute name="max-connections" type="xs:positiveInteger" default="50"/>
<xs:attribute name="connection-timeout" type="xs:duration"/>
<xs:attribute name="enable-compression" type="xs:boolean" default="true"/>
</xs:complexType>
<xs:complexType name="connectionType">
<xs:attribute name="read-timeout" type="xs:duration" use="optional" default="PT0S"/>
<xs:attribute name="write-timeout" type="xs:duration" use="optional" default="PT0S"/>
<xs:attribute name="idle-timeout" type="xs:duration" use="optional" default="PT30S"/>
<xs:attribute name="read-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
<xs:attribute name="write-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
</xs:complexType>
<xs:complexType name="noAuthType"/>

View File

@@ -1,10 +1,9 @@
package net.woggioni.gbcs.client
package net.woggioni.rbcs.client
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.rbcs.common.contextLogger
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
@@ -90,7 +89,7 @@ class RetryTest {
val random = Random(testArgs.seed)
val future =
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler) {
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler, null) {
val now = System.nanoTime()
val result = CompletableFuture<Int>()
executor.submit {
@@ -130,7 +129,7 @@ class RetryTest {
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
val actualTimestamp = timestamp
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
Assertions.assertTrue(err < 1e-3)
Assertions.assertTrue(err < 1e-2)
}
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
/*

View File

@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<rbcs-client:profiles xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rbcs-client="urn:net.woggioni.rbcs.client"
xs:schemaLocation="urn:net.woggioni.rbcs.client jms://net.woggioni.rbcs.client/net/woggioni/rbcs/client/schema/rbcs-client.xsd"
>
<profile name="profile1" base-url="https://rbcs1.example.com/">
<tls-client-auth
key-store-file="keystore.pfx"
key-store-password="password"
key-alias="woggioni@c962475fa38"
key-password="key-password"/>
</profile>
<profile name="profile2" base-url="https://rbcs2.example.com/">
<basic-auth user="user" password="password"/>
</profile>
</rbcs-client:profiles>

View File

@@ -6,9 +6,10 @@ plugins {
}
dependencies {
implementation project(':gbcs-api')
implementation project(':rbcs-api')
implementation catalog.slf4j.api
implementation catalog.jwo
implementation catalog.netty.buffer
}
publishing {

View File

@@ -1,10 +1,12 @@
module net.woggioni.gbcs.common {
module net.woggioni.rbcs.common {
requires java.xml;
requires java.logging;
requires org.slf4j;
requires kotlin.stdlib;
requires net.woggioni.jwo;
requires io.netty.buffer;
requires io.netty.transport;
provides java.net.spi.URLStreamHandlerProvider with net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory;
exports net.woggioni.gbcs.common;
provides java.net.spi.URLStreamHandlerProvider with net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory;
exports net.woggioni.rbcs.common;
}

View File

@@ -0,0 +1,15 @@
package net.woggioni.rbcs.common
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
fun extractChunk(buf: CompositeByteBuf, alloc: ByteBufAllocator): ByteBuf {
val chunk = alloc.compositeBuffer()
for (component in buf.decompose(0, buf.readableBytes())) {
chunk.addComponent(true, component.retain())
}
buf.removeComponents(0, buf.numComponents())
buf.clear()
return chunk
}

View File

@@ -0,0 +1,25 @@
package net.woggioni.rbcs.common
import io.netty.buffer.ByteBuf
import java.io.InputStream
class ByteBufInputStream(private val buf : ByteBuf) : InputStream() {
override fun read(): Int {
return buf.takeIf {
it.readableBytes() > 0
}?.let(ByteBuf::readByte)
?.let(Byte::toInt) ?: -1
}
override fun read(b: ByteArray, off: Int, len: Int): Int {
val readableBytes = buf.readableBytes()
if(readableBytes == 0) return -1
val result = len.coerceAtMost(readableBytes)
buf.readBytes(b, off, result)
return result
}
override fun close() {
buf.release()
}
}

View File

@@ -0,0 +1,18 @@
package net.woggioni.rbcs.common
import io.netty.buffer.ByteBuf
import java.io.OutputStream
class ByteBufOutputStream(private val buf : ByteBuf) : OutputStream() {
override fun write(b: Int) {
buf.writeByte(b)
}
override fun write(b: ByteArray, off: Int, len: Int) {
buf.writeBytes(b, off, len)
}
override fun close() {
buf.release()
}
}

View File

@@ -0,0 +1,7 @@
package net.woggioni.rbcs.common
class ResourceNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
}
class ModuleNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common
package net.woggioni.rbcs.common
data class HostAndPort(val host: String, val port: Int = 0) {

View File

@@ -0,0 +1,194 @@
package net.woggioni.rbcs.common
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import org.slf4j.event.Level
import org.slf4j.spi.LoggingEventBuilder
import java.nio.file.Files
import java.nio.file.Path
import java.util.logging.LogManager
inline fun <reified T> T.contextLogger() = LoggerFactory.getLogger(T::class.java)
inline fun <reified T> createLogger() = LoggerFactory.getLogger(T::class.java)
inline fun Logger.traceParam(messageBuilder: () -> Pair<String, Array<Any>>) {
if (isTraceEnabled) {
val (format, params) = messageBuilder()
trace(format, params)
}
}
inline fun Logger.debugParam(messageBuilder: () -> Pair<String, Array<Any>>) {
if (isDebugEnabled) {
val (format, params) = messageBuilder()
info(format, params)
}
}
inline fun Logger.infoParam(messageBuilder: () -> Pair<String, Array<Any>>) {
if (isInfoEnabled) {
val (format, params) = messageBuilder()
info(format, params)
}
}
inline fun Logger.warnParam(messageBuilder: () -> Pair<String, Array<Any>>) {
if (isWarnEnabled) {
val (format, params) = messageBuilder()
warn(format, params)
}
}
inline fun Logger.errorParam(messageBuilder: () -> Pair<String, Array<Any>>) {
if (isErrorEnabled) {
val (format, params) = messageBuilder()
error(format, params)
}
}
inline fun log(
log: Logger,
filter: Logger.() -> Boolean,
loggerMethod: Logger.(String) -> Unit, messageBuilder: () -> String
) {
if (log.filter()) {
log.loggerMethod(messageBuilder())
}
}
fun withMDC(params: Array<Pair<String, String>>, cb: () -> Unit) {
object : AutoCloseable {
override fun close() {
for ((key, _) in params) MDC.remove(key)
}
}.use {
for ((key, value) in params) MDC.put(key, value)
cb()
}
}
inline fun Logger.log(level: Level, channel: Channel, crossinline messageBuilder: (LoggingEventBuilder) -> Unit ) {
if (isEnabledForLevel(level)) {
val params = arrayOf<Pair<String, String>>(
"channel-id-short" to channel.id().asShortText(),
"channel-id-long" to channel.id().asLongText(),
"remote-address" to channel.remoteAddress().toString(),
"local-address" to channel.localAddress().toString(),
)
withMDC(params) {
val builder = makeLoggingEventBuilder(level)
// for ((key, value) in params) {
// builder.addKeyValue(key, value)
// }
messageBuilder(builder)
builder.log()
}
}
}
inline fun Logger.log(level: Level, channel: Channel, crossinline messageBuilder: () -> String) {
log(level, channel) { builder ->
builder.setMessage(messageBuilder())
}
}
inline fun Logger.trace(ch: Channel, crossinline messageBuilder: () -> String) {
log(Level.TRACE, ch, messageBuilder)
}
inline fun Logger.debug(ch: Channel, crossinline messageBuilder: () -> String) {
log(Level.DEBUG, ch, messageBuilder)
}
inline fun Logger.info(ch: Channel, crossinline messageBuilder: () -> String) {
log(Level.INFO, ch, messageBuilder)
}
inline fun Logger.warn(ch: Channel, crossinline messageBuilder: () -> String) {
log(Level.WARN, ch, messageBuilder)
}
inline fun Logger.error(ch: Channel, crossinline messageBuilder: () -> String) {
log(Level.ERROR, ch, messageBuilder)
}
inline fun Logger.trace(ctx: ChannelHandlerContext, crossinline messageBuilder: () -> String) {
log(Level.TRACE, ctx.channel(), messageBuilder)
}
inline fun Logger.debug(ctx: ChannelHandlerContext, crossinline messageBuilder: () -> String) {
log(Level.DEBUG, ctx.channel(), messageBuilder)
}
inline fun Logger.info(ctx: ChannelHandlerContext, crossinline messageBuilder: () -> String) {
log(Level.INFO, ctx.channel(), messageBuilder)
}
inline fun Logger.warn(ctx: ChannelHandlerContext, crossinline messageBuilder: () -> String) {
log(Level.WARN, ctx.channel(), messageBuilder)
}
inline fun Logger.error(ctx: ChannelHandlerContext, crossinline messageBuilder: () -> String) {
log(Level.ERROR, ctx.channel(), messageBuilder)
}
inline fun Logger.log(level: Level, messageBuilder: () -> String) {
if (isEnabledForLevel(level)) {
makeLoggingEventBuilder(level).log(messageBuilder())
}
}
inline fun Logger.trace(messageBuilder: () -> String) {
if (isTraceEnabled) {
trace(messageBuilder())
}
}
inline fun Logger.debug(messageBuilder: () -> String) {
if (isDebugEnabled) {
debug(messageBuilder())
}
}
inline fun Logger.info(messageBuilder: () -> String) {
if (isInfoEnabled) {
info(messageBuilder())
}
}
inline fun Logger.warn(messageBuilder: () -> String) {
if (isWarnEnabled) {
warn(messageBuilder())
}
}
inline fun Logger.error(messageBuilder: () -> String) {
if (isErrorEnabled) {
error(messageBuilder())
}
}
class LoggingConfig {
init {
val logManager = LogManager.getLogManager()
System.getProperty("log.config.source")?.let withSource@{ source ->
val urls = LoggingConfig::class.java.classLoader.getResources(source)
while (urls.hasMoreElements()) {
val url = urls.nextElement()
url.openStream().use { inputStream ->
logManager.readConfiguration(inputStream)
return@withSource
}
}
Path.of(source).takeIf(Files::exists)
?.let(Files::newInputStream)
?.use(logManager::readConfiguration)
}
}
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common
package net.woggioni.rbcs.common
import java.security.SecureRandom
import java.security.spec.KeySpec

View File

@@ -0,0 +1,61 @@
package net.woggioni.rbcs.common
import net.woggioni.jwo.JWO
import java.net.URI
import java.net.URL
import java.security.MessageDigest
object RBCS {
fun String.toUrl() : URL = URL.of(URI(this), null)
const val RBCS_NAMESPACE_URI: String = "urn:net.woggioni.rbcs.server"
const val RBCS_PREFIX: String = "rbcs"
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
fun ByteArray.toInt(index : Int = 0) : Long {
if(index + 4 > size) throw IllegalArgumentException("Not enough bytes to decode a 32 bits integer")
var value : Long = 0
for (b in index until index + 4) {
value = (value shl 8) + (get(b).toInt() and 0xFF)
}
return value
}
fun ByteArray.toLong(index : Int = 0) : Long {
if(index + 8 > size) throw IllegalArgumentException("Not enough bytes to decode a 64 bits long integer")
var value : Long = 0
for (b in index until index + 8) {
value = (value shl 8) + (get(b).toInt() and 0xFF)
}
return value
}
fun digest(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): ByteArray {
md.update(data)
return md.digest()
}
fun digestString(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): String {
return JWO.bytesToHex(digest(data, md))
}
fun processCacheKey(key: String, digestAlgorithm: String?) = digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)
fun Long.toIntOrNull(): Int? {
return if (this >= Int.MIN_VALUE && this <= Int.MAX_VALUE) {
toInt()
} else {
null
}
}
}

View File

@@ -1,20 +1,18 @@
package net.woggioni.gbcs.common
package net.woggioni.rbcs.common
import java.io.IOException
import java.io.InputStream
import java.net.URL
import java.net.URLConnection
import java.net.URLStreamHandler
import java.net.URLStreamHandlerFactory
import java.net.spi.URLStreamHandlerProvider
import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean
import java.util.stream.Collectors
class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
class RbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
private class ClasspathHandler(private val classLoader: ClassLoader = GbcsUrlStreamHandlerFactory::class.java.classLoader) :
private class ClasspathHandler(private val classLoader: ClassLoader = RbcsUrlStreamHandlerFactory::class.java.classLoader) :
URLStreamHandler() {
override fun openConnection(u: URL): URLConnection? {
@@ -37,13 +35,17 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
private class JpmsHandler : URLStreamHandler() {
override fun openConnection(u: URL): URLConnection {
val moduleName = u.host
val thisModule = javaClass.module
val sourceModule = Optional.ofNullable(thisModule)
.map { obj: Module -> obj.layer }
.flatMap { layer: ModuleLayer ->
val moduleName = u.host
layer.findModule(moduleName)
}.orElse(thisModule)
val sourceModule =
thisModule
?.let(Module::getLayer)
?.let { layer: ModuleLayer ->
layer.findModule(moduleName).orElse(null)
} ?: if(thisModule.layer == null) {
thisModule
} else throw ModuleNotFoundException("Module '$moduleName' not found")
return JpmsResourceURLConnection(u, sourceModule)
}
}
@@ -54,7 +56,9 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
@Throws(IOException::class)
override fun getInputStream(): InputStream {
return module.getResourceAsStream(getURL().path)
val resource = getURL().path
return module.getResourceAsStream(resource)
?: throw ResourceNotFoundException("Resource '$resource' not found in module '${module.name}'")
}
}
@@ -83,12 +87,12 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
private val installed = AtomicBoolean(false)
fun install() {
if (!installed.getAndSet(true)) {
URL.setURLStreamHandlerFactory(GbcsUrlStreamHandlerFactory())
URL.setURLStreamHandlerFactory(RbcsUrlStreamHandlerFactory())
}
}
private val packageMap: Map<String, List<Module>> by lazy {
GbcsUrlStreamHandlerFactory::class.java.module.layer
RbcsUrlStreamHandlerFactory::class.java.module.layer
.modules()
.stream()
.flatMap { m: Module ->

View File

@@ -1,7 +1,6 @@
package net.woggioni.gbcs.common
package net.woggioni.rbcs.common
import net.woggioni.jwo.JWO
import org.slf4j.LoggerFactory
import org.slf4j.event.Level
import org.w3c.dom.Document
import org.w3c.dom.Element
@@ -79,7 +78,7 @@ class Xml(val doc: Document, val element: Element) {
class ErrorHandler(private val fileURL: URL) : ErrHandler {
companion object {
private val log = LoggerFactory.getLogger(ErrorHandler::class.java)
private val log = createLogger<ErrorHandler>()
}
override fun warning(ex: SAXParseException)= err(ex, Level.WARN)

View File

@@ -0,0 +1 @@
net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory

View File

@@ -6,10 +6,10 @@ plugins {
configurations {
bundle {
extendsFrom runtimeClasspath
canBeResolved = true
canBeConsumed = false
visible = false
transitive = false
resolutionStrategy {
dependencies {
@@ -29,10 +29,21 @@ configurations {
}
dependencies {
compileOnly project(':gbcs-common')
compileOnly project(':gbcs-api')
compileOnly catalog.jwo
implementation catalog.xmemcached
implementation project(':rbcs-common')
implementation project(':rbcs-api')
implementation catalog.jwo
implementation catalog.slf4j.api
implementation catalog.netty.common
implementation catalog.netty.handler
implementation catalog.netty.codec.memcache
bundle catalog.netty.codec.memcache
testRuntimeOnly catalog.logback.classic
}
tasks.named(JavaPlugin.TEST_TASK_NAME, Test) {
systemProperty("io.netty.leakDetectionLevel", "PARANOID")
}
Provider<Tar> bundleTask = tasks.register("bundle", Tar) {

View File

@@ -0,0 +1,20 @@
import net.woggioni.rbcs.api.CacheProvider;
module net.woggioni.rbcs.server.memcache {
requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.api;
requires net.woggioni.jwo;
requires java.xml;
requires kotlin.stdlib;
requires io.netty.transport;
requires io.netty.codec;
requires io.netty.codec.memcache;
requires io.netty.common;
requires io.netty.buffer;
requires io.netty.handler;
requires org.slf4j;
provides CacheProvider with net.woggioni.rbcs.server.memcache.MemcacheCacheProvider;
opens net.woggioni.rbcs.server.memcache.schema;
}

View File

@@ -0,0 +1,4 @@
package net.woggioni.rbcs.server.memcache
class MemcacheException(status : Short, msg : String? = null, cause : Throwable? = null)
: RuntimeException(msg ?: "Memcached status $status", cause)

View File

@@ -0,0 +1,45 @@
package net.woggioni.rbcs.server.memcache
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.server.memcache.client.MemcacheClient
import java.time.Duration
data class MemcacheCacheConfiguration(
val servers: List<Server>,
val maxAge: Duration = Duration.ofDays(1),
val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = null,
val compressionLevel: Int,
val chunkSize : Int
) : Configuration.Cache {
enum class CompressionMode {
/**
* Deflate mode
*/
DEFLATE
}
data class Server(
val endpoint : HostAndPort,
val connectionTimeoutMillis : Int?,
val maxConnections : Int
)
override fun materialize() = object : CacheHandlerFactory {
private val client = MemcacheClient(this@MemcacheCacheConfiguration.servers, chunkSize)
override fun close() {
client.close()
}
override fun newHandler() = MemcacheCacheHandler(client, digestAlgorithm, compressionMode != null, compressionLevel, chunkSize, maxAge)
}
override fun getNamespaceURI() = "urn:net.woggioni.rbcs.server.memcache"
override fun getTypeName() = "memcacheCacheType"
}

View File

@@ -0,0 +1,409 @@
package net.woggioni.rbcs.server.memcache
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.exception.ContentTooLargeException
import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.processCacheKey
import net.woggioni.rbcs.common.RBCS.toIntOrNull
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.extractChunk
import net.woggioni.rbcs.common.trace
import net.woggioni.rbcs.server.memcache.client.MemcacheClient
import net.woggioni.rbcs.server.memcache.client.MemcacheRequestController
import net.woggioni.rbcs.server.memcache.client.MemcacheResponseHandler
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.channels.ReadableByteChannel
import java.nio.file.Files
import java.nio.file.StandardOpenOption
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream
import io.netty.channel.Channel as NettyChannel
class MemcacheCacheHandler(
private val client: MemcacheClient,
private val digestAlgorithm: String?,
private val compressionEnabled: Boolean,
private val compressionLevel: Int,
private val chunkSize: Int,
private val maxAge: Duration
) : SimpleChannelInboundHandler<CacheMessage>() {
companion object {
private val log = createLogger<MemcacheCacheHandler>()
private fun encodeExpiry(expiry: Duration): Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
}
private inner class InProgressGetRequest(
private val key: String,
private val ctx: ChannelHandlerContext
) {
private val acc = ctx.alloc().compositeBuffer()
private val chunk = ctx.alloc().compositeBuffer()
private val outputStream = ByteBufOutputStream(chunk).let {
if (compressionEnabled) {
InflaterOutputStream(it)
} else {
it
}
}
private var responseSent = false
private var metadataSize: Int? = null
fun write(buf: ByteBuf) {
acc.addComponent(true, buf.retain())
if (metadataSize == null && acc.readableBytes() >= Int.SIZE_BYTES) {
metadataSize = acc.readInt()
}
metadataSize
?.takeIf { !responseSent }
?.takeIf { acc.readableBytes() >= it }
?.let { mSize ->
val metadata = ObjectInputStream(ByteBufInputStream(acc)).use {
acc.retain()
it.readObject() as CacheValueMetadata
}
ctx.writeAndFlush(CacheValueFoundResponse(key, metadata))
responseSent = true
acc.readerIndex(Int.SIZE_BYTES + mSize)
}
if (responseSent) {
acc.readBytes(outputStream, acc.readableBytes())
if(acc.readableBytes() >= chunkSize) {
flush(false)
}
}
}
private fun flush(last : Boolean) {
val toSend = extractChunk(chunk, ctx.alloc())
val msg = if(last) {
log.trace(ctx) {
"Sending last chunk to client on channel ${ctx.channel().id().asShortText()}"
}
LastCacheContent(toSend)
} else {
log.trace(ctx) {
"Sending chunk to client on channel ${ctx.channel().id().asShortText()}"
}
CacheContent(toSend)
}
ctx.writeAndFlush(msg)
}
fun commit() {
acc.release()
chunk.retain()
outputStream.close()
flush(true)
chunk.release()
}
fun rollback() {
acc.release()
outputStream.close()
}
}
private inner class InProgressPutRequest(
private val ch : NettyChannel,
metadata : CacheValueMetadata,
val digest : ByteBuf,
val requestController: CompletableFuture<MemcacheRequestController>,
private val alloc: ByteBufAllocator
) {
private var totalSize = 0
private var tmpFile : FileChannel? = null
private val accumulator = alloc.compositeBuffer()
private val stream = ByteBufOutputStream(accumulator).let {
if (compressionEnabled) {
DeflaterOutputStream(it, Deflater(compressionLevel))
} else {
it
}
}
init {
ByteArrayOutputStream().let { baos ->
ObjectOutputStream(baos).use {
it.writeObject(metadata)
}
val serializedBytes = baos.toByteArray()
accumulator.writeInt(serializedBytes.size)
accumulator.writeBytes(serializedBytes)
}
}
fun write(buf: ByteBuf) {
totalSize += buf.readableBytes()
buf.readBytes(stream, buf.readableBytes())
tmpFile?.let {
flushToDisk(it, accumulator)
}
if(accumulator.readableBytes() > 0x100000) {
log.debug(ch) {
"Entry is too big, buffering it into a file"
}
val opts = arrayOf(
StandardOpenOption.DELETE_ON_CLOSE,
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING
)
FileChannel.open(Files.createTempFile("rbcs-memcache", ".tmp"), *opts).let { fc ->
tmpFile = fc
flushToDisk(fc, accumulator)
}
}
}
private fun flushToDisk(fc : FileChannel, buf : CompositeByteBuf) {
val chunk = extractChunk(buf, alloc)
fc.write(chunk.nioBuffer())
chunk.release()
}
fun commit() : Pair<Int, ReadableByteChannel> {
digest.release()
accumulator.retain()
stream.close()
val fileChannel = tmpFile
return if(fileChannel != null) {
flushToDisk(fileChannel, accumulator)
accumulator.release()
fileChannel.position(0)
val fileSize = fileChannel.size().toIntOrNull() ?: let {
fileChannel.close()
throw ContentTooLargeException("Request body is too large", null)
}
fileSize to fileChannel
} else {
accumulator.readableBytes() to Channels.newChannel(ByteBufInputStream(accumulator))
}
}
fun rollback() {
stream.close()
digest.release()
tmpFile?.close()
}
}
private var inProgressPutRequest: InProgressPutRequest? = null
private var inProgressGetRequest: InProgressGetRequest? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
when (msg) {
is CacheGetRequest -> handleGetRequest(ctx, msg)
is CachePutRequest -> handlePutRequest(ctx, msg)
is LastCacheContent -> handleLastCacheContent(ctx, msg)
is CacheContent -> handleCacheContent(ctx, msg)
else -> ctx.fireChannelRead(msg)
}
}
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
log.debug(ctx) {
"Fetching ${msg.key} from memcache"
}
val key = ctx.alloc().buffer().also {
it.writeBytes(processCacheKey(msg.key, digestAlgorithm))
}
val responseHandler = object : MemcacheResponseHandler {
override fun responseReceived(response: BinaryMemcacheResponse) {
val status = response.status()
when (status) {
BinaryMemcacheResponseStatus.SUCCESS -> {
log.debug(ctx) {
"Cache hit for key ${msg.key} on memcache"
}
inProgressGetRequest = InProgressGetRequest(msg.key, ctx)
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
log.debug(ctx) {
"Cache miss for key ${msg.key} on memcache"
}
ctx.writeAndFlush(CacheValueNotFoundResponse())
}
}
}
override fun contentReceived(content: MemcacheContent) {
log.trace(ctx) {
"${if(content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${content.content().readableBytes()} bytes received from memcache for key ${msg.key}"
}
inProgressGetRequest?.write(content.content())
if (content is LastMemcacheContent) {
inProgressGetRequest?.commit()
}
}
override fun exceptionCaught(ex: Throwable) {
inProgressGetRequest?.let {
inProgressGetRequest = null
it.rollback()
}
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
}
}
client.sendRequest(key.retainedDuplicate(), responseHandler).thenAccept { requestHandle ->
log.trace(ctx) {
"Sending GET request for key ${msg.key} to memcache"
}
val request = DefaultBinaryMemcacheRequest(key).apply {
setOpcode(BinaryMemcacheOpcodes.GET)
}
requestHandle.sendRequest(request)
}
}
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
val key = ctx.alloc().buffer().also {
it.writeBytes(processCacheKey(msg.key, digestAlgorithm))
}
val responseHandler = object : MemcacheResponseHandler {
override fun responseReceived(response: BinaryMemcacheResponse) {
val status = response.status()
when (status) {
BinaryMemcacheResponseStatus.SUCCESS -> {
log.debug(ctx) {
"Inserted key ${msg.key} into memcache"
}
ctx.writeAndFlush(CachePutResponse(msg.key))
}
else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status))
}
}
override fun contentReceived(content: MemcacheContent) {}
override fun exceptionCaught(ex: Throwable) {
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
}
}
val requestController = client.sendRequest(key.retainedDuplicate(), responseHandler).whenComplete { _, ex ->
ex?.let {
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
}
}
inProgressPutRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc())
}
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
inProgressPutRequest?.let { request ->
log.trace(ctx) {
"Received chunk of ${msg.content().readableBytes()} bytes for memcache"
}
request.write(msg.content())
}
}
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
inProgressPutRequest?.let { request ->
inProgressPutRequest = null
log.trace(ctx) {
"Received last chunk of ${msg.content().readableBytes()} bytes for memcache"
}
request.write(msg.content())
val key = request.digest.retainedDuplicate()
val (payloadSize, payloadSource) = request.commit()
val extras = ctx.alloc().buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(maxAge))
val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize
request.requestController.whenComplete { requestController, ex ->
if(ex == null) {
log.trace(ctx) {
"Sending SET request to memcache"
}
requestController.sendRequest(DefaultBinaryMemcacheRequest().apply {
setOpcode(BinaryMemcacheOpcodes.SET)
setKey(key)
setExtras(extras)
setTotalBodyLength(totalBodyLength)
})
log.trace(ctx) {
"Sending request payload to memcache"
}
payloadSource.use { source ->
val bb = ByteBuffer.allocate(chunkSize)
while (true) {
val read = source.read(bb)
bb.limit()
if(read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) {
continue
}
val chunk = ctx.alloc().buffer(chunkSize)
bb.flip()
chunk.writeBytes(bb)
bb.clear()
log.trace(ctx) {
"Sending ${chunk.readableBytes()} bytes chunk to memcache"
}
if(read < 0) {
requestController.sendContent(DefaultLastMemcacheContent(chunk))
break
} else {
requestController.sendContent(DefaultMemcacheContent(chunk))
}
}
}
} else {
payloadSource.close()
}
}
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
inProgressGetRequest?.let {
inProgressGetRequest = null
it.rollback()
}
inProgressPutRequest?.let {
inProgressPutRequest = null
it.requestController.thenAccept { controller ->
controller.exceptionCaught(cause)
}
it.rollback()
}
super.exceptionCaught(ctx, cause)
}
}

View File

@@ -0,0 +1,102 @@
package net.woggioni.rbcs.server.memcache
import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.RBCS
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.Xml.Companion.asIterable
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import org.w3c.dom.Element
import java.time.Duration
import java.time.temporal.ChronoUnit
class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
override fun getXmlSchemaLocation() = "jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd"
override fun getXmlType() = "memcacheCacheType"
override fun getXmlNamespace() = "urn:net.woggioni.rbcs.server.memcache"
val xmlNamespacePrefix : String
get() = "rbcs-memcache"
override fun deserialize(el: Element): MemcacheCacheConfiguration {
val servers = mutableListOf<MemcacheCacheConfiguration.Server>()
val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse)
?: Duration.ofDays(1)
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x10000
val compressionLevel = el.renderAttribute("compression-level")
?.let(Integer::decode)
?: -1
val compressionMode = el.renderAttribute("compression-mode")
?.let {
when (it) {
"deflate" -> MemcacheCacheConfiguration.CompressionMode.DEFLATE
else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE
}
}
val digestAlgorithm = el.renderAttribute("digest")
for (child in el.asIterable()) {
when (child.nodeName) {
"server" -> {
val host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required")
val port = child.renderAttribute("port")?.toInt() ?: throw ConfigurationException("port attribute is required")
val maxConnections = child.renderAttribute("max-connections")?.toInt() ?: 1
val connectionTimeout = child.renderAttribute("connection-timeout")
?.let(Duration::parse)
?.let(Duration::toMillis)
?.let(Long::toInt)
?: 10000
servers.add(MemcacheCacheConfiguration.Server(HostAndPort(host, port), connectionTimeout, maxConnections))
}
}
}
return MemcacheCacheConfiguration(
servers,
maxAge,
digestAlgorithm,
compressionMode,
compressionLevel,
chunkSize
)
}
override fun serialize(doc: Document, cache: MemcacheCacheConfiguration) = cache.run {
val result = doc.createElement("cache")
Xml.of(doc, result) {
attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/")
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", RBCS.XML_SCHEMA_NAMESPACE_URI)
for (server in servers) {
node("server") {
attr("host", server.endpoint.host)
attr("port", server.endpoint.port.toString())
server.connectionTimeoutMillis?.let { connectionTimeoutMillis ->
attr("connection-timeout", Duration.of(connectionTimeoutMillis.toLong(), ChronoUnit.MILLIS).toString())
}
attr("max-connections", server.maxConnections.toString())
}
}
attr("max-age", maxAge.toString())
attr("chunk-size", chunkSize.toString())
digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm)
}
compressionMode?.let { compressionMode ->
attr(
"compression-mode", when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> "deflate"
}
)
}
attr("compression-level", compressionLevel.toString())
}
result
}
}

View File

@@ -0,0 +1,214 @@
package net.woggioni.rbcs.server.memcache.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.MemcacheObject
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.warn
import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration
import net.woggioni.rbcs.server.memcache.MemcacheCacheHandler
import java.io.IOException
import java.net.InetSocketAddress
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import io.netty.util.concurrent.Future as NettyFuture
class MemcacheClient(private val servers: List<MemcacheCacheConfiguration.Server>, private val chunkSize : Int) : AutoCloseable {
private companion object {
private val log = createLogger<MemcacheCacheHandler>()
}
private val group: NioEventLoopGroup
private val connectionPool: MutableMap<HostAndPort, ChannelPool> = ConcurrentHashMap()
init {
group = NioEventLoopGroup()
}
private fun newConnectionPool(server: MemcacheCacheConfiguration.Server): FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
channel(NioSocketChannel::class.java)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
server.connectionTimeoutMillis?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec(chunkSize, true))
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
fun sendRequest(
key: ByteBuf,
responseHandler: MemcacheResponseHandler
): CompletableFuture<MemcacheRequestController> {
val server = if (servers.size > 1) {
var checksum = 0
while (key.readableBytes() > 4) {
val byte = key.readInt()
checksum = checksum xor byte
}
while (key.readableBytes() > 0) {
val byte = key.readByte()
checksum = checksum xor byte.toInt()
}
servers[checksum % servers.size]
} else {
servers.first()
}
key.release()
val response = CompletableFuture<MemcacheRequestController>()
// Custom handler for processing responses
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
}
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) {
var requestSent = false
var requestBodySent = false
var requestFinished = false
var responseReceived = false
var responseBodyReceived = false
var responseFinished = false
var requestBodySize = 0
var requestBodyBytesSent = 0
val channel = channelFuture.now
var connectionClosedByTheRemoteServer = true
val closeCallback = {
if (connectionClosedByTheRemoteServer) {
val ex = IOException("The memcache server closed the connection")
val completed = response.completeExceptionally(ex)
if(!completed) responseHandler.exceptionCaught(ex)
log.warn {
"RequestSent: $requestSent, RequestBodySent: $requestBodySent, " +
"RequestFinished: $requestFinished, ResponseReceived: $responseReceived, " +
"ResponseBodyReceived: $responseBodyReceived, ResponseFinished: $responseFinished, " +
"RequestBodySize: $requestBodySize, RequestBodyBytesSent: $requestBodyBytesSent"
}
}
pool.release(channel)
}
val closeListener = ChannelFutureListener {
closeCallback()
}
channel.closeFuture().addListener(closeListener)
val pipeline = channel.pipeline()
val handler = object : SimpleChannelInboundHandler<MemcacheObject>() {
override fun handlerAdded(ctx: ChannelHandlerContext) {
channel.closeFuture().removeListener(closeListener)
}
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: MemcacheObject
) {
when (msg) {
is BinaryMemcacheResponse -> {
responseHandler.responseReceived(msg)
responseReceived = true
}
is LastMemcacheContent -> {
responseFinished = true
responseHandler.contentReceived(msg)
pipeline.remove(this)
pool.release(channel)
}
is MemcacheContent -> {
responseBodyReceived = true
responseHandler.contentReceived(msg)
}
}
}
override fun channelInactive(ctx: ChannelHandlerContext) {
closeCallback()
ctx.fireChannelInactive()
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
connectionClosedByTheRemoteServer = false
ctx.close()
pool.release(channel)
responseHandler.exceptionCaught(cause)
}
}
channel.pipeline()
.addLast("client-handler", handler)
response.complete(object : MemcacheRequestController {
override fun sendRequest(request: BinaryMemcacheRequest) {
requestBodySize = request.totalBodyLength() - request.keyLength() - request.extrasLength()
channel.writeAndFlush(request)
requestSent = true
}
override fun sendContent(content: MemcacheContent) {
val size = content.content().readableBytes()
channel.writeAndFlush(content).addListener {
requestBodyBytesSent += size
requestBodySent = true
if(content is LastMemcacheContent) {
requestFinished = true
}
}
}
override fun exceptionCaught(ex: Throwable) {
connectionClosedByTheRemoteServer = false
channel.close()
}
})
} else {
response.completeExceptionally(channelFuture.cause())
}
}
})
return response
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}
override fun close() {
shutDown().sync()
}
}

View File

@@ -0,0 +1,13 @@
package net.woggioni.rbcs.server.memcache.client
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
interface MemcacheRequestController {
fun sendRequest(request : BinaryMemcacheRequest)
fun sendContent(content : MemcacheContent)
fun exceptionCaught(ex : Throwable)
}

View File

@@ -0,0 +1,14 @@
package net.woggioni.rbcs.server.memcache.client
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
interface MemcacheResponseHandler {
fun responseReceived(response : BinaryMemcacheResponse)
fun contentReceived(content : MemcacheContent)
fun exceptionCaught(ex : Throwable)
}

View File

@@ -0,0 +1 @@
net.woggioni.rbcs.server.memcache.MemcacheCacheProvider

View File

@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<xs:schema targetNamespace="urn:net.woggioni.rbcs.server.memcache"
xmlns:rbcs-memcache="urn:net.woggioni.rbcs.server.memcache"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:import schemaLocation="jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs.xsd" namespace="urn:net.woggioni.rbcs.server"/>
<xs:complexType name="memcacheServerType">
<xs:attribute name="host" type="xs:token" use="required"/>
<xs:attribute name="port" type="xs:positiveInteger" use="required"/>
<xs:attribute name="connection-timeout" type="xs:duration"/>
<xs:attribute name="max-connections" type="xs:positiveInteger" default="1"/>
</xs:complexType>
<xs:complexType name="memcacheCacheType">
<xs:complexContent>
<xs:extension base="rbcs:cacheType">
<xs:sequence maxOccurs="unbounded">
<xs:element name="server" type="rbcs-memcache:memcacheServerType"/>
</xs:sequence>
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000"/>
<xs:attribute name="digest" type="xs:token" />
<xs:attribute name="compression-mode" type="rbcs-memcache:compressionType"/>
<xs:attribute name="compression-level" type="rbcs:compressionLevelType" default="-1"/>
</xs:extension>
</xs:complexContent>
</xs:complexType>
<xs:simpleType name="compressionType">
<xs:restriction base="xs:token">
<xs:enumeration value="deflate"/>
</xs:restriction>
</xs:simpleType>
</xs:schema>

View File

@@ -0,0 +1,27 @@
package net.woggioni.rbcs.server.memcache.client
import io.netty.buffer.ByteBufUtil
import io.netty.buffer.Unpooled
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import java.io.ByteArrayInputStream
import java.nio.ByteBuffer
import java.nio.channels.Channels
import kotlin.random.Random
class ByteBufferTest {
@Test
fun test() {
val byteBuffer = ByteBuffer.allocate(0x100)
val originalBytes = Random(101325).nextBytes(0x100)
Channels.newChannel(ByteArrayInputStream(originalBytes)).use { source ->
source.read(byteBuffer)
}
byteBuffer.flip()
val buf = Unpooled.buffer()
buf.writeBytes(byteBuffer)
val finalBytes = ByteBufUtil.getBytes(buf)
Assertions.assertArrayEquals(originalBytes, finalBytes)
}
}

View File

@@ -9,9 +9,12 @@ dependencies {
implementation catalog.jwo
implementation catalog.slf4j.api
implementation catalog.netty.codec.http
implementation catalog.netty.handler
implementation catalog.netty.buffer
implementation catalog.netty.transport
api project(':gbcs-common')
api project(':gbcs-api')
api project(':rbcs-common')
api project(':rbcs-api')
// runtimeOnly catalog.slf4j.jdk14
testRuntimeOnly catalog.logback.classic
@@ -19,7 +22,7 @@ dependencies {
testImplementation catalog.bcprov.jdk18on
testImplementation catalog.bcpkix.jdk18on
testRuntimeOnly project(":gbcs-server-memcached")
testRuntimeOnly project(":rbcs-server-memcache")
}
test {
@@ -36,3 +39,4 @@ publishing {
}

Some files were not shown because too many files have changed in this diff Show More