diff --git a/.gitea/workflows/build.yaml b/.gitea/workflows/build.yaml index f268189..fc4b68d 100644 --- a/.gitea/workflows/build.yaml +++ b/.gitea/workflows/build.yaml @@ -44,7 +44,7 @@ jobs: target: release cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx - - name: Build gbcs memcached Docker image + name: Build gbcs memcache Docker image uses: docker/build-push-action@v5.3.0 with: context: "docker/build/docker" @@ -52,9 +52,9 @@ jobs: push: true pull: true tags: | - gitea.woggioni.net/woggioni/gbcs:memcached - gitea.woggioni.net/woggioni/gbcs:memcached-${{ steps.retrieve-version.outputs.VERSION }} - target: release-memcached + gitea.woggioni.net/woggioni/gbcs:memcache + gitea.woggioni.net/woggioni/gbcs:memcache-${{ steps.retrieve-version.outputs.VERSION }} + target: release-memcache cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx cache-to: type=registry,mode=max,compression=zstd,image-manifest=true,oci-mediatypes=true,ref=gitea.woggioni.net/woggioni/gbcs:buildx - name: Publish artifacts diff --git a/docker/Dockerfile b/docker/Dockerfile index f9e0c02..d70edd3 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -7,10 +7,10 @@ FROM base-release AS release ADD gbcs-cli-envelope-*.jar gbcs.jar ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"] -FROM base-release AS release-memcached +FROM base-release AS release-memcache ADD --chown=luser:luser gbcs-cli-envelope-*.jar gbcs.jar RUN mkdir plugins WORKDIR /home/luser/plugins -RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/gbcs-server-memcached*.tar +RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/gbcs-server-memcache*.tar WORKDIR /home/luser ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"] diff --git a/docker/build.gradle b/docker/build.gradle index abc4ad2..d431354 100644 --- a/docker/build.gradle +++ b/docker/build.gradle @@ -19,7 +19,7 @@ configurations { dependencies { docker project(path: ':gbcs-cli', configuration: 'release') - docker project(path: ':gbcs-server-memcached', configuration: 'release') + docker project(path: ':gbcs-server-memcache', configuration: 'release') } Provider cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {} @@ -46,22 +46,22 @@ Provider dockerTag = tasks.register('dockerTagImage', DockerTagI tag = version } -Provider dockerTagMemcached = tasks.register('dockerTagMemcachedImage', DockerTagImage) { +Provider dockerTagMemcache = tasks.register('dockerTagMemcacheImage', DockerTagImage) { group = 'docker' repository = 'gitea.woggioni.net/woggioni/gbcs' - imageId = 'gitea.woggioni.net/woggioni/gbcs:memcached' - tag = "${version}-memcached" + imageId = 'gitea.woggioni.net/woggioni/gbcs:memcache' + tag = "${version}-memcache" } Provider dockerPush = tasks.register('dockerPushImage', DockerPushImage) { group = 'docker' - dependsOn dockerTag, dockerTagMemcached + dependsOn dockerTag, dockerTagMemcache registryCredentials { url = getProperty('docker.registry.url') username = 'woggioni' password = System.getenv().get("PUBLISHER_TOKEN") } - images = [dockerTag.flatMap{ it.tag }, dockerTagMemcached.flatMap{ it.tag }] + images = [dockerTag.flatMap{ it.tag }, dockerTagMemcache.flatMap{ it.tag }] } diff --git a/gbcs-api/build.gradle b/gbcs-api/build.gradle index 516de05..ac6a484 100644 --- a/gbcs-api/build.gradle +++ b/gbcs-api/build.gradle @@ -5,6 +5,7 @@ plugins { } dependencies { + api catalog.netty.buffer } publishing { diff --git a/gbcs-api/src/main/java/module-info.java b/gbcs-api/src/main/java/module-info.java index e825602..63ae1e7 100644 --- a/gbcs-api/src/main/java/module-info.java +++ b/gbcs-api/src/main/java/module-info.java @@ -1,6 +1,7 @@ module net.woggioni.gbcs.api { requires static lombok; requires java.xml; + requires io.netty.buffer; exports net.woggioni.gbcs.api; exports net.woggioni.gbcs.api.exception; } \ No newline at end of file diff --git a/gbcs-api/src/main/java/net/woggioni/gbcs/api/Cache.java b/gbcs-api/src/main/java/net/woggioni/gbcs/api/Cache.java index c6018b0..b9427c5 100644 --- a/gbcs-api/src/main/java/net/woggioni/gbcs/api/Cache.java +++ b/gbcs-api/src/main/java/net/woggioni/gbcs/api/Cache.java @@ -1,12 +1,14 @@ package net.woggioni.gbcs.api; +import io.netty.buffer.ByteBuf; import net.woggioni.gbcs.api.exception.ContentTooLargeException; import java.nio.channels.ReadableByteChannel; +import java.util.concurrent.CompletableFuture; public interface Cache extends AutoCloseable { - ReadableByteChannel get(String key); + CompletableFuture get(String key); - void put(String key, byte[] content) throws ContentTooLargeException; + CompletableFuture put(String key, ByteBuf content) throws ContentTooLargeException; } diff --git a/gbcs-api/src/main/java/net/woggioni/gbcs/api/Configuration.java b/gbcs-api/src/main/java/net/woggioni/gbcs/api/Configuration.java index 84b27c6..b09522a 100644 --- a/gbcs-api/src/main/java/net/woggioni/gbcs/api/Configuration.java +++ b/gbcs-api/src/main/java/net/woggioni/gbcs/api/Configuration.java @@ -56,7 +56,8 @@ public class Configuration { @EqualsAndHashCode.Include String name; Set roles; - Quota quota; + Quota groupQuota; + Quota userQuota; } @Value diff --git a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/GradleBuildCacheServerCli.kt b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/GradleBuildCacheServerCli.kt index 7f099cd..aa0afad 100644 --- a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/GradleBuildCacheServerCli.kt +++ b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/GradleBuildCacheServerCli.kt @@ -1,7 +1,5 @@ package net.woggioni.gbcs.cli -import net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory -import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.cli.impl.AbstractVersionProvider import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand @@ -10,10 +8,11 @@ import net.woggioni.gbcs.cli.impl.commands.GetCommand import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand import net.woggioni.gbcs.cli.impl.commands.PutCommand import net.woggioni.gbcs.cli.impl.commands.ServerCommand +import net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory +import net.woggioni.gbcs.common.contextLogger import net.woggioni.jwo.Application import picocli.CommandLine import picocli.CommandLine.Model.CommandSpec -import java.net.URI @CommandLine.Command( diff --git a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/BenchmarkCommand.kt b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/BenchmarkCommand.kt index 5b7aac1..535d76d 100644 --- a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/BenchmarkCommand.kt +++ b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/BenchmarkCommand.kt @@ -33,6 +33,13 @@ class BenchmarkCommand : GbcsCommand() { ) private var numberOfEntries = 1000 + @CommandLine.Option( + names = ["-s", "--size"], + description = ["Size of a cache value in bytes"], + paramLabel = "SIZE" + ) + private var size = 0x1000 + override fun run() { val clientCommand = spec.parent().userObject() as ClientCommand val profile = clientCommand.profileName.let { profileName -> @@ -46,7 +53,7 @@ class BenchmarkCommand : GbcsCommand() { while (true) { val key = JWO.bytesToHex(random.nextBytes(16)) val content = random.nextInt().toByte() - val value = ByteArray(0x1000, { _ -> content }) + val value = ByteArray(size, { _ -> content }) yield(key to value) } } diff --git a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/GetCommand.kt b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/GetCommand.kt index aa7048d..1f5f2a2 100644 --- a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/GetCommand.kt +++ b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/GetCommand.kt @@ -1,8 +1,8 @@ package net.woggioni.gbcs.cli.impl.commands -import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.client.GradleBuildCacheClient +import net.woggioni.gbcs.common.contextLogger import picocli.CommandLine import java.nio.file.Files import java.nio.file.Path diff --git a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/PasswordHashCommand.kt b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/PasswordHashCommand.kt index b6b29dc..80a47f6 100644 --- a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/PasswordHashCommand.kt +++ b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/PasswordHashCommand.kt @@ -1,8 +1,8 @@ package net.woggioni.gbcs.cli.impl.commands -import net.woggioni.gbcs.common.PasswordSecurity.hashPassword import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.converters.OutputStreamConverter +import net.woggioni.gbcs.common.PasswordSecurity.hashPassword import net.woggioni.jwo.UncloseableOutputStream import picocli.CommandLine import java.io.OutputStream diff --git a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/PutCommand.kt b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/PutCommand.kt index f90a016..c2af2d5 100644 --- a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/PutCommand.kt +++ b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/PutCommand.kt @@ -1,9 +1,9 @@ package net.woggioni.gbcs.cli.impl.commands -import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.converters.InputStreamConverter import net.woggioni.gbcs.client.GradleBuildCacheClient +import net.woggioni.gbcs.common.contextLogger import picocli.CommandLine import java.io.InputStream diff --git a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/ServerCommand.kt b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/ServerCommand.kt index 6717620..69bf80b 100644 --- a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/ServerCommand.kt +++ b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/ServerCommand.kt @@ -1,12 +1,12 @@ package net.woggioni.gbcs.cli.impl.commands -import net.woggioni.gbcs.server.GradleBuildCacheServer -import net.woggioni.gbcs.server.GradleBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL import net.woggioni.gbcs.api.Configuration +import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.debug import net.woggioni.gbcs.common.info -import net.woggioni.gbcs.cli.impl.GbcsCommand +import net.woggioni.gbcs.server.GradleBuildCacheServer +import net.woggioni.gbcs.server.GradleBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL import net.woggioni.jwo.Application import net.woggioni.jwo.JWO import picocli.CommandLine diff --git a/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/impl/Parser.kt b/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/impl/Parser.kt index 88c3bfd..a584c91 100644 --- a/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/impl/Parser.kt +++ b/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/impl/Parser.kt @@ -1,9 +1,9 @@ package net.woggioni.gbcs.client.impl import net.woggioni.gbcs.api.exception.ConfigurationException +import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.gbcs.common.Xml.Companion.asIterable import net.woggioni.gbcs.common.Xml.Companion.renderAttribute -import net.woggioni.gbcs.client.GradleBuildCacheClient import org.w3c.dom.Document import java.net.URI import java.nio.file.Files diff --git a/gbcs-client/src/test/kotlin/net/woggioni/gbcs/client/RetryTest.kt b/gbcs-client/src/test/kotlin/net/woggioni/gbcs/client/RetryTest.kt index 6c2f76f..977cc89 100644 --- a/gbcs-client/src/test/kotlin/net/woggioni/gbcs/client/RetryTest.kt +++ b/gbcs-client/src/test/kotlin/net/woggioni/gbcs/client/RetryTest.kt @@ -4,7 +4,6 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup import io.netty.util.concurrent.EventExecutorGroup import net.woggioni.gbcs.common.contextLogger import org.junit.jupiter.api.Assertions -import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtensionContext import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments diff --git a/gbcs-common/build.gradle b/gbcs-common/build.gradle index 651b091..afa0b02 100644 --- a/gbcs-common/build.gradle +++ b/gbcs-common/build.gradle @@ -9,6 +9,7 @@ dependencies { implementation project(':gbcs-api') implementation catalog.slf4j.api implementation catalog.jwo + implementation catalog.netty.buffer } publishing { diff --git a/gbcs-common/src/main/java/module-info.java b/gbcs-common/src/main/java/module-info.java index 492b855..9fb6552 100644 --- a/gbcs-common/src/main/java/module-info.java +++ b/gbcs-common/src/main/java/module-info.java @@ -4,6 +4,7 @@ module net.woggioni.gbcs.common { requires org.slf4j; requires kotlin.stdlib; requires net.woggioni.jwo; + requires io.netty.buffer; provides java.net.spi.URLStreamHandlerProvider with net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory; exports net.woggioni.gbcs.common; diff --git a/gbcs-common/src/main/kotlin/net/woggioni/gbcs/common/ByteBufInputStream.kt b/gbcs-common/src/main/kotlin/net/woggioni/gbcs/common/ByteBufInputStream.kt new file mode 100644 index 0000000..1538da3 --- /dev/null +++ b/gbcs-common/src/main/kotlin/net/woggioni/gbcs/common/ByteBufInputStream.kt @@ -0,0 +1,25 @@ +package net.woggioni.gbcs.common + +import io.netty.buffer.ByteBuf +import java.io.InputStream + +class ByteBufInputStream(private val buf : ByteBuf) : InputStream() { + override fun read(): Int { + return buf.takeIf { + it.readableBytes() > 0 + }?.let(ByteBuf::readByte) + ?.let(Byte::toInt) ?: -1 + } + + override fun read(b: ByteArray, off: Int, len: Int): Int { + val readableBytes = buf.readableBytes() + if(readableBytes == 0) return -1 + val result = len.coerceAtMost(readableBytes) + buf.readBytes(b, off, result) + return result + } + + override fun close() { + buf.release() + } +} \ No newline at end of file diff --git a/gbcs-common/src/main/kotlin/net/woggioni/gbcs/common/GbcsUrlStreamHandlerFactory.kt b/gbcs-common/src/main/kotlin/net/woggioni/gbcs/common/GbcsUrlStreamHandlerFactory.kt index 47a5e0a..4d79947 100644 --- a/gbcs-common/src/main/kotlin/net/woggioni/gbcs/common/GbcsUrlStreamHandlerFactory.kt +++ b/gbcs-common/src/main/kotlin/net/woggioni/gbcs/common/GbcsUrlStreamHandlerFactory.kt @@ -5,7 +5,6 @@ import java.io.InputStream import java.net.URL import java.net.URLConnection import java.net.URLStreamHandler -import java.net.URLStreamHandlerFactory import java.net.spi.URLStreamHandlerProvider import java.util.Optional import java.util.concurrent.atomic.AtomicBoolean diff --git a/gbcs-server-memcached/build.gradle b/gbcs-server-memcache/build.gradle similarity index 69% rename from gbcs-server-memcached/build.gradle rename to gbcs-server-memcache/build.gradle index 4f60e00..fed9c28 100644 --- a/gbcs-server-memcached/build.gradle +++ b/gbcs-server-memcache/build.gradle @@ -6,10 +6,10 @@ plugins { configurations { bundle { - extendsFrom runtimeClasspath canBeResolved = true canBeConsumed = false visible = false + transitive = false resolutionStrategy { dependencies { @@ -29,10 +29,20 @@ configurations { } dependencies { - compileOnly project(':gbcs-common') - compileOnly project(':gbcs-api') - compileOnly catalog.jwo - implementation catalog.xmemcached + implementation project(':gbcs-common') + implementation project(':gbcs-api') + implementation catalog.jwo + implementation catalog.slf4j.api + implementation catalog.netty.common + implementation catalog.netty.codec.memcache + + bundle catalog.netty.codec.memcache + + testRuntimeOnly catalog.logback.classic +} + +tasks.named(JavaPlugin.TEST_TASK_NAME, Test) { + systemProperty("io.netty.leakDetectionLevel", "PARANOID") } Provider bundleTask = tasks.register("bundle", Tar) { diff --git a/gbcs-server-memcache/src/main/java/module-info.java b/gbcs-server-memcache/src/main/java/module-info.java new file mode 100644 index 0000000..89bff29 --- /dev/null +++ b/gbcs-server-memcache/src/main/java/module-info.java @@ -0,0 +1,19 @@ +import net.woggioni.gbcs.api.CacheProvider; + +module net.woggioni.gbcs.server.memcache { + requires net.woggioni.gbcs.common; + requires net.woggioni.gbcs.api; + requires net.woggioni.jwo; + requires java.xml; + requires kotlin.stdlib; + requires io.netty.transport; + requires io.netty.codec; + requires io.netty.codec.memcache; + requires io.netty.common; + requires io.netty.buffer; + requires org.slf4j; + + provides CacheProvider with net.woggioni.gbcs.server.memcache.MemcacheCacheProvider; + + opens net.woggioni.gbcs.server.memcache.schema; +} \ No newline at end of file diff --git a/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/Exception.kt b/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/Exception.kt new file mode 100644 index 0000000..a7ca669 --- /dev/null +++ b/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/Exception.kt @@ -0,0 +1,4 @@ +package net.woggioni.gbcs.server.memcache + +class MemcacheException(status : Short, msg : String? = null, cause : Throwable? = null) + : RuntimeException(msg ?: "Memcached status $status", cause) \ No newline at end of file diff --git a/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/MemcacheCache.kt b/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/MemcacheCache.kt new file mode 100644 index 0000000..29627e2 --- /dev/null +++ b/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/MemcacheCache.kt @@ -0,0 +1,23 @@ +package net.woggioni.gbcs.server.memcache + +import io.netty.buffer.ByteBuf +import net.woggioni.gbcs.api.Cache +import net.woggioni.gbcs.server.memcache.client.MemcacheClient +import java.nio.channels.ReadableByteChannel +import java.util.concurrent.CompletableFuture + +class MemcacheCache(private val cfg : MemcacheCacheConfiguration) : Cache { + private val memcacheClient = MemcacheClient(cfg) + + override fun get(key: String): CompletableFuture { + return memcacheClient.get(key) + } + + override fun put(key: String, content: ByteBuf): CompletableFuture { + return memcacheClient.put(key, content, cfg.maxAge) + } + + override fun close() { + memcacheClient.close() + } +} diff --git a/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/MemcacheCacheConfiguration.kt b/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/MemcacheCacheConfiguration.kt new file mode 100644 index 0000000..58fc814 --- /dev/null +++ b/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/MemcacheCacheConfiguration.kt @@ -0,0 +1,40 @@ +package net.woggioni.gbcs.server.memcache + +import net.woggioni.gbcs.api.Configuration +import net.woggioni.gbcs.common.HostAndPort +import java.time.Duration + +data class MemcacheCacheConfiguration( + val servers: List, + val maxAge: Duration = Duration.ofDays(1), + val maxSize: Int = 0x100000, + val digestAlgorithm: String? = null, + val compressionMode: CompressionMode? = null, +) : Configuration.Cache { + + enum class CompressionMode { + /** + * Gzip mode + */ + GZIP, + + /** + * Deflate mode + */ + DEFLATE + } + + data class Server( + val endpoint : HostAndPort, + val connectionTimeoutMillis : Int?, + val maxConnections : Int + ) + + + override fun materialize() = MemcacheCache(this) + + override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcache" + + override fun getTypeName() = "memcacheCacheType" +} + diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheProvider.kt b/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/MemcacheCacheProvider.kt similarity index 50% rename from gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheProvider.kt rename to gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/MemcacheCacheProvider.kt index 46acdb4..d15e569 100644 --- a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheProvider.kt +++ b/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/MemcacheCacheProvider.kt @@ -1,6 +1,5 @@ -package net.woggioni.gbcs.server.memcached +package net.woggioni.gbcs.server.memcache -import net.rubyeye.xmemcached.transcoders.CompressionMode import net.woggioni.gbcs.api.CacheProvider import net.woggioni.gbcs.api.exception.ConfigurationException import net.woggioni.gbcs.common.GBCS @@ -11,19 +10,21 @@ import net.woggioni.gbcs.common.Xml.Companion.renderAttribute import org.w3c.dom.Document import org.w3c.dom.Element import java.time.Duration +import java.time.temporal.ChronoUnit -class MemcachedCacheProvider : CacheProvider { - override fun getXmlSchemaLocation() = "jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd" - override fun getXmlType() = "memcachedCacheType" +class MemcacheCacheProvider : CacheProvider { + override fun getXmlSchemaLocation() = "jpms://net.woggioni.gbcs.server.memcache/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd" - override fun getXmlNamespace() = "urn:net.woggioni.gbcs.server.memcached" + override fun getXmlType() = "memcacheCacheType" + + override fun getXmlNamespace() = "urn:net.woggioni.gbcs.server.memcache" val xmlNamespacePrefix : String - get() = "gbcs-memcached" + get() = "gbcs-memcache" - override fun deserialize(el: Element): MemcachedCacheConfiguration { - val servers = mutableListOf() + override fun deserialize(el: Element): MemcacheCacheConfiguration { + val servers = mutableListOf() val maxAge = el.renderAttribute("max-age") ?.let(Duration::parse) ?: Duration.ofDays(1) @@ -33,24 +34,30 @@ class MemcachedCacheProvider : CacheProvider { val compressionMode = el.renderAttribute("compression-mode") ?.let { when (it) { - "gzip" -> CompressionMode.GZIP - "zip" -> CompressionMode.ZIP - else -> CompressionMode.ZIP + "gzip" -> MemcacheCacheConfiguration.CompressionMode.GZIP + "deflate" -> MemcacheCacheConfiguration.CompressionMode.DEFLATE + else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE } } - ?: CompressionMode.ZIP + ?: MemcacheCacheConfiguration.CompressionMode.DEFLATE val digestAlgorithm = el.renderAttribute("digest") for (child in el.asIterable()) { when (child.nodeName) { "server" -> { val host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required") val port = child.renderAttribute("port")?.toInt() ?: throw ConfigurationException("port attribute is required") - servers.add(HostAndPort(host, port)) + val maxConnections = child.renderAttribute("max-connections")?.toInt() ?: 1 + val connectionTimeout = child.renderAttribute("connection-timeout") + ?.let(Duration::parse) + ?.let(Duration::toMillis) + ?.let(Long::toInt) + ?: 10000 + servers.add(MemcacheCacheConfiguration.Server(HostAndPort(host, port), connectionTimeout, maxConnections)) } } } - return MemcachedCacheConfiguration( + return MemcacheCacheConfiguration( servers, maxAge, maxSize, @@ -59,7 +66,7 @@ class MemcachedCacheProvider : CacheProvider { ) } - override fun serialize(doc: Document, cache: MemcachedCacheConfiguration) = cache.run { + override fun serialize(doc: Document, cache: MemcacheCacheConfiguration) = cache.run { val result = doc.createElement("cache") Xml.of(doc, result) { attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/") @@ -67,8 +74,12 @@ class MemcachedCacheProvider : CacheProvider { attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI) for (server in servers) { node("server") { - attr("host", server.host) - attr("port", server.port.toString()) + attr("host", server.endpoint.host) + attr("port", server.endpoint.port.toString()) + server.connectionTimeoutMillis?.let { connectionTimeoutMillis -> + attr("connection-timeout", Duration.of(connectionTimeoutMillis.toLong(), ChronoUnit.MILLIS).toString()) + } + attr("max-connections", server.maxConnections.toString()) } } attr("max-age", maxAge.toString()) @@ -76,12 +87,14 @@ class MemcachedCacheProvider : CacheProvider { digestAlgorithm?.let { digestAlgorithm -> attr("digest", digestAlgorithm) } - attr( - "compression-mode", when (compressionMode) { - CompressionMode.GZIP -> "gzip" - CompressionMode.ZIP -> "zip" - } - ) + compressionMode?.let { compressionMode -> + attr( + "compression-mode", when (compressionMode) { + MemcacheCacheConfiguration.CompressionMode.GZIP -> "gzip" + MemcacheCacheConfiguration.CompressionMode.DEFLATE -> "deflate" + } + ) + } } result } diff --git a/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/client/MemcacheClient.kt b/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/client/MemcacheClient.kt new file mode 100644 index 0000000..a1025c3 --- /dev/null +++ b/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/client/MemcacheClient.kt @@ -0,0 +1,242 @@ +package net.woggioni.gbcs.server.memcache.client + + +import io.netty.bootstrap.Bootstrap +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import io.netty.channel.Channel +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelOption +import io.netty.channel.ChannelPipeline +import io.netty.channel.SimpleChannelInboundHandler +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.pool.AbstractChannelPoolHandler +import io.netty.channel.pool.ChannelPool +import io.netty.channel.pool.FixedChannelPool +import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.handler.codec.DecoderException +import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec +import io.netty.handler.codec.memcache.binary.BinaryMemcacheObjectAggregator +import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes +import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus +import io.netty.handler.codec.memcache.binary.DefaultFullBinaryMemcacheRequest +import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheRequest +import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse +import io.netty.util.concurrent.GenericFutureListener +import net.woggioni.gbcs.common.ByteBufInputStream +import net.woggioni.gbcs.common.GBCS.digest +import net.woggioni.gbcs.common.HostAndPort +import net.woggioni.gbcs.common.contextLogger +import net.woggioni.gbcs.server.memcache.MemcacheCacheConfiguration +import net.woggioni.gbcs.server.memcache.MemcacheException +import net.woggioni.jwo.JWO +import java.io.ByteArrayOutputStream +import java.net.InetSocketAddress +import java.nio.channels.Channels +import java.nio.channels.ReadableByteChannel +import java.security.MessageDigest +import java.time.Duration +import java.time.Instant +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap +import java.util.zip.Deflater +import java.util.zip.DeflaterOutputStream +import java.util.zip.GZIPInputStream +import java.util.zip.GZIPOutputStream +import java.util.zip.InflaterInputStream +import io.netty.util.concurrent.Future as NettyFuture + + +class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseable { + + private companion object { + @JvmStatic + private val log = contextLogger() + } + + private val group: NioEventLoopGroup + private val connectionPool: MutableMap = ConcurrentHashMap() + + init { + group = NioEventLoopGroup() + } + + private fun newConnectionPool(server: MemcacheCacheConfiguration.Server): FixedChannelPool { + val bootstrap = Bootstrap().apply { + group(group) + channel(NioSocketChannel::class.java) + option(ChannelOption.SO_KEEPALIVE, true) + remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port)) + server.connectionTimeoutMillis?.let { + option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it) + } + } + val channelPoolHandler = object : AbstractChannelPoolHandler() { + + override fun channelCreated(ch: Channel) { + val pipeline: ChannelPipeline = ch.pipeline() + pipeline.addLast(BinaryMemcacheClientCodec()) + pipeline.addLast(BinaryMemcacheObjectAggregator(Integer.MAX_VALUE)) + } + } + return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections) + } + + + private fun sendRequest(request: FullBinaryMemcacheRequest): CompletableFuture { + + val server = cfg.servers.let { servers -> + if (servers.size > 1) { + val key = request.key().duplicate() + var checksum = 0 + while (key.readableBytes() > 4) { + val byte = key.readInt() + checksum = checksum xor byte + } + while (key.readableBytes() > 0) { + val byte = key.readByte() + checksum = checksum xor byte.toInt() + } + servers[checksum % servers.size] + } else { + servers.first() + } + } + + val response = CompletableFuture() + // Custom handler for processing responses + val pool = connectionPool.computeIfAbsent(server.endpoint) { + newConnectionPool(server) + } + pool.acquire().addListener(object : GenericFutureListener> { + override fun operationComplete(channelFuture: NettyFuture) { + if (channelFuture.isSuccess) { + val channel = channelFuture.now + val pipeline = channel.pipeline() + channel.pipeline() + .addLast("handler", object : SimpleChannelInboundHandler() { + override fun channelRead0( + ctx: ChannelHandlerContext, + msg: FullBinaryMemcacheResponse + ) { + pipeline.removeLast() + pool.release(channel) + response.complete(msg.retain()) + } + + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + val ex = when (cause) { + is DecoderException -> cause.cause!! + else -> cause + } + ctx.close() + pipeline.removeLast() + pool.release(channel) + response.completeExceptionally(ex) + } + }) + channel.writeAndFlush(request) + } else { + response.completeExceptionally(channelFuture.cause()) + } + } + }) + return response + } + + private fun encodeExpiry(expiry: Duration): Int { + val expirySeconds = expiry.toSeconds() + return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds } + ?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt() + } + + fun get(key: String): CompletableFuture { + val request = (cfg.digestAlgorithm + ?.let(MessageDigest::getInstance) + ?.let { md -> + digest(key.toByteArray(), md) + } ?: key.toByteArray(Charsets.UTF_8)).let { digest -> + DefaultFullBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), null).apply { + setOpcode(BinaryMemcacheOpcodes.GET) + } + } + return sendRequest(request).thenApply { response -> + when (val status = response.status()) { + BinaryMemcacheResponseStatus.SUCCESS -> { + val compressionMode = cfg.compressionMode + val content = response.content() + if (compressionMode != null) { + when (compressionMode) { + MemcacheCacheConfiguration.CompressionMode.GZIP -> { + GZIPInputStream(ByteBufInputStream(content)) + } + + MemcacheCacheConfiguration.CompressionMode.DEFLATE -> { + InflaterInputStream(ByteBufInputStream(content)) + } + } + } else { + ByteBufInputStream(content) + }.let(Channels::newChannel) + } + BinaryMemcacheResponseStatus.KEY_ENOENT -> { + null + } + else -> throw MemcacheException(status) + } + } + } + + fun put(key: String, content: ByteBuf, expiry: Duration, cas: Long? = null): CompletableFuture { + val request = (cfg.digestAlgorithm + ?.let(MessageDigest::getInstance) + ?.let { md -> + digest(key.toByteArray(), md) + } ?: key.toByteArray(Charsets.UTF_8)).let { digest -> + val extras = Unpooled.buffer(8, 8) + extras.writeInt(0) + extras.writeInt(encodeExpiry(expiry)) + val compressionMode = cfg.compressionMode + val payload = if (compressionMode != null) { + val inputStream = ByteBufInputStream(Unpooled.wrappedBuffer(content)) + val baos = ByteArrayOutputStream() + val outputStream = when (compressionMode) { + MemcacheCacheConfiguration.CompressionMode.GZIP -> { + GZIPOutputStream(baos) + } + + MemcacheCacheConfiguration.CompressionMode.DEFLATE -> { + DeflaterOutputStream(baos, Deflater(Deflater.DEFAULT_COMPRESSION, false)) + } + } + inputStream.use { i -> + outputStream.use { o -> + JWO.copy(i, o) + } + } + Unpooled.wrappedBuffer(baos.toByteArray()) + } else { + content + } + DefaultFullBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), extras, payload).apply { + setOpcode(BinaryMemcacheOpcodes.SET) + cas?.let(this::setCas) + } + } + return sendRequest(request).thenApply { response -> + when(val status = response.status()) { + BinaryMemcacheResponseStatus.SUCCESS -> null + else -> throw MemcacheException(status) + } + } + } + + + fun shutDown(): NettyFuture<*> { + return group.shutdownGracefully() + } + + override fun close() { + shutDown().sync() + } +} \ No newline at end of file diff --git a/gbcs-server-memcache/src/main/resources/META-INF/services/net.woggioni.gbcs.api.CacheProvider b/gbcs-server-memcache/src/main/resources/META-INF/services/net.woggioni.gbcs.api.CacheProvider new file mode 100644 index 0000000..122761c --- /dev/null +++ b/gbcs-server-memcache/src/main/resources/META-INF/services/net.woggioni.gbcs.api.CacheProvider @@ -0,0 +1 @@ +net.woggioni.gbcs.server.memcache.MemcacheCacheProvider \ No newline at end of file diff --git a/gbcs-server-memcached/src/main/resources/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd b/gbcs-server-memcache/src/main/resources/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd similarity index 76% rename from gbcs-server-memcached/src/main/resources/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd rename to gbcs-server-memcache/src/main/resources/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd index 47cc0fa..946a1da 100644 --- a/gbcs-server-memcached/src/main/resources/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd +++ b/gbcs-server-memcache/src/main/resources/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd @@ -1,33 +1,35 @@ - - + + + - + - + - + - + diff --git a/gbcs-server-memcached/src/main/java/module-info.java b/gbcs-server-memcached/src/main/java/module-info.java deleted file mode 100644 index 12ed994..0000000 --- a/gbcs-server-memcached/src/main/java/module-info.java +++ /dev/null @@ -1,14 +0,0 @@ -import net.woggioni.gbcs.api.CacheProvider; - -module net.woggioni.gbcs.server.memcached { - requires net.woggioni.gbcs.common; - requires net.woggioni.gbcs.api; - requires com.googlecode.xmemcached; - requires net.woggioni.jwo; - requires java.xml; - requires kotlin.stdlib; - - provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider; - - opens net.woggioni.gbcs.server.memcached.schema; -} \ No newline at end of file diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCache.kt b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCache.kt deleted file mode 100644 index cec7a1a..0000000 --- a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCache.kt +++ /dev/null @@ -1,59 +0,0 @@ -package net.woggioni.gbcs.server.memcached - -import net.rubyeye.xmemcached.XMemcachedClientBuilder -import net.rubyeye.xmemcached.command.BinaryCommandFactory -import net.rubyeye.xmemcached.transcoders.CompressionMode -import net.rubyeye.xmemcached.transcoders.SerializingTranscoder -import net.woggioni.gbcs.api.Cache -import net.woggioni.gbcs.api.exception.ContentTooLargeException -import net.woggioni.gbcs.common.HostAndPort -import net.woggioni.jwo.JWO -import java.io.ByteArrayInputStream -import java.net.InetSocketAddress -import java.nio.channels.Channels -import java.nio.channels.ReadableByteChannel -import java.nio.charset.StandardCharsets -import java.security.MessageDigest -import java.time.Duration - -class MemcachedCache( - servers: List, - private val maxAge: Duration, - maxSize : Int, - digestAlgorithm: String?, - compressionMode: CompressionMode, -) : Cache { - private val memcachedClient = XMemcachedClientBuilder( - servers.stream().map { addr: HostAndPort -> InetSocketAddress(addr.host, addr.port) }.toList() - ).apply { - commandFactory = BinaryCommandFactory() - digestAlgorithm?.let { dAlg -> - setKeyProvider { key -> - val md = MessageDigest.getInstance(dAlg) - md.update(key.toByteArray(StandardCharsets.UTF_8)) - JWO.bytesToHex(md.digest()) - } - } - transcoder = SerializingTranscoder(maxSize).apply { - setCompressionMode(compressionMode) - } - }.build() - - override fun get(key: String): ReadableByteChannel? { - return memcachedClient.get(key) - ?.let(::ByteArrayInputStream) - ?.let(Channels::newChannel) - } - - override fun put(key: String, content: ByteArray) { - try { - memcachedClient[key, maxAge.toSeconds().toInt()] = content - } catch (e: IllegalArgumentException) { - throw ContentTooLargeException(e.message, e) - } - } - - override fun close() { - memcachedClient.shutdown() - } -} diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheConfiguration.kt b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheConfiguration.kt deleted file mode 100644 index b600dff..0000000 --- a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheConfiguration.kt +++ /dev/null @@ -1,26 +0,0 @@ -package net.woggioni.gbcs.server.memcached - -import net.rubyeye.xmemcached.transcoders.CompressionMode -import net.woggioni.gbcs.api.Configuration -import net.woggioni.gbcs.common.HostAndPort -import java.time.Duration - -data class MemcachedCacheConfiguration( - var servers: List, - var maxAge: Duration = Duration.ofDays(1), - var maxSize: Int = 0x100000, - var digestAlgorithm: String? = null, - var compressionMode: CompressionMode = CompressionMode.ZIP, -) : Configuration.Cache { - override fun materialize() = MemcachedCache( - servers, - maxAge, - maxSize, - digestAlgorithm, - compressionMode - ) - - override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached" - - override fun getTypeName() = "memcachedCacheType" -} diff --git a/gbcs-server-memcached/src/main/resources/META-INF/services/net.woggioni.gbcs.api.CacheProvider b/gbcs-server-memcached/src/main/resources/META-INF/services/net.woggioni.gbcs.api.CacheProvider deleted file mode 100644 index 0d170bb..0000000 --- a/gbcs-server-memcached/src/main/resources/META-INF/services/net.woggioni.gbcs.api.CacheProvider +++ /dev/null @@ -1 +0,0 @@ -net.woggioni.gbcs.server.memcached.MemcachedCacheProvider \ No newline at end of file diff --git a/gbcs-server/build.gradle b/gbcs-server/build.gradle index 6ee91d5..71603c1 100644 --- a/gbcs-server/build.gradle +++ b/gbcs-server/build.gradle @@ -19,7 +19,7 @@ dependencies { testImplementation catalog.bcprov.jdk18on testImplementation catalog.bcpkix.jdk18on - testRuntimeOnly project(":gbcs-server-memcached") + testRuntimeOnly project(":gbcs-server-memcache") } test { diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/GradleBuildCacheServer.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/GradleBuildCacheServer.kt index 648d651..516f2aa 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/GradleBuildCacheServer.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/GradleBuildCacheServer.kt @@ -400,9 +400,9 @@ class GradleBuildCacheServer(private val cfg: Configuration) { fun run(): ServerHandle { // Create the multithreaded event loops for the server - val bossGroup = NioEventLoopGroup(0) + val bossGroup = NioEventLoopGroup(1) val serverSocketChannel = NioServerSocketChannel::class.java - val workerGroup = bossGroup + val workerGroup = NioEventLoopGroup(0) val eventExecutorGroup = run { val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) { Thread.ofVirtual().factory() diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/FileSystemCache.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/FileSystemCache.kt index d9748d9..1b280f8 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/FileSystemCache.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/FileSystemCache.kt @@ -1,8 +1,11 @@ package net.woggioni.gbcs.server.cache +import io.netty.buffer.ByteBuf import net.woggioni.gbcs.api.Cache +import net.woggioni.gbcs.common.ByteBufInputStream import net.woggioni.gbcs.common.GBCS.digestString import net.woggioni.gbcs.common.contextLogger +import net.woggioni.jwo.JWO import net.woggioni.jwo.LockFile import java.nio.channels.Channels import java.nio.channels.FileChannel @@ -14,6 +17,7 @@ import java.nio.file.attribute.BasicFileAttributes import java.security.MessageDigest import java.time.Duration import java.time.Instant +import java.util.concurrent.CompletableFuture import java.util.concurrent.atomic.AtomicReference import java.util.zip.Deflater import java.util.zip.DeflaterOutputStream @@ -28,7 +32,10 @@ class FileSystemCache( val compressionLevel: Int ) : Cache { - private val log = contextLogger() + private companion object { + @JvmStatic + private val log = contextLogger() + } init { Files.createDirectories(root) @@ -62,10 +69,12 @@ class FileSystemCache( } }.also { gc() + }.let { + CompletableFuture.completedFuture(it) } } - override fun put(key: String, content: ByteArray) { + override fun put(key: String, content: ByteBuf): CompletableFuture { (digestAlgorithm ?.let(MessageDigest::getInstance) ?.let { md -> @@ -82,7 +91,7 @@ class FileSystemCache( it } }.use { - it.write(content) + JWO.copy(ByteBufInputStream(content), it) } Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE) } catch (t: Throwable) { @@ -92,6 +101,7 @@ class FileSystemCache( }.also { gc() } + return CompletableFuture.completedFuture(null) } private fun gc() { diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCache.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCache.kt index 49d9a15..90271fa 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCache.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCache.kt @@ -1,13 +1,17 @@ package net.woggioni.gbcs.server.cache +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled import net.woggioni.gbcs.api.Cache +import net.woggioni.gbcs.common.ByteBufInputStream import net.woggioni.gbcs.common.GBCS.digestString -import java.io.ByteArrayInputStream +import net.woggioni.jwo.JWO import java.io.ByteArrayOutputStream import java.nio.channels.Channels import java.security.MessageDigest import java.time.Duration import java.time.Instant +import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.PriorityBlockingQueue import java.util.zip.Deflater @@ -22,9 +26,9 @@ class InMemoryCache( val compressionLevel: Int ) : Cache { - private val map = ConcurrentHashMap() + private val map = ConcurrentHashMap() - private class RemovalQueueElement(val key: String, val value : ByteArray, val expiry : Instant) : Comparable { + private class RemovalQueueElement(val key: String, val value : ByteBuf, val expiry : Instant) : Comparable { override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry) } @@ -62,14 +66,16 @@ class InMemoryCache( ?.let { value -> if (compressionEnabled) { val inflater = Inflater() - Channels.newChannel(InflaterInputStream(ByteArrayInputStream(value), inflater)) + Channels.newChannel(InflaterInputStream(ByteBufInputStream(value), inflater)) } else { - Channels.newChannel(ByteArrayInputStream(value)) + Channels.newChannel(ByteBufInputStream(value)) } } + }.let { + CompletableFuture.completedFuture(it) } - override fun put(key: String, content: ByteArray) { + override fun put(key: String, content: ByteBuf) = (digestAlgorithm ?.let(MessageDigest::getInstance) ?.let { md -> @@ -79,14 +85,15 @@ class InMemoryCache( val deflater = Deflater(compressionLevel) val baos = ByteArrayOutputStream() DeflaterOutputStream(baos, deflater).use { stream -> - stream.write(content) + JWO.copy(ByteBufInputStream(content), stream) } - baos.toByteArray() + Unpooled.wrappedBuffer(baos.toByteArray()) } else { content } map[digest] = value removalQueue.put(RemovalQueueElement(digest, value, Instant.now().plus(maxAge))) + }.let { + CompletableFuture.completedFuture(null) } - } } \ No newline at end of file diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCacheProvider.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCacheProvider.kt index 7d27528..7bebd42 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCacheProvider.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCacheProvider.kt @@ -6,7 +6,6 @@ import net.woggioni.gbcs.common.Xml import net.woggioni.gbcs.common.Xml.Companion.renderAttribute import org.w3c.dom.Document import org.w3c.dom.Element -import java.nio.file.Path import java.time.Duration import java.util.zip.Deflater diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Parser.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Parser.kt index 172c2f5..251d16d 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Parser.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Parser.kt @@ -265,7 +265,8 @@ object Parser { }.map { el -> val groupName = el.renderAttribute("name") ?: throw ConfigurationException("Group name is required") var roles = emptySet() - var quota: Configuration.Quota? = null + var userQuota: Configuration.Quota? = null + var groupQuota: Configuration.Quota? = null for (child in el.asIterable()) { when (child.localName) { "users" -> { @@ -279,12 +280,15 @@ object Parser { "roles" -> { roles = parseRoles(child) } - "quota" -> { - quota = parseQuota(child) + "group-quota" -> { + userQuota = parseQuota(child) + } + "user-quota" -> { + groupQuota = parseQuota(child) } } } - groupName to Group(groupName, roles, quota) + groupName to Group(groupName, roles, userQuota, groupQuota) }.toMap() val users = knownUsersMap.map { (name, user) -> name to User(name, user.password, userGroups[name]?.mapNotNull { groups[it] }?.toSet() ?: emptySet(), user.quota) diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Serializer.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Serializer.kt index febb0fa..f481da5 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Serializer.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/configuration/Serializer.kt @@ -8,8 +8,14 @@ import org.w3c.dom.Document object Serializer { - fun serialize(conf : Configuration) : Document { + private fun Xml.serializeQuota(quota : Configuration.Quota) { + attr("calls", quota.calls.toString()) + attr("period", quota.period.toString()) + attr("max-available-calls", quota.maxAvailableCalls.toString()) + attr("initial-available-calls", quota.initialAvailableCalls.toString()) + } + fun serialize(conf : Configuration) : Document { val schemaLocations = CacheSerializers.index.values.asSequence().map { it.xmlNamespace to it.xmlSchemaLocation }.toMap() @@ -56,10 +62,7 @@ object Serializer { } user.quota?.let { quota -> node("quota") { - attr("calls", quota.calls.toString()) - attr("period", quota.period.toString()) - attr("max-available-calls", quota.maxAvailableCalls.toString()) - attr("initial-available-calls", quota.initialAvailableCalls.toString()) + serializeQuota(quota) } } } @@ -70,10 +73,7 @@ object Serializer { anonymousUser.quota?.let { quota -> node("anonymous") { node("quota") { - attr("calls", quota.calls.toString()) - attr("period", quota.period.toString()) - attr("max-available-calls", quota.maxAvailableCalls.toString()) - attr("initial-available-calls", quota.initialAvailableCalls.toString()) + serializeQuota(quota) } } } @@ -113,12 +113,14 @@ object Serializer { } } } - group.quota?.let { quota -> - node("quota") { - attr("calls", quota.calls.toString()) - attr("period", quota.period.toString()) - attr("max-available-calls", quota.maxAvailableCalls.toString()) - attr("initial-available-calls", quota.initialAvailableCalls.toString()) + group.userQuota?.let { quota -> + node("user-quota") { + serializeQuota(quota) + } + } + group.groupQuota?.let { quota -> + node("group-quota") { + serializeQuota(quota) } } } diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt index a127163..5f6587e 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt @@ -17,7 +17,6 @@ import io.netty.handler.codec.http.HttpUtil import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.stream.ChunkedNioStream import net.woggioni.gbcs.api.Cache -import net.woggioni.gbcs.api.exception.CacheException import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.server.debug import net.woggioni.gbcs.server.warn @@ -43,49 +42,41 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : return } if (serverPrefix == prefix) { - try { - cache.get(key) - } catch(ex : Throwable) { - throw CacheException("Error accessing the cache backend", ex) - }?.let { channel -> - log.debug(ctx) { - "Cache hit for key '$key'" - } - val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) - response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM - if (!keepAlive) { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) - response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY) - } else { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) - response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED) - } - ctx.write(response) - when (channel) { - is FileChannel -> { - if (keepAlive) { - ctx.write(DefaultFileRegion(channel, 0, channel.size())) - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate()) - } else { - ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size())) - .addListener(ChannelFutureListener.CLOSE) - } + cache.get(key).thenApply { channel -> + if(channel != null) { + log.debug(ctx) { + "Cache hit for key '$key'" } - else -> { - ctx.write(ChunkedNioStream(channel)).addListener { evt -> - channel.close() - } + val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) + response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM + if (!keepAlive) { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY) + } else { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED) + } + ctx.write(response) + val content : Any = when (channel) { + is FileChannel -> DefaultFileRegion(channel, 0, channel.size()) + else -> ChunkedNioStream(channel) + } + if (keepAlive) { + ctx.write(content) ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate()) + } else { + ctx.writeAndFlush(content) + .addListener(ChannelFutureListener.CLOSE) } + } else { + log.debug(ctx) { + "Cache miss for key '$key'" + } + val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND) + response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 + ctx.writeAndFlush(response) } - } ?: let { - log.debug(ctx) { - "Cache miss for key '$key'" - } - val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND) - response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 - ctx.writeAndFlush(response) - } + }.whenComplete { _, ex -> ex?.let(ctx::fireExceptionCaught) } } else { log.warn(ctx) { "Got request for unhandled path '${msg.uri()}'" @@ -103,26 +94,16 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : log.debug(ctx) { "Added value for key '$key' to build cache" } - val bodyBytes = msg.content().run { - if (isDirect) { - ByteArray(readableBytes()).also { - readBytes(it) - } - } else { - array() - } + cache.put(key, msg.content().retain()).thenRun { + val response = DefaultFullHttpResponse( + msg.protocolVersion(), HttpResponseStatus.CREATED, + Unpooled.copiedBuffer(key.toByteArray()) + ) + response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() + ctx.writeAndFlush(response) + }.whenComplete { _, ex -> + ctx.fireExceptionCaught(ex) } - try { - cache.put(key, bodyBytes) - } catch(ex : Throwable) { - throw CacheException("Error accessing the cache backend", ex) - } - val response = DefaultFullHttpResponse( - msg.protocolVersion(), HttpResponseStatus.CREATED, - Unpooled.copiedBuffer(key.toByteArray()) - ) - response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() - ctx.writeAndFlush(response) } else { log.warn(ctx) { "Got request for unhandled path '${msg.uri()}'" diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/throttling/BucketManager.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/throttling/BucketManager.kt index 1196bd6..891a229 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/throttling/BucketManager.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/throttling/BucketManager.kt @@ -8,7 +8,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.function.Function class BucketManager private constructor( - private val bucketsByUser: Map = HashMap(), + private val bucketsByUser: Map> = HashMap(), private val bucketsByGroup: Map = HashMap(), loader: Function? ) { @@ -43,22 +43,27 @@ class BucketManager private constructor( companion object { fun from(cfg : Configuration) : BucketManager { - val bucketsByUser = cfg.users.values.asSequence().filter { - it.quota != null - }.map { user -> - val quota = user.quota - val bucket = Bucket.local( - quota.maxAvailableCalls, - quota.calls, - quota.period, - quota.initialAvailableCalls - ) - user to bucket + val bucketsByUser = cfg.users.values.asSequence().map { user -> + val buckets = ( + user.quota + ?.let { quota -> + sequenceOf(quota) + } ?: user.groups.asSequence() + .mapNotNull(Configuration.Group::getUserQuota) + ).map { quota -> + Bucket.local( + quota.maxAvailableCalls, + quota.calls, + quota.period, + quota.initialAvailableCalls + ) + }.toList() + user to buckets }.toMap() val bucketsByGroup = cfg.groups.values.asSequence().filter { - it.quota != null + it.groupQuota != null }.map { group -> - val quota = group.quota + val quota = group.groupQuota val bucket = Bucket.local( quota.maxAvailableCalls, quota.calls, diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/throttling/ThrottlingHandler.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/throttling/ThrottlingHandler.kt index 494ce3e..a44d461 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/throttling/ThrottlingHandler.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/throttling/ThrottlingHandler.kt @@ -42,7 +42,7 @@ class ThrottlingHandler(cfg: Configuration) : val buckets = mutableListOf() val user = ctx.channel().attr(GradleBuildCacheServer.userAttribute).get() if (user != null) { - bucketManager.getBucketByUser(user)?.let(buckets::add) + bucketManager.getBucketByUser(user)?.let(buckets::addAll) } val groups = ctx.channel().attr(GradleBuildCacheServer.groupAttribute).get() ?: emptySet() if (groups.isNotEmpty()) { diff --git a/gbcs-server/src/main/resources/net/woggioni/gbcs/server/schema/gbcs.xsd b/gbcs-server/src/main/resources/net/woggioni/gbcs/server/schema/gbcs.xsd index d5c15b1..881bf35 100644 --- a/gbcs-server/src/main/resources/net/woggioni/gbcs/server/schema/gbcs.xsd +++ b/gbcs-server/src/main/resources/net/woggioni/gbcs/server/schema/gbcs.xsd @@ -146,7 +146,8 @@ - + + diff --git a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractBasicAuthServerTest.kt b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractBasicAuthServerTest.kt index 1b17a9e..fbd260f 100644 --- a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractBasicAuthServerTest.kt +++ b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractBasicAuthServerTest.kt @@ -24,8 +24,8 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() { protected val random = Random(101325) protected val keyValuePair = newEntry(random) protected val serverPath = "gbcs" - protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null) - protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null) + protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null, null) + protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null, null) abstract protected val users : List diff --git a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractServerTest.kt b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractServerTest.kt index 552010d..da05365 100644 --- a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractServerTest.kt +++ b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractServerTest.kt @@ -1,7 +1,7 @@ package net.woggioni.gbcs.server.test -import net.woggioni.gbcs.server.GradleBuildCacheServer import net.woggioni.gbcs.api.Configuration +import net.woggioni.gbcs.server.GradleBuildCacheServer import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.MethodOrderer diff --git a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractTlsServerTest.kt b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractTlsServerTest.kt index b2ec910..7ff0e9e 100644 --- a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractTlsServerTest.kt +++ b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/AbstractTlsServerTest.kt @@ -4,7 +4,6 @@ import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.api.Role import net.woggioni.gbcs.common.Xml import net.woggioni.gbcs.server.cache.FileSystemCacheConfiguration -import net.woggioni.gbcs.server.cache.InMemoryCacheConfiguration import net.woggioni.gbcs.server.configuration.Serializer import net.woggioni.gbcs.server.test.utils.CertificateUtils import net.woggioni.gbcs.server.test.utils.CertificateUtils.X509Credentials @@ -46,8 +45,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() { private lateinit var trustStore: KeyStore protected lateinit var ca: X509Credentials - protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null) - protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null) + protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null, null) + protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null, null) protected val random = Random(101325) protected val keyValuePair = newEntry(random) private val serverPath : String? = null diff --git a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/NoAuthServerTest.kt b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/NoAuthServerTest.kt index f2759ef..d436808 100644 --- a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/NoAuthServerTest.kt +++ b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/NoAuthServerTest.kt @@ -3,7 +3,6 @@ package net.woggioni.gbcs.server.test import io.netty.handler.codec.http.HttpResponseStatus import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.common.Xml -import net.woggioni.gbcs.server.cache.FileSystemCacheConfiguration import net.woggioni.gbcs.server.cache.InMemoryCacheConfiguration import net.woggioni.gbcs.server.configuration.Serializer import net.woggioni.gbcs.server.test.utils.NetworkUtils diff --git a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/TlsServerTest.kt b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/TlsServerTest.kt index c0d0902..e753a53 100644 --- a/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/TlsServerTest.kt +++ b/gbcs-server/src/test/kotlin/net/woggioni/gbcs/server/test/TlsServerTest.kt @@ -7,7 +7,6 @@ import org.bouncycastle.asn1.x500.X500Name import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Order import org.junit.jupiter.api.Test -import org.junit.jupiter.params.provider.ArgumentsSource import java.net.http.HttpClient import java.net.http.HttpRequest import java.net.http.HttpResponse diff --git a/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/valid/gbcs-memcached-tls.xml b/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/valid/gbcs-memcached-tls.xml index 84a7fb7..42965df 100644 --- a/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/valid/gbcs-memcached-tls.xml +++ b/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/valid/gbcs-memcached-tls.xml @@ -1,8 +1,8 @@ - + diff --git a/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/valid/gbcs-memcached.xml b/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/valid/gbcs-memcached.xml index 5c4ab6e..457105c 100644 --- a/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/valid/gbcs-memcached.xml +++ b/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/valid/gbcs-memcached.xml @@ -1,8 +1,8 @@ + xmlns:gbcs-memcache="urn:net.woggioni.gbcs.server.memcache" + xs:schemaLocation="urn:net.woggioni.gbcs.server.memcache jpms://net.woggioni.gbcs.server.memcache/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd"> - - + + diff --git a/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/valid/gbcs-tls.xml b/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/valid/gbcs-tls.xml index b3642df..2eb0f7d 100644 --- a/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/valid/gbcs-tls.xml +++ b/gbcs-server/src/test/resources/net/woggioni/gbcs/server/test/valid/gbcs-tls.xml @@ -32,7 +32,8 @@ - + + @@ -50,7 +51,7 @@ - + diff --git a/gradle.properties b/gradle.properties index b18e4f6..8fbb64c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,7 +2,7 @@ org.gradle.configuration-cache=false org.gradle.parallel=true org.gradle.caching=true -gbcs.version = 0.1.1 +gbcs.version = 0.1.2 lys.version = 2025.01.31 diff --git a/settings.gradle b/settings.gradle index 5386481..d05c61c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -27,7 +27,7 @@ rootProject.name = 'gbcs' include 'gbcs-api' include 'gbcs-common' -include 'gbcs-server-memcached' +include 'gbcs-server-memcache' include 'gbcs-cli' include 'docker' include 'gbcs-client'