Compare commits

...

2 Commits

SHA1        Message                                    Checks                                  Date
c2e388b931  switched to ZGC in docker image            CI / build (push) successful in 3m28s   2025-02-04 22:46:34 +08:00
6c62ac85c0  implemented memcached client with Netty    CI / build (push) successful in 1m46s   2025-02-04 22:09:28 +08:00
53 changed files with 592 additions and 295 deletions

View File

@@ -44,7 +44,7 @@ jobs:
           target: release
           cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx
       -
-        name: Build gbcs memcached Docker image
+        name: Build gbcs memcache Docker image
         uses: docker/build-push-action@v5.3.0
         with:
           context: "docker/build/docker"
@@ -52,9 +52,9 @@ jobs:
           push: true
           pull: true
           tags: |
-            gitea.woggioni.net/woggioni/gbcs:memcached
-            gitea.woggioni.net/woggioni/gbcs:memcached-${{ steps.retrieve-version.outputs.VERSION }}
-          target: release-memcached
+            gitea.woggioni.net/woggioni/gbcs:memcache
+            gitea.woggioni.net/woggioni/gbcs:memcache-${{ steps.retrieve-version.outputs.VERSION }}
+          target: release-memcache
           cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx
           cache-to: type=registry,mode=max,compression=zstd,image-manifest=true,oci-mediatypes=true,ref=gitea.woggioni.net/woggioni/gbcs:buildx
       - name: Publish artifacts

View File

@@ -5,12 +5,12 @@ WORKDIR /home/luser
 FROM base-release AS release
 ADD gbcs-cli-envelope-*.jar gbcs.jar
-ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"]
+ENTRYPOINT ["java", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/gbcs.jar", "server"]

-FROM base-release AS release-memcached
+FROM base-release AS release-memcache
 ADD --chown=luser:luser gbcs-cli-envelope-*.jar gbcs.jar
 RUN mkdir plugins
 WORKDIR /home/luser/plugins
-RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/gbcs-server-memcached*.tar
+RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/gbcs-server-memcache*.tar
 WORKDIR /home/luser
-ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"]
+ENTRYPOINT ["java", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/gbcs.jar", "server"]
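The new entrypoints opt into generational ZGC (`-XX:+UseZGC -XX:+ZGenerational`, shipped with JDK 21). A quick way to confirm which collector a running gbcs container actually picked up is to list the GC MX beans; this snippet is illustrative only and not part of the commit:

```kotlin
import java.lang.management.ManagementFactory

// Prints the active garbage collectors; with generational ZGC enabled the bean
// names typically mention "ZGC" (exact names vary between JDK releases).
fun main() {
    ManagementFactory.getGarbageCollectorMXBeans().forEach { gc ->
        println("${gc.name}: collections=${gc.collectionCount}, time=${gc.collectionTime}ms")
    }
}
```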

View File

@@ -19,7 +19,7 @@ configurations {
 dependencies {
     docker project(path: ':gbcs-cli', configuration: 'release')
-    docker project(path: ':gbcs-server-memcached', configuration: 'release')
+    docker project(path: ':gbcs-server-memcache', configuration: 'release')
 }

 Provider<Task> cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {}
@@ -46,22 +46,22 @@ Provider<DockerTagImage> dockerTag = tasks.register('dockerTagImage', DockerTagI
     tag = version
 }

-Provider<DockerTagImage> dockerTagMemcached = tasks.register('dockerTagMemcachedImage', DockerTagImage) {
+Provider<DockerTagImage> dockerTagMemcache = tasks.register('dockerTagMemcacheImage', DockerTagImage) {
     group = 'docker'
     repository = 'gitea.woggioni.net/woggioni/gbcs'
-    imageId = 'gitea.woggioni.net/woggioni/gbcs:memcached'
-    tag = "${version}-memcached"
+    imageId = 'gitea.woggioni.net/woggioni/gbcs:memcache'
+    tag = "${version}-memcache"
 }

 Provider<DockerPushImage> dockerPush = tasks.register('dockerPushImage', DockerPushImage) {
     group = 'docker'
-    dependsOn dockerTag, dockerTagMemcached
+    dependsOn dockerTag, dockerTagMemcache
     registryCredentials {
         url = getProperty('docker.registry.url')
         username = 'woggioni'
         password = System.getenv().get("PUBLISHER_TOKEN")
     }
-    images = [dockerTag.flatMap{ it.tag }, dockerTagMemcached.flatMap{ it.tag }]
+    images = [dockerTag.flatMap{ it.tag }, dockerTagMemcache.flatMap{ it.tag }]
 }

View File

@@ -5,6 +5,7 @@ plugins {
 }

 dependencies {
+    api catalog.netty.buffer
 }

 publishing {
View File

@@ -1,6 +1,7 @@
 module net.woggioni.gbcs.api {
     requires static lombok;
     requires java.xml;
+    requires io.netty.buffer;
     exports net.woggioni.gbcs.api;
     exports net.woggioni.gbcs.api.exception;
 }

View File

@@ -1,12 +1,14 @@
 package net.woggioni.gbcs.api;

+import io.netty.buffer.ByteBuf;
 import net.woggioni.gbcs.api.exception.ContentTooLargeException;

 import java.nio.channels.ReadableByteChannel;
+import java.util.concurrent.CompletableFuture;

 public interface Cache extends AutoCloseable {
-    ReadableByteChannel get(String key);
+    CompletableFuture<ReadableByteChannel> get(String key);

-    void put(String key, byte[] content) throws ContentTooLargeException;
+    CompletableFuture<Void> put(String key, ByteBuf content) throws ContentTooLargeException;
 }
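With this change the `Cache` SPI becomes non-blocking: both operations return a `CompletableFuture`, and `put` receives the request body as a Netty `ByteBuf` instead of a `byte[]`. A minimal sketch of an implementation against the new contract (hypothetical class, not part of the commit; it assumes the caller hands over ownership of the buffer):

```kotlin
import io.netty.buffer.ByteBuf
import net.woggioni.gbcs.api.Cache
import java.nio.channels.Channels
import java.nio.channels.ReadableByteChannel
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap

class MapBackedCache : Cache {
    private val store = ConcurrentHashMap<String, ByteArray>()

    override fun get(key: String): CompletableFuture<ReadableByteChannel?> =
        CompletableFuture.completedFuture(
            store[key]?.let { Channels.newChannel(it.inputStream()) }
        )

    override fun put(key: String, content: ByteBuf): CompletableFuture<Void> {
        // Copy the readable bytes out of the buffer, then release it
        // (assumes ownership of the reference count was transferred to the cache)
        val bytes = ByteArray(content.readableBytes())
        content.readBytes(bytes)
        content.release()
        store[key] = bytes
        return CompletableFuture.completedFuture(null)
    }

    override fun close() = store.clear()
}
```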

View File

@@ -56,7 +56,8 @@ public class Configuration {
         @EqualsAndHashCode.Include
         String name;
         Set<Role> roles;
-        Quota quota;
+        Quota groupQuota;
+        Quota userQuota;
     }

     @Value
View File

@@ -1,7 +1,5 @@
package net.woggioni.gbcs.cli package net.woggioni.gbcs.cli
import net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.cli.impl.AbstractVersionProvider import net.woggioni.gbcs.cli.impl.AbstractVersionProvider
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand
@@ -10,10 +8,11 @@ import net.woggioni.gbcs.cli.impl.commands.GetCommand
import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand
import net.woggioni.gbcs.cli.impl.commands.PutCommand import net.woggioni.gbcs.cli.impl.commands.PutCommand
import net.woggioni.gbcs.cli.impl.commands.ServerCommand import net.woggioni.gbcs.cli.impl.commands.ServerCommand
import net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import picocli.CommandLine import picocli.CommandLine
import picocli.CommandLine.Model.CommandSpec import picocli.CommandLine.Model.CommandSpec
import java.net.URI
@CommandLine.Command( @CommandLine.Command(

View File

@@ -33,6 +33,13 @@ class BenchmarkCommand : GbcsCommand() {
) )
private var numberOfEntries = 1000 private var numberOfEntries = 1000
@CommandLine.Option(
names = ["-s", "--size"],
description = ["Size of a cache value in bytes"],
paramLabel = "SIZE"
)
private var size = 0x1000
override fun run() { override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName -> val profile = clientCommand.profileName.let { profileName ->
@@ -46,7 +53,7 @@ class BenchmarkCommand : GbcsCommand() {
while (true) { while (true) {
val key = JWO.bytesToHex(random.nextBytes(16)) val key = JWO.bytesToHex(random.nextBytes(16))
val content = random.nextInt().toByte() val content = random.nextInt().toByte()
val value = ByteArray(0x1000, { _ -> content }) val value = ByteArray(size, { _ -> content })
yield(key to value) yield(key to value)
} }
} }

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.contextLogger
import picocli.CommandLine import picocli.CommandLine
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.common.PasswordSecurity.hashPassword
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.converters.OutputStreamConverter import net.woggioni.gbcs.cli.impl.converters.OutputStreamConverter
import net.woggioni.gbcs.common.PasswordSecurity.hashPassword
import net.woggioni.jwo.UncloseableOutputStream import net.woggioni.jwo.UncloseableOutputStream
import picocli.CommandLine import picocli.CommandLine
import java.io.OutputStream import java.io.OutputStream

View File

@@ -1,9 +1,9 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.converters.InputStreamConverter import net.woggioni.gbcs.cli.impl.converters.InputStreamConverter
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.contextLogger
import picocli.CommandLine import picocli.CommandLine
import java.io.InputStream import java.io.InputStream

View File

@@ -1,12 +1,12 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.server.GradleBuildCacheServer
import net.woggioni.gbcs.server.GradleBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL
import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.debug import net.woggioni.gbcs.common.debug
import net.woggioni.gbcs.common.info import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.server.GradleBuildCacheServer
import net.woggioni.gbcs.server.GradleBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import picocli.CommandLine import picocli.CommandLine

View File

@@ -1,9 +1,9 @@
package net.woggioni.gbcs.client.impl package net.woggioni.gbcs.client.impl
import net.woggioni.gbcs.api.exception.ConfigurationException import net.woggioni.gbcs.api.exception.ConfigurationException
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.Xml.Companion.asIterable import net.woggioni.gbcs.common.Xml.Companion.asIterable
import net.woggioni.gbcs.common.Xml.Companion.renderAttribute import net.woggioni.gbcs.common.Xml.Companion.renderAttribute
import net.woggioni.gbcs.client.GradleBuildCacheClient
import org.w3c.dom.Document import org.w3c.dom.Document
import java.net.URI import java.net.URI
import java.nio.file.Files import java.nio.file.Files

View File

@@ -4,7 +4,6 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtensionContext import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.Arguments

View File

@@ -9,6 +9,7 @@ dependencies {
     implementation project(':gbcs-api')
     implementation catalog.slf4j.api
     implementation catalog.jwo
+    implementation catalog.netty.buffer
 }

 publishing {
publishing { publishing {

View File

@@ -4,6 +4,7 @@ module net.woggioni.gbcs.common {
     requires org.slf4j;
     requires kotlin.stdlib;
     requires net.woggioni.jwo;
+    requires io.netty.buffer;
    provides java.net.spi.URLStreamHandlerProvider with net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory;
    exports net.woggioni.gbcs.common;

View File

@@ -0,0 +1,25 @@
package net.woggioni.gbcs.common
import io.netty.buffer.ByteBuf
import java.io.InputStream
class ByteBufInputStream(private val buf : ByteBuf) : InputStream() {
override fun read(): Int {
return buf.takeIf {
it.readableBytes() > 0
}?.let(ByteBuf::readByte)
?.let(Byte::toInt) ?: -1
}
override fun read(b: ByteArray, off: Int, len: Int): Int {
val readableBytes = buf.readableBytes()
if(readableBytes == 0) return -1
val result = len.coerceAtMost(readableBytes)
buf.readBytes(b, off, result)
return result
}
override fun close() {
buf.release()
}
}
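`ByteBufInputStream` adapts a Netty `ByteBuf` to a `java.io.InputStream` and releases the buffer when the stream is closed, which is how the memcache client and the built-in caches now read stored payloads. A small usage sketch (illustrative only, mirroring how a gzip-compressed value would be unwrapped):

```kotlin
import io.netty.buffer.ByteBuf
import net.woggioni.gbcs.common.ByteBufInputStream
import java.util.zip.GZIPInputStream

// Decompress a gzip payload held in a ByteBuf; closing the outer stream closes
// the ByteBufInputStream, which in turn releases the underlying buffer.
fun gunzip(buf: ByteBuf): ByteArray =
    GZIPInputStream(ByteBufInputStream(buf)).use { it.readAllBytes() }
```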

View File

@@ -5,7 +5,6 @@ import java.io.InputStream
import java.net.URL import java.net.URL
import java.net.URLConnection import java.net.URLConnection
import java.net.URLStreamHandler import java.net.URLStreamHandler
import java.net.URLStreamHandlerFactory
import java.net.spi.URLStreamHandlerProvider import java.net.spi.URLStreamHandlerProvider
import java.util.Optional import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean

View File

@@ -6,10 +6,10 @@ plugins {
 configurations {
     bundle {
-        extendsFrom runtimeClasspath
         canBeResolved = true
         canBeConsumed = false
         visible = false
+        transitive = false
         resolutionStrategy {
             dependencies {
@@ -29,10 +29,20 @@ configurations {
 }

 dependencies {
-    compileOnly project(':gbcs-common')
-    compileOnly project(':gbcs-api')
-    compileOnly catalog.jwo
-    implementation catalog.xmemcached
+    implementation project(':gbcs-common')
+    implementation project(':gbcs-api')
+    implementation catalog.jwo
+    implementation catalog.slf4j.api
+    implementation catalog.netty.common
+    implementation catalog.netty.codec.memcache
+    bundle catalog.netty.codec.memcache
+    testRuntimeOnly catalog.logback.classic
+}
+
+tasks.named(JavaPlugin.TEST_TASK_NAME, Test) {
+    systemProperty("io.netty.leakDetectionLevel", "PARANOID")
 }

 Provider<Tar> bundleTask = tasks.register("bundle", Tar) {

@@ -0,0 +1,19 @@
import net.woggioni.gbcs.api.CacheProvider;
module net.woggioni.gbcs.server.memcache {
requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.api;
requires net.woggioni.jwo;
requires java.xml;
requires kotlin.stdlib;
requires io.netty.transport;
requires io.netty.codec;
requires io.netty.codec.memcache;
requires io.netty.common;
requires io.netty.buffer;
requires org.slf4j;
provides CacheProvider with net.woggioni.gbcs.server.memcache.MemcacheCacheProvider;
opens net.woggioni.gbcs.server.memcache.schema;
}
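The `provides CacheProvider with ... MemcacheCacheProvider` directive is what lets the server discover the plugin through the `ServiceLoader` mechanism (the matching `META-INF/services` entry for class-path use appears further down). A sketch of how such a provider is typically looked up; the server's actual lookup code is not part of this diff, so this is illustrative only:

```kotlin
import java.util.ServiceLoader
import net.woggioni.gbcs.api.CacheProvider

// Enumerate every CacheProvider reachable on the module path / class path and
// index it by XML namespace and type, e.g. to resolve a <cache xs:type=...> element.
fun loadCacheProviders(): Map<Pair<String, String>, CacheProvider<*>> =
    ServiceLoader.load(CacheProvider::class.java)
        .associateBy { it.xmlNamespace to it.xmlType }
```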

View File

@@ -0,0 +1,4 @@
package net.woggioni.gbcs.server.memcache
class MemcacheException(status : Short, msg : String? = null, cause : Throwable? = null)
: RuntimeException(msg ?: "Memcached status $status", cause)

View File

@@ -0,0 +1,23 @@
package net.woggioni.gbcs.server.memcache
import io.netty.buffer.ByteBuf
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.server.memcache.client.MemcacheClient
import java.nio.channels.ReadableByteChannel
import java.util.concurrent.CompletableFuture
class MemcacheCache(private val cfg : MemcacheCacheConfiguration) : Cache {
private val memcacheClient = MemcacheClient(cfg)
override fun get(key: String): CompletableFuture<ReadableByteChannel?> {
return memcacheClient.get(key)
}
override fun put(key: String, content: ByteBuf): CompletableFuture<Void> {
return memcacheClient.put(key, content, cfg.maxAge)
}
override fun close() {
memcacheClient.close()
}
}

View File

@@ -0,0 +1,40 @@
package net.woggioni.gbcs.server.memcache
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.HostAndPort
import java.time.Duration
data class MemcacheCacheConfiguration(
val servers: List<Server>,
val maxAge: Duration = Duration.ofDays(1),
val maxSize: Int = 0x100000,
val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = null,
) : Configuration.Cache {
enum class CompressionMode {
/**
* Gzip mode
*/
GZIP,
/**
* Deflate mode
*/
DEFLATE
}
data class Server(
val endpoint : HostAndPort,
val connectionTimeoutMillis : Int?,
val maxConnections : Int
)
override fun materialize() = MemcacheCache(this)
override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcache"
override fun getTypeName() = "memcacheCacheType"
}
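For reference, a configuration equivalent to a single-server XML entry could be built programmatically like this (hypothetical values; the usual path is the XML deserializer shown below):

```kotlin
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.server.memcache.MemcacheCacheConfiguration
import java.time.Duration

// One memcached instance on localhost, 4 pooled connections, deflate-compressed values
val cacheCfg = MemcacheCacheConfiguration(
    servers = listOf(
        MemcacheCacheConfiguration.Server(
            endpoint = HostAndPort("127.0.0.1", 11211),
            connectionTimeoutMillis = 10_000,
            maxConnections = 4
        )
    ),
    maxAge = Duration.ofDays(7),
    digestAlgorithm = "MD5",
    compressionMode = MemcacheCacheConfiguration.CompressionMode.DEFLATE
)
```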

View File

@@ -1,6 +1,5 @@
-package net.woggioni.gbcs.server.memcached
+package net.woggioni.gbcs.server.memcache

-import net.rubyeye.xmemcached.transcoders.CompressionMode
 import net.woggioni.gbcs.api.CacheProvider
 import net.woggioni.gbcs.api.exception.ConfigurationException
 import net.woggioni.gbcs.common.GBCS
@@ -11,19 +10,21 @@ import net.woggioni.gbcs.common.Xml.Companion.renderAttribute
 import org.w3c.dom.Document
 import org.w3c.dom.Element
 import java.time.Duration
+import java.time.temporal.ChronoUnit

-class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
-    override fun getXmlSchemaLocation() = "jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd"
+class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
+    override fun getXmlSchemaLocation() = "jpms://net.woggioni.gbcs.server.memcache/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd"

-    override fun getXmlType() = "memcachedCacheType"
+    override fun getXmlType() = "memcacheCacheType"

-    override fun getXmlNamespace() = "urn:net.woggioni.gbcs.server.memcached"
+    override fun getXmlNamespace() = "urn:net.woggioni.gbcs.server.memcache"

     val xmlNamespacePrefix : String
-        get() = "gbcs-memcached"
+        get() = "gbcs-memcache"

-    override fun deserialize(el: Element): MemcachedCacheConfiguration {
-        val servers = mutableListOf<HostAndPort>()
+    override fun deserialize(el: Element): MemcacheCacheConfiguration {
+        val servers = mutableListOf<MemcacheCacheConfiguration.Server>()
         val maxAge = el.renderAttribute("max-age")
             ?.let(Duration::parse)
             ?: Duration.ofDays(1)
@@ -33,24 +34,30 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
         val compressionMode = el.renderAttribute("compression-mode")
             ?.let {
                 when (it) {
-                    "gzip" -> CompressionMode.GZIP
-                    "zip" -> CompressionMode.ZIP
-                    else -> CompressionMode.ZIP
+                    "gzip" -> MemcacheCacheConfiguration.CompressionMode.GZIP
+                    "deflate" -> MemcacheCacheConfiguration.CompressionMode.DEFLATE
+                    else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE
                 }
             }
-            ?: CompressionMode.ZIP
+            ?: MemcacheCacheConfiguration.CompressionMode.DEFLATE
         val digestAlgorithm = el.renderAttribute("digest")
         for (child in el.asIterable()) {
             when (child.nodeName) {
                 "server" -> {
                     val host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required")
                     val port = child.renderAttribute("port")?.toInt() ?: throw ConfigurationException("port attribute is required")
-                    servers.add(HostAndPort(host, port))
+                    val maxConnections = child.renderAttribute("max-connections")?.toInt() ?: 1
+                    val connectionTimeout = child.renderAttribute("connection-timeout")
+                        ?.let(Duration::parse)
+                        ?.let(Duration::toMillis)
+                        ?.let(Long::toInt)
+                        ?: 10000
+                    servers.add(MemcacheCacheConfiguration.Server(HostAndPort(host, port), connectionTimeout, maxConnections))
                 }
             }
         }
-        return MemcachedCacheConfiguration(
+        return MemcacheCacheConfiguration(
             servers,
             maxAge,
             maxSize,
@@ -59,7 +66,7 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
         )
     }

-    override fun serialize(doc: Document, cache: MemcachedCacheConfiguration) = cache.run {
+    override fun serialize(doc: Document, cache: MemcacheCacheConfiguration) = cache.run {
         val result = doc.createElement("cache")
         Xml.of(doc, result) {
             attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/")
@@ -67,8 +74,12 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
             attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI)
             for (server in servers) {
                 node("server") {
-                    attr("host", server.host)
-                    attr("port", server.port.toString())
+                    attr("host", server.endpoint.host)
+                    attr("port", server.endpoint.port.toString())
+                    server.connectionTimeoutMillis?.let { connectionTimeoutMillis ->
+                        attr("connection-timeout", Duration.of(connectionTimeoutMillis.toLong(), ChronoUnit.MILLIS).toString())
+                    }
+                    attr("max-connections", server.maxConnections.toString())
                 }
             }
             attr("max-age", maxAge.toString())
@@ -76,12 +87,14 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
             digestAlgorithm?.let { digestAlgorithm ->
                 attr("digest", digestAlgorithm)
             }
-            attr(
-                "compression-mode", when (compressionMode) {
-                    CompressionMode.GZIP -> "gzip"
-                    CompressionMode.ZIP -> "zip"
-                }
-            )
+            compressionMode?.let { compressionMode ->
+                attr(
+                    "compression-mode", when (compressionMode) {
+                        MemcacheCacheConfiguration.CompressionMode.GZIP -> "gzip"
+                        MemcacheCacheConfiguration.CompressionMode.DEFLATE -> "deflate"
+                    }
+                )
+            }
         }
         result
     }

View File

@@ -0,0 +1,242 @@
package net.woggioni.gbcs.server.memcache.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheObjectAggregator
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultFullBinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.GBCS.digest
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.memcache.MemcacheCacheConfiguration
import net.woggioni.gbcs.server.memcache.MemcacheException
import net.woggioni.jwo.JWO
import java.io.ByteArrayOutputStream
import java.net.InetSocketAddress
import java.nio.channels.Channels
import java.nio.channels.ReadableByteChannel
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import java.util.zip.InflaterInputStream
import io.netty.util.concurrent.Future as NettyFuture
class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseable {
private companion object {
@JvmStatic
private val log = contextLogger()
}
private val group: NioEventLoopGroup
private val connectionPool: MutableMap<HostAndPort, ChannelPool> = ConcurrentHashMap()
init {
group = NioEventLoopGroup()
}
private fun newConnectionPool(server: MemcacheCacheConfiguration.Server): FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
channel(NioSocketChannel::class.java)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
server.connectionTimeoutMillis?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec())
pipeline.addLast(BinaryMemcacheObjectAggregator(Integer.MAX_VALUE))
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
private fun sendRequest(request: FullBinaryMemcacheRequest): CompletableFuture<FullBinaryMemcacheResponse> {
val server = cfg.servers.let { servers ->
if (servers.size > 1) {
val key = request.key().duplicate()
var checksum = 0
while (key.readableBytes() > 4) {
val byte = key.readInt()
checksum = checksum xor byte
}
while (key.readableBytes() > 0) {
val byte = key.readByte()
checksum = checksum xor byte.toInt()
}
servers[checksum % servers.size]
} else {
servers.first()
}
}
val response = CompletableFuture<FullBinaryMemcacheResponse>()
// Custom handler for processing responses
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
}
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline()
.addLast("handler", object : SimpleChannelInboundHandler<FullBinaryMemcacheResponse>() {
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: FullBinaryMemcacheResponse
) {
pipeline.removeLast()
pool.release(channel)
response.complete(msg.retain())
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val ex = when (cause) {
is DecoderException -> cause.cause!!
else -> cause
}
ctx.close()
pipeline.removeLast()
pool.release(channel)
response.completeExceptionally(ex)
}
})
channel.writeAndFlush(request)
} else {
response.completeExceptionally(channelFuture.cause())
}
}
})
return response
}
private fun encodeExpiry(expiry: Duration): Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
fun get(key: String): CompletableFuture<ReadableByteChannel?> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
DefaultFullBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), null).apply {
setOpcode(BinaryMemcacheOpcodes.GET)
}
}
return sendRequest(request).thenApply { response ->
when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> {
val compressionMode = cfg.compressionMode
val content = response.content()
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> {
GZIPInputStream(ByteBufInputStream(content))
}
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
InflaterInputStream(ByteBufInputStream(content))
}
}
} else {
ByteBufInputStream(content)
}.let(Channels::newChannel)
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
null
}
else -> throw MemcacheException(status)
}
}
}
fun put(key: String, content: ByteBuf, expiry: Duration, cas: Long? = null): CompletableFuture<Void> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
val extras = Unpooled.buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(expiry))
val compressionMode = cfg.compressionMode
val payload = if (compressionMode != null) {
val inputStream = ByteBufInputStream(Unpooled.wrappedBuffer(content))
val baos = ByteArrayOutputStream()
val outputStream = when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> {
GZIPOutputStream(baos)
}
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
DeflaterOutputStream(baos, Deflater(Deflater.DEFAULT_COMPRESSION, false))
}
}
inputStream.use { i ->
outputStream.use { o ->
JWO.copy(i, o)
}
}
Unpooled.wrappedBuffer(baos.toByteArray())
} else {
content
}
DefaultFullBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), extras, payload).apply {
setOpcode(BinaryMemcacheOpcodes.SET)
cas?.let(this::setCas)
}
}
return sendRequest(request).thenApply { response ->
when(val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> null
else -> throw MemcacheException(status)
}
}
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}
override fun close() {
shutDown().sync()
}
}
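The client keeps one Netty `FixedChannelPool` per configured server, picks a server with a simple XOR checksum of the key, and speaks the binary memcached protocol, so `get` and `put` surface as `CompletableFuture`s. A hypothetical usage sketch (blocking on the futures only for illustration):

```kotlin
import io.netty.buffer.Unpooled
import net.woggioni.gbcs.server.memcache.MemcacheCacheConfiguration
import net.woggioni.gbcs.server.memcache.client.MemcacheClient
import java.io.ByteArrayOutputStream
import java.nio.channels.Channels
import java.time.Duration

fun roundTrip(cfg: MemcacheCacheConfiguration) {
    MemcacheClient(cfg).use { client ->
        val value = Unpooled.copiedBuffer("cached artifact".toByteArray(Charsets.UTF_8))
        // store the entry for one hour
        client.put("some-build-key", value, Duration.ofHours(1)).join()
        // read it back; a null channel means a cache miss (KEY_ENOENT)
        val channel = client.get("some-build-key").join()
        channel?.let { ch ->
            val baos = ByteArrayOutputStream()
            Channels.newInputStream(ch).use { it.copyTo(baos) }
            println(baos.toString(Charsets.UTF_8))
        }
    }
}
```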

View File

@@ -0,0 +1 @@
net.woggioni.gbcs.server.memcache.MemcacheCacheProvider

View File

@@ -1,33 +1,35 @@
 <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<xs:schema targetNamespace="urn:net.woggioni.gbcs.server.memcached"
-           xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
+<xs:schema targetNamespace="urn:net.woggioni.gbcs.server.memcache"
+           xmlns:gbcs-memcache="urn:net.woggioni.gbcs.server.memcache"
            xmlns:gbcs="urn:net.woggioni.gbcs.server"
            xmlns:xs="http://www.w3.org/2001/XMLSchema">
     <xs:import schemaLocation="jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd" namespace="urn:net.woggioni.gbcs.server"/>

-    <xs:complexType name="memcachedServerType">
+    <xs:complexType name="memcacheServerType">
         <xs:attribute name="host" type="xs:token" use="required"/>
         <xs:attribute name="port" type="xs:positiveInteger" use="required"/>
+        <xs:attribute name="connection-timeout" type="xs:duration"/>
+        <xs:attribute name="max-connections" type="xs:positiveInteger" default="1"/>
     </xs:complexType>

-    <xs:complexType name="memcachedCacheType">
+    <xs:complexType name="memcacheCacheType">
         <xs:complexContent>
             <xs:extension base="gbcs:cacheType">
                 <xs:sequence maxOccurs="unbounded">
-                    <xs:element name="server" type="gbcs-memcached:memcachedServerType"/>
+                    <xs:element name="server" type="gbcs-memcache:memcacheServerType"/>
                 </xs:sequence>
                 <xs:attribute name="max-age" type="xs:duration" default="P1D"/>
                 <xs:attribute name="max-size" type="xs:unsignedInt" default="1048576"/>
                 <xs:attribute name="digest" type="xs:token" />
-                <xs:attribute name="compression-mode" type="gbcs-memcached:compressionType" default="zip"/>
+                <xs:attribute name="compression-mode" type="gbcs-memcache:compressionType"/>
             </xs:extension>
         </xs:complexContent>
     </xs:complexType>

     <xs:simpleType name="compressionType">
         <xs:restriction base="xs:token">
-            <xs:enumeration value="zip"/>
+            <xs:enumeration value="deflate"/>
             <xs:enumeration value="gzip"/>
         </xs:restriction>
     </xs:simpleType>

View File

@@ -1,14 +0,0 @@
import net.woggioni.gbcs.api.CacheProvider;
module net.woggioni.gbcs.server.memcached {
requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.api;
requires com.googlecode.xmemcached;
requires net.woggioni.jwo;
requires java.xml;
requires kotlin.stdlib;
provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider;
opens net.woggioni.gbcs.server.memcached.schema;
}

View File

@@ -1,59 +0,0 @@
package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.XMemcachedClientBuilder
import net.rubyeye.xmemcached.command.BinaryCommandFactory
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.ContentTooLargeException
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.jwo.JWO
import java.io.ByteArrayInputStream
import java.net.InetSocketAddress
import java.nio.channels.Channels
import java.nio.channels.ReadableByteChannel
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.time.Duration
class MemcachedCache(
servers: List<HostAndPort>,
private val maxAge: Duration,
maxSize : Int,
digestAlgorithm: String?,
compressionMode: CompressionMode,
) : Cache {
private val memcachedClient = XMemcachedClientBuilder(
servers.stream().map { addr: HostAndPort -> InetSocketAddress(addr.host, addr.port) }.toList()
).apply {
commandFactory = BinaryCommandFactory()
digestAlgorithm?.let { dAlg ->
setKeyProvider { key ->
val md = MessageDigest.getInstance(dAlg)
md.update(key.toByteArray(StandardCharsets.UTF_8))
JWO.bytesToHex(md.digest())
}
}
transcoder = SerializingTranscoder(maxSize).apply {
setCompressionMode(compressionMode)
}
}.build()
override fun get(key: String): ReadableByteChannel? {
return memcachedClient.get<ByteArray>(key)
?.let(::ByteArrayInputStream)
?.let(Channels::newChannel)
}
override fun put(key: String, content: ByteArray) {
try {
memcachedClient[key, maxAge.toSeconds().toInt()] = content
} catch (e: IllegalArgumentException) {
throw ContentTooLargeException(e.message, e)
}
}
override fun close() {
memcachedClient.shutdown()
}
}

View File

@@ -1,26 +0,0 @@
package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.HostAndPort
import java.time.Duration
data class MemcachedCacheConfiguration(
var servers: List<HostAndPort>,
var maxAge: Duration = Duration.ofDays(1),
var maxSize: Int = 0x100000,
var digestAlgorithm: String? = null,
var compressionMode: CompressionMode = CompressionMode.ZIP,
) : Configuration.Cache {
override fun materialize() = MemcachedCache(
servers,
maxAge,
maxSize,
digestAlgorithm,
compressionMode
)
override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached"
override fun getTypeName() = "memcachedCacheType"
}

View File

@@ -1 +0,0 @@
net.woggioni.gbcs.server.memcached.MemcachedCacheProvider

View File

@@ -19,7 +19,7 @@ dependencies {
     testImplementation catalog.bcprov.jdk18on
     testImplementation catalog.bcpkix.jdk18on

-    testRuntimeOnly project(":gbcs-server-memcached")
+    testRuntimeOnly project(":gbcs-server-memcache")
 }

 test {

View File

@@ -400,9 +400,9 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
     fun run(): ServerHandle {
         // Create the multithreaded event loops for the server
-        val bossGroup = NioEventLoopGroup(0)
+        val bossGroup = NioEventLoopGroup(1)
         val serverSocketChannel = NioServerSocketChannel::class.java
-        val workerGroup = bossGroup
+        val workerGroup = NioEventLoopGroup(0)
         val eventExecutorGroup = run {
             val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) {
                 Thread.ofVirtual().factory()

View File

@@ -1,8 +1,11 @@
 package net.woggioni.gbcs.server.cache

+import io.netty.buffer.ByteBuf
 import net.woggioni.gbcs.api.Cache
+import net.woggioni.gbcs.common.ByteBufInputStream
 import net.woggioni.gbcs.common.GBCS.digestString
 import net.woggioni.gbcs.common.contextLogger
+import net.woggioni.jwo.JWO
 import net.woggioni.jwo.LockFile
 import java.nio.channels.Channels
 import java.nio.channels.FileChannel
@@ -14,6 +17,7 @@ import java.nio.file.attribute.BasicFileAttributes
 import java.security.MessageDigest
 import java.time.Duration
 import java.time.Instant
+import java.util.concurrent.CompletableFuture
 import java.util.concurrent.atomic.AtomicReference
 import java.util.zip.Deflater
 import java.util.zip.DeflaterOutputStream
@@ -28,7 +32,10 @@ class FileSystemCache(
     val compressionLevel: Int
 ) : Cache {

-    private val log = contextLogger()
+    private companion object {
+        @JvmStatic
+        private val log = contextLogger()
+    }

     init {
         Files.createDirectories(root)
@@ -62,10 +69,12 @@ class FileSystemCache(
             }
         }.also {
             gc()
+        }.let {
+            CompletableFuture.completedFuture(it)
         }
     }

-    override fun put(key: String, content: ByteArray) {
+    override fun put(key: String, content: ByteBuf): CompletableFuture<Void> {
         (digestAlgorithm
             ?.let(MessageDigest::getInstance)
             ?.let { md ->
@@ -82,7 +91,7 @@ class FileSystemCache(
                     it
                 }
             }.use {
-                it.write(content)
+                JWO.copy(ByteBufInputStream(content), it)
             }
             Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE)
         } catch (t: Throwable) {
@@ -92,6 +101,7 @@ class FileSystemCache(
         }.also {
             gc()
         }
+        return CompletableFuture.completedFuture(null)
     }

     private fun gc() {

View File

@@ -1,13 +1,17 @@
 package net.woggioni.gbcs.server.cache

+import io.netty.buffer.ByteBuf
+import io.netty.buffer.Unpooled
 import net.woggioni.gbcs.api.Cache
+import net.woggioni.gbcs.common.ByteBufInputStream
 import net.woggioni.gbcs.common.GBCS.digestString
-import java.io.ByteArrayInputStream
+import net.woggioni.jwo.JWO
 import java.io.ByteArrayOutputStream
 import java.nio.channels.Channels
 import java.security.MessageDigest
 import java.time.Duration
 import java.time.Instant
+import java.util.concurrent.CompletableFuture
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.PriorityBlockingQueue
 import java.util.zip.Deflater
@@ -22,9 +26,9 @@ class InMemoryCache(
     val compressionLevel: Int
 ) : Cache {

-    private val map = ConcurrentHashMap<String, ByteArray>()
+    private val map = ConcurrentHashMap<String, ByteBuf>()

-    private class RemovalQueueElement(val key: String, val value : ByteArray, val expiry : Instant) : Comparable<RemovalQueueElement> {
+    private class RemovalQueueElement(val key: String, val value : ByteBuf, val expiry : Instant) : Comparable<RemovalQueueElement> {
         override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
     }
@@ -62,14 +66,16 @@ class InMemoryCache(
             ?.let { value ->
                 if (compressionEnabled) {
                     val inflater = Inflater()
-                    Channels.newChannel(InflaterInputStream(ByteArrayInputStream(value), inflater))
+                    Channels.newChannel(InflaterInputStream(ByteBufInputStream(value), inflater))
                 } else {
-                    Channels.newChannel(ByteArrayInputStream(value))
+                    Channels.newChannel(ByteBufInputStream(value))
                 }
             }
+        }.let {
+            CompletableFuture.completedFuture(it)
         }

-    override fun put(key: String, content: ByteArray) {
+    override fun put(key: String, content: ByteBuf) =
         (digestAlgorithm
             ?.let(MessageDigest::getInstance)
             ?.let { md ->
@@ -79,14 +85,15 @@ class InMemoryCache(
                 val deflater = Deflater(compressionLevel)
                 val baos = ByteArrayOutputStream()
                 DeflaterOutputStream(baos, deflater).use { stream ->
-                    stream.write(content)
+                    JWO.copy(ByteBufInputStream(content), stream)
                 }
-                baos.toByteArray()
+                Unpooled.wrappedBuffer(baos.toByteArray())
             } else {
                 content
             }
             map[digest] = value
             removalQueue.put(RemovalQueueElement(digest, value, Instant.now().plus(maxAge)))
+        }.let {
+            CompletableFuture.completedFuture<Void>(null)
         }
+    }
 }

View File

@@ -6,7 +6,6 @@ import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.common.Xml.Companion.renderAttribute import net.woggioni.gbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document import org.w3c.dom.Document
import org.w3c.dom.Element import org.w3c.dom.Element
import java.nio.file.Path
import java.time.Duration import java.time.Duration
import java.util.zip.Deflater import java.util.zip.Deflater

View File

@@ -265,7 +265,8 @@ object Parser {
             }.map { el ->
                 val groupName = el.renderAttribute("name") ?: throw ConfigurationException("Group name is required")
                 var roles = emptySet<Role>()
-                var quota: Configuration.Quota? = null
+                var userQuota: Configuration.Quota? = null
+                var groupQuota: Configuration.Quota? = null
                 for (child in el.asIterable()) {
                     when (child.localName) {
                         "users" -> {
@@ -279,12 +280,15 @@ object Parser {
                         "roles" -> {
                             roles = parseRoles(child)
                         }
-                        "quota" -> {
-                            quota = parseQuota(child)
+                        "group-quota" -> {
+                            userQuota = parseQuota(child)
+                        }
+                        "user-quota" -> {
+                            groupQuota = parseQuota(child)
                         }
                     }
                 }
-                groupName to Group(groupName, roles, quota)
+                groupName to Group(groupName, roles, userQuota, groupQuota)
             }.toMap()
         val users = knownUsersMap.map { (name, user) ->
             name to User(name, user.password, userGroups[name]?.mapNotNull { groups[it] }?.toSet() ?: emptySet(), user.quota)

View File

@@ -8,8 +8,14 @@ import org.w3c.dom.Document

 object Serializer {

-    fun serialize(conf : Configuration) : Document {
+    private fun Xml.serializeQuota(quota : Configuration.Quota) {
+        attr("calls", quota.calls.toString())
+        attr("period", quota.period.toString())
+        attr("max-available-calls", quota.maxAvailableCalls.toString())
+        attr("initial-available-calls", quota.initialAvailableCalls.toString())
+    }

+    fun serialize(conf : Configuration) : Document {
         val schemaLocations = CacheSerializers.index.values.asSequence().map {
             it.xmlNamespace to it.xmlSchemaLocation
         }.toMap()
@@ -56,10 +62,7 @@ object Serializer {
                             }
                             user.quota?.let { quota ->
                                 node("quota") {
-                                    attr("calls", quota.calls.toString())
-                                    attr("period", quota.period.toString())
-                                    attr("max-available-calls", quota.maxAvailableCalls.toString())
-                                    attr("initial-available-calls", quota.initialAvailableCalls.toString())
+                                    serializeQuota(quota)
                                 }
                             }
                         }
@@ -70,10 +73,7 @@ object Serializer {
                     anonymousUser.quota?.let { quota ->
                         node("anonymous") {
                             node("quota") {
-                                attr("calls", quota.calls.toString())
-                                attr("period", quota.period.toString())
-                                attr("max-available-calls", quota.maxAvailableCalls.toString())
-                                attr("initial-available-calls", quota.initialAvailableCalls.toString())
+                                serializeQuota(quota)
                             }
                         }
                     }
@@ -113,12 +113,14 @@ object Serializer {
                                 }
                             }
                         }
-                        group.quota?.let { quota ->
-                            node("quota") {
-                                attr("calls", quota.calls.toString())
-                                attr("period", quota.period.toString())
-                                attr("max-available-calls", quota.maxAvailableCalls.toString())
-                                attr("initial-available-calls", quota.initialAvailableCalls.toString())
+                        group.userQuota?.let { quota ->
+                            node("user-quota") {
+                                serializeQuota(quota)
+                            }
+                        }
+                        group.groupQuota?.let { quota ->
+                            node("group-quota") {
+                                serializeQuota(quota)
                             }
                         }
                     }

View File

@@ -17,7 +17,6 @@ import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioStream import io.netty.handler.stream.ChunkedNioStream
import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.CacheException
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.debug import net.woggioni.gbcs.server.debug
import net.woggioni.gbcs.server.warn import net.woggioni.gbcs.server.warn
@@ -43,49 +42,41 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
return return
} }
if (serverPrefix == prefix) { if (serverPrefix == prefix) {
try { cache.get(key).thenApply { channel ->
cache.get(key) if(channel != null) {
} catch(ex : Throwable) { log.debug(ctx) {
throw CacheException("Error accessing the cache backend", ex) "Cache hit for key '$key'"
}?.let { channel ->
log.debug(ctx) {
"Cache hit for key '$key'"
}
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
when (channel) {
is FileChannel -> {
if (keepAlive) {
ctx.write(DefaultFileRegion(channel, 0, channel.size()))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size()))
.addListener(ChannelFutureListener.CLOSE)
}
} }
else -> { val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
ctx.write(ChunkedNioStream(channel)).addListener { evt -> response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
channel.close() if (!keepAlive) {
} response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
val content : Any = when (channel) {
is FileChannel -> DefaultFileRegion(channel, 0, channel.size())
else -> ChunkedNioStream(channel)
}
if (keepAlive) {
ctx.write(content)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate()) ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
} }
} else {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
} }
} ?: let { }.whenComplete { _, ex -> ex?.let(ctx::fireExceptionCaught) }
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
} else { } else {
log.warn(ctx) { log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'" "Got request for unhandled path '${msg.uri()}'"
@@ -103,26 +94,16 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
log.debug(ctx) { log.debug(ctx) {
"Added value for key '$key' to build cache" "Added value for key '$key' to build cache"
} }
val bodyBytes = msg.content().run { cache.put(key, msg.content().retain()).thenRun {
if (isDirect) { val response = DefaultFullHttpResponse(
ByteArray(readableBytes()).also { msg.protocolVersion(), HttpResponseStatus.CREATED,
readBytes(it) Unpooled.copiedBuffer(key.toByteArray())
} )
} else { response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
array() ctx.writeAndFlush(response)
} }.whenComplete { _, ex ->
ctx.fireExceptionCaught(ex)
} }
try {
cache.put(key, bodyBytes)
} catch(ex : Throwable) {
throw CacheException("Error accessing the cache backend", ex)
}
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
} else { } else {
log.warn(ctx) { log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'" "Got request for unhandled path '${msg.uri()}'"

View File

@@ -8,7 +8,7 @@ import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Function

 class BucketManager private constructor(
-    private val bucketsByUser: Map<Configuration.User, Bucket> = HashMap(),
+    private val bucketsByUser: Map<Configuration.User, List<Bucket>> = HashMap(),
     private val bucketsByGroup: Map<Configuration.Group, Bucket> = HashMap(),
     loader: Function<InetSocketAddress, Bucket>?
 ) {
@@ -43,22 +43,27 @@ class BucketManager private constructor(
     companion object {
         fun from(cfg : Configuration) : BucketManager {
-            val bucketsByUser = cfg.users.values.asSequence().filter {
-                it.quota != null
-            }.map { user ->
-                val quota = user.quota
-                val bucket = Bucket.local(
-                    quota.maxAvailableCalls,
-                    quota.calls,
-                    quota.period,
-                    quota.initialAvailableCalls
-                )
-                user to bucket
+            val bucketsByUser = cfg.users.values.asSequence().map { user ->
+                val buckets = (
+                    user.quota
+                        ?.let { quota ->
+                            sequenceOf(quota)
+                        } ?: user.groups.asSequence()
+                            .mapNotNull(Configuration.Group::getUserQuota)
+                ).map { quota ->
+                    Bucket.local(
+                        quota.maxAvailableCalls,
+                        quota.calls,
+                        quota.period,
+                        quota.initialAvailableCalls
+                    )
+                }.toList()
+                user to buckets
             }.toMap()
             val bucketsByGroup = cfg.groups.values.asSequence().filter {
-                it.quota != null
+                it.groupQuota != null
             }.map { group ->
-                val quota = group.quota
+                val quota = group.groupQuota
                 val bucket = Bucket.local(
                     quota.maxAvailableCalls,
                     quota.calls,
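Per-user throttling now yields a list of buckets: a user's own quota takes precedence, otherwise one bucket is created for each user-quota inherited from the user's groups (which is why `ThrottlingHandler` switches from `add` to `addAll` below). The resolution rule, restated as a small sketch:

```kotlin
import net.woggioni.gbcs.api.Configuration

// A user's explicit quota wins; otherwise every user-quota defined on the
// user's groups contributes its own quota (and hence its own bucket).
fun effectiveUserQuotas(user: Configuration.User): List<Configuration.Quota> =
    user.quota?.let { listOf(it) }
        ?: user.groups.mapNotNull(Configuration.Group::getUserQuota)
```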

View File

@@ -42,7 +42,7 @@ class ThrottlingHandler(cfg: Configuration) :
         val buckets = mutableListOf<Bucket>()
         val user = ctx.channel().attr(GradleBuildCacheServer.userAttribute).get()
         if (user != null) {
-            bucketManager.getBucketByUser(user)?.let(buckets::add)
+            bucketManager.getBucketByUser(user)?.let(buckets::addAll)
         }
         val groups = ctx.channel().attr(GradleBuildCacheServer.groupAttribute).get() ?: emptySet()
         if (groups.isNotEmpty()) {

View File

@@ -146,7 +146,8 @@
                 </xs:unique>
             </xs:element>
             <xs:element name="roles" type="gbcs:rolesType" maxOccurs="1" minOccurs="0"/>
-            <xs:element name="quota" type="gbcs:quotaType" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="user-quota" type="gbcs:quotaType" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="group-quota" type="gbcs:quotaType" minOccurs="0" maxOccurs="1"/>
         </xs:sequence>
         <xs:attribute name="name" type="xs:token"/>
     </xs:complexType>

View File

@@ -24,8 +24,8 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
     protected val random = Random(101325)
     protected val keyValuePair = newEntry(random)
     protected val serverPath = "gbcs"
-    protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null)
-    protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null)
+    protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null, null)
+    protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null, null)

     abstract protected val users : List<Configuration.User>

View File

@@ -1,7 +1,7 @@
package net.woggioni.gbcs.server.test package net.woggioni.gbcs.server.test
import net.woggioni.gbcs.server.GradleBuildCacheServer
import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.server.GradleBuildCacheServer
import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.MethodOrderer import org.junit.jupiter.api.MethodOrderer

View File

@@ -4,7 +4,6 @@ import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.api.Role import net.woggioni.gbcs.api.Role
import net.woggioni.gbcs.common.Xml import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.server.cache.FileSystemCacheConfiguration import net.woggioni.gbcs.server.cache.FileSystemCacheConfiguration
import net.woggioni.gbcs.server.cache.InMemoryCacheConfiguration
import net.woggioni.gbcs.server.configuration.Serializer import net.woggioni.gbcs.server.configuration.Serializer
import net.woggioni.gbcs.server.test.utils.CertificateUtils import net.woggioni.gbcs.server.test.utils.CertificateUtils
import net.woggioni.gbcs.server.test.utils.CertificateUtils.X509Credentials import net.woggioni.gbcs.server.test.utils.CertificateUtils.X509Credentials
@@ -46,8 +45,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
private lateinit var trustStore: KeyStore private lateinit var trustStore: KeyStore
protected lateinit var ca: X509Credentials protected lateinit var ca: X509Credentials
protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null) protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null, null)
protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null) protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null, null)
protected val random = Random(101325) protected val random = Random(101325)
protected val keyValuePair = newEntry(random) protected val keyValuePair = newEntry(random)
private val serverPath : String? = null private val serverPath : String? = null

View File

@@ -3,7 +3,6 @@ package net.woggioni.gbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.Xml import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.server.cache.FileSystemCacheConfiguration
import net.woggioni.gbcs.server.cache.InMemoryCacheConfiguration import net.woggioni.gbcs.server.cache.InMemoryCacheConfiguration
import net.woggioni.gbcs.server.configuration.Serializer import net.woggioni.gbcs.server.configuration.Serializer
import net.woggioni.gbcs.server.test.utils.NetworkUtils import net.woggioni.gbcs.server.test.utils.NetworkUtils

View File

@@ -7,7 +7,6 @@ import org.bouncycastle.asn1.x500.X500Name
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.params.provider.ArgumentsSource
import java.net.http.HttpClient import java.net.http.HttpClient
import java.net.http.HttpRequest import java.net.http.HttpRequest
import java.net.http.HttpResponse import java.net.http.HttpResponse

View File

@@ -1,8 +1,8 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance" <gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server" xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached" xmlns:gbcs-memcache="urn:net.woggioni.gbcs.server.memcache"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcached jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd" xs:schemaLocation="urn:net.woggioni.gbcs.server.memcache jpms://net.woggioni.gbcs.server.memcache/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd"
> >
<bind host="0.0.0.0" port="8443" incoming-connections-backlog-size="4096"/> <bind host="0.0.0.0" port="8443" incoming-connections-backlog-size="4096"/>
<connection <connection
@@ -13,7 +13,7 @@
read-timeout="PT5M" read-timeout="PT5M"
write-timeout="PT5M"/> write-timeout="PT5M"/>
<event-executor use-virtual-threads="true"/> <event-executor use-virtual-threads="true"/>
<cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="16777216" compression-mode="zip"> <cache xs:type="gbcs-memcache:memcacheCacheType" max-age="P7D" max-size="16777216" compression-mode="deflate">
<server host="memcached" port="11211"/> <server host="memcached" port="11211"/>
</cache> </cache>
<authorization> <authorization>
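
The docker example configuration also renames the compression mode from "zip" to "deflate". Assuming the new value maps, as its name suggests, to the standard DEFLATE algorithm from java.util.zip (the plugin's actual wiring is not visible in this diff), a round-trip looks like this:

    import java.io.ByteArrayOutputStream
    import java.util.zip.Deflater
    import java.util.zip.Inflater

    // Compress a payload with DEFLATE; mirrors what a compression-mode="deflate" setting
    // would plausibly do to cache entries before they reach memcached.
    fun deflate(data: ByteArray): ByteArray {
        val deflater = Deflater()
        deflater.setInput(data)
        deflater.finish()
        val buffer = ByteArray(8192)
        return ByteArrayOutputStream().use { out ->
            while (!deflater.finished()) out.write(buffer, 0, deflater.deflate(buffer))
            deflater.end()
            out.toByteArray()
        }
    }

    // Reverse operation applied when an entry is read back from the cache.
    fun inflate(data: ByteArray): ByteArray {
        val inflater = Inflater()
        inflater.setInput(data)
        val buffer = ByteArray(8192)
        return ByteArrayOutputStream().use { out ->
            while (!inflater.finished()) out.write(buffer, 0, inflater.inflate(buffer))
            inflater.end()
            out.toByteArray()
        }
    }

    fun main() {
        val original = "cache entry payload ".repeat(100).toByteArray()
        val compressed = deflate(original)
        check(inflate(compressed).contentEquals(original))
        println("${original.size} -> ${compressed.size} bytes")
    }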

View File

@@ -1,8 +1,8 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance" <gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server" xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached" xmlns:gbcs-memcache="urn:net.woggioni.gbcs.server.memcache"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcached jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd"> xs:schemaLocation="urn:net.woggioni.gbcs.server.memcache jpms://net.woggioni.gbcs.server.memcache/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd">
<bind host="127.0.0.1" port="11443" incoming-connections-backlog-size="50"/> <bind host="127.0.0.1" port="11443" incoming-connections-backlog-size="50"/>
<connection <connection
write-timeout="PT25M" write-timeout="PT25M"
@@ -12,8 +12,8 @@
idle-timeout="PT30M" idle-timeout="PT30M"
max-request-size="101325"/> max-request-size="101325"/>
<event-executor use-virtual-threads="false"/> <event-executor use-virtual-threads="false"/>
<cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="101325" digest="SHA-256"> <cache xs:type="gbcs-memcache:memcacheCacheType" max-age="P7D" max-size="101325" digest="SHA-256">
<server host="127.0.0.1" port="11211"/> <server host="127.0.0.1" port="11211" max-connections="10" connection-timeout="PT20S"/>
</cache> </cache>
<authentication> <authentication>
<none/> <none/>
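
The server element in this test configuration gains max-connections and connection-timeout attributes, which fits the new Netty-based memcache client. How the plugin actually applies them is not part of this diff; one plausible mapping, shown with plain Netty primitives (the class and option names below are real Netty APIs, the wiring itself is an assumption), is a bounded channel pool with a connect timeout:

    import io.netty.bootstrap.Bootstrap
    import io.netty.channel.Channel
    import io.netty.channel.ChannelOption
    import io.netty.channel.nio.NioEventLoopGroup
    import io.netty.channel.pool.AbstractChannelPoolHandler
    import io.netty.channel.pool.FixedChannelPool
    import io.netty.channel.socket.nio.NioSocketChannel
    import java.time.Duration

    fun memcachePool(
        host: String,
        port: Int,
        maxConnections: Int,        // max-connections="10"
        connectionTimeout: Duration // connection-timeout="PT20S"
    ): FixedChannelPool {
        val bootstrap = Bootstrap()
            .group(NioEventLoopGroup())
            .channel(NioSocketChannel::class.java)
            // fail the connection attempt if the server does not accept it in time
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeout.toMillis().toInt())
            .remoteAddress(host, port)
        val handler = object : AbstractChannelPoolHandler() {
            override fun channelCreated(ch: Channel) {
                // the real client would install its memcache protocol codec here
            }
        }
        // caps the number of concurrently open channels to this memcached server
        return FixedChannelPool(bootstrap, handler, maxConnections)
    }

    fun main() {
        val pool = memcachePool("127.0.0.1", 11211, maxConnections = 10, connectionTimeout = Duration.ofSeconds(20))
        val channel: Channel = pool.acquire().sync().now
        // ... issue memcache commands on the acquired channel ...
        pool.release(channel)
    }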

View File

@@ -32,7 +32,8 @@
<roles> <roles>
<reader/> <reader/>
</roles> </roles>
<quota calls="10" period="PT1S"/> <user-quota calls="30" period="PT1M"/>
<group-quota calls="10" period="PT1S"/>
</group> </group>
<group name="writers"> <group name="writers">
<users> <users>
@@ -50,7 +51,7 @@
<reader/> <reader/>
<writer/> <writer/>
</roles> </roles>
<quota calls="1000" period="P1D"/> <group-quota calls="1000" period="P1D"/>
</group> </group>
</groups> </groups>
</authorization> </authorization>
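
For reference, the readers group in this configuration now carries both quota flavours: the user-quota grants each member 30 calls per minute of their own, while the group-quota caps the whole group at 10 calls per second shared between all members. A quick sanity check of the sustained rates implied by the two declarations (only the calls and period attributes shown above are assumed):

    import java.time.Duration

    fun ratePerSecond(calls: Long, period: Duration): Double =
        calls.toDouble() * 1000.0 / period.toMillis()

    fun main() {
        // <user-quota calls="30" period="PT1M"/>: 0.5 calls/s, granted to every member individually
        println(ratePerSecond(30, Duration.parse("PT1M")))
        // <group-quota calls="10" period="PT1S"/>: 10 calls/s, shared by the whole group
        println(ratePerSecond(10, Duration.parse("PT1S")))
    }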

View File

@@ -2,7 +2,7 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true org.gradle.parallel=true
org.gradle.caching=true org.gradle.caching=true
gbcs.version = 0.1.1 gbcs.version = 0.1.2
lys.version = 2025.01.31 lys.version = 2025.01.31

View File

@@ -27,7 +27,7 @@ rootProject.name = 'gbcs'
include 'gbcs-api' include 'gbcs-api'
include 'gbcs-common' include 'gbcs-common'
include 'gbcs-server-memcached' include 'gbcs-server-memcache'
include 'gbcs-cli' include 'gbcs-cli'
include 'docker' include 'docker'
include 'gbcs-client' include 'gbcs-client'