Compare commits

...

9 Commits

Author SHA1 Message Date
0463038aaa first commit with streaming support (buggy and unreliable) 2025-02-13 23:02:08 +08:00
7eca8a270d 0.1.6 release
All checks were successful
CI / build (push) Successful in 3m29s
2025-02-08 00:54:25 +08:00
84d7c977f9 added randomizer to retries 2025-02-07 23:19:13 +08:00
317eadce07 used virtual thread for garbage colection in FileSystemCache
All checks were successful
CI / build (push) Successful in 2m32s
2025-02-07 20:45:29 +08:00
af79e74b95 fixed max message size for memcache backend 2025-02-06 23:09:22 +08:00
78ae21caa4 0.1.4 release
All checks were successful
CI / build (push) Successful in 8m14s
2025-02-06 15:24:00 +08:00
6c0eadb9fb renamed project to "Remote Cache Build Server" (RBCS) 2025-02-06 15:20:50 +08:00
5fef1b932e updated lys-catalog version
All checks were successful
CI / build (push) Successful in 2m32s
2025-02-05 21:49:08 +08:00
5e173dbf62 fixed unit tests 2025-02-05 21:24:10 +08:00
133 changed files with 2049 additions and 1461 deletions

View File

@@ -31,7 +31,7 @@ jobs:
username: woggioni username: woggioni
password: ${{ secrets.PUBLISHER_TOKEN }} password: ${{ secrets.PUBLISHER_TOKEN }}
- -
name: Build gbcs Docker image name: Build rbcs Docker image
uses: docker/build-push-action@v5.3.0 uses: docker/build-push-action@v5.3.0
with: with:
context: "docker/build/docker" context: "docker/build/docker"
@@ -39,12 +39,12 @@ jobs:
push: true push: true
pull: true pull: true
tags: | tags: |
gitea.woggioni.net/woggioni/gbcs:latest gitea.woggioni.net/woggioni/rbcs:latest
gitea.woggioni.net/woggioni/gbcs:${{ steps.retrieve-version.outputs.VERSION }} gitea.woggioni.net/woggioni/rbcs:${{ steps.retrieve-version.outputs.VERSION }}
target: release target: release
cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx cache-from: type=registry,ref=gitea.woggioni.net/woggioni/rbcs:buildx
- -
name: Build gbcs memcache Docker image name: Build rbcs memcache Docker image
uses: docker/build-push-action@v5.3.0 uses: docker/build-push-action@v5.3.0
with: with:
context: "docker/build/docker" context: "docker/build/docker"
@@ -52,11 +52,11 @@ jobs:
push: true push: true
pull: true pull: true
tags: | tags: |
gitea.woggioni.net/woggioni/gbcs:memcache gitea.woggioni.net/woggioni/rbcs:memcache
gitea.woggioni.net/woggioni/gbcs:memcache-${{ steps.retrieve-version.outputs.VERSION }} gitea.woggioni.net/woggioni/rbcs:memcache-${{ steps.retrieve-version.outputs.VERSION }}
target: release-memcache target: release-memcache
cache-from: type=registry,ref=gitea.woggioni.net/woggioni/gbcs:buildx cache-from: type=registry,ref=gitea.woggioni.net/woggioni/rbcs:buildx
cache-to: type=registry,mode=max,compression=zstd,image-manifest=true,oci-mediatypes=true,ref=gitea.woggioni.net/woggioni/gbcs:buildx cache-to: type=registry,mode=max,compression=zstd,image-manifest=true,oci-mediatypes=true,ref=gitea.woggioni.net/woggioni/rbcs:buildx
- name: Publish artifacts - name: Publish artifacts
env: env:
PUBLISHER_TOKEN: ${{ secrets.PUBLISHER_TOKEN }} PUBLISHER_TOKEN: ${{ secrets.PUBLISHER_TOKEN }}

2
.gitignore vendored
View File

@@ -4,4 +4,4 @@
# Ignore Gradle build output directory # Ignore Gradle build output directory
build build
gbcs-cli/native-image/*.json rbcs-cli/native-image/*.json

View File

@@ -15,7 +15,7 @@ allprojects { subproject ->
version = project.currentTag.map { it[0] }.get() version = project.currentTag.map { it[0] }.get()
} else { } else {
version = project.gitRevision.map { gitRevision -> version = project.gitRevision.map { gitRevision ->
"${getProperty('gbcs.version')}.${gitRevision[0..10]}" "${getProperty('rbcs.version')}.${gitRevision[0..10]}"
}.get() }.get()
} }

View File

@@ -4,13 +4,13 @@ USER luser
WORKDIR /home/luser WORKDIR /home/luser
FROM base-release AS release FROM base-release AS release
ADD gbcs-cli-envelope-*.jar gbcs.jar ADD rbcs-cli-envelope-*.jar rbcs.jar
ENTRYPOINT ["java", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/gbcs.jar", "server"] ENTRYPOINT ["java", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar", "server"]
FROM base-release AS release-memcache FROM base-release AS release-memcache
ADD --chown=luser:luser gbcs-cli-envelope-*.jar gbcs.jar ADD --chown=luser:luser rbcs-cli-envelope-*.jar rbcs.jar
RUN mkdir plugins RUN mkdir plugins
WORKDIR /home/luser/plugins WORKDIR /home/luser/plugins
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/gbcs-server-memcache*.tar RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-server-memcache*.tar
WORKDIR /home/luser WORKDIR /home/luser
ENTRYPOINT ["java", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/gbcs.jar", "server"] ENTRYPOINT ["java", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar", "server"]

View File

@@ -18,8 +18,8 @@ configurations {
} }
dependencies { dependencies {
docker project(path: ':gbcs-cli', configuration: 'release') docker project(path: ':rbcs-cli', configuration: 'release')
docker project(path: ':gbcs-server-memcache', configuration: 'release') docker project(path: ':rbcs-server-memcache', configuration: 'release')
} }
Provider<Task> cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {} Provider<Task> cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {}
@@ -35,21 +35,21 @@ Provider<Copy> prepareDockerBuild = tasks.register('prepareDockerBuild', Copy) {
Provider<DockerBuildImage> dockerBuild = tasks.register('dockerBuildImage', DockerBuildImage) { Provider<DockerBuildImage> dockerBuild = tasks.register('dockerBuildImage', DockerBuildImage) {
group = 'docker' group = 'docker'
dependsOn prepareDockerBuild dependsOn prepareDockerBuild
images.add('gitea.woggioni.net/woggioni/gbcs:latest') images.add('gitea.woggioni.net/woggioni/rbcs:latest')
images.add("gitea.woggioni.net/woggioni/gbcs:${version}") images.add("gitea.woggioni.net/woggioni/rbcs:${version}")
} }
Provider<DockerTagImage> dockerTag = tasks.register('dockerTagImage', DockerTagImage) { Provider<DockerTagImage> dockerTag = tasks.register('dockerTagImage', DockerTagImage) {
group = 'docker' group = 'docker'
repository = 'gitea.woggioni.net/woggioni/gbcs' repository = 'gitea.woggioni.net/woggioni/rbcs'
imageId = 'gitea.woggioni.net/woggioni/gbcs:latest' imageId = 'gitea.woggioni.net/woggioni/rbcs:latest'
tag = version tag = version
} }
Provider<DockerTagImage> dockerTagMemcache = tasks.register('dockerTagMemcacheImage', DockerTagImage) { Provider<DockerTagImage> dockerTagMemcache = tasks.register('dockerTagMemcacheImage', DockerTagImage) {
group = 'docker' group = 'docker'
repository = 'gitea.woggioni.net/woggioni/gbcs' repository = 'gitea.woggioni.net/woggioni/rbcs'
imageId = 'gitea.woggioni.net/woggioni/gbcs:memcache' imageId = 'gitea.woggioni.net/woggioni/rbcs:memcache'
tag = "${version}-memcache" tag = "${version}-memcache"
} }

View File

@@ -1,7 +0,0 @@
module net.woggioni.gbcs.api {
requires static lombok;
requires java.xml;
requires io.netty.buffer;
exports net.woggioni.gbcs.api;
exports net.woggioni.gbcs.api.exception;
}

View File

@@ -1,14 +0,0 @@
package net.woggioni.gbcs.api;
import io.netty.buffer.ByteBuf;
import net.woggioni.gbcs.api.exception.ContentTooLargeException;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.CompletableFuture;
public interface Cache extends AutoCloseable {
CompletableFuture<ReadableByteChannel> get(String key);
CompletableFuture<Void> put(String key, ByteBuf content) throws ContentTooLargeException;
}

View File

@@ -1,7 +0,0 @@
package net.woggioni.gbcs.api.exception;
public class GbcsException extends RuntimeException {
public GbcsException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -1,17 +0,0 @@
module net.woggioni.gbcs.cli {
requires org.slf4j;
requires net.woggioni.gbcs.server;
requires info.picocli;
requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.client;
requires kotlin.stdlib;
requires net.woggioni.jwo;
requires net.woggioni.gbcs.api;
exports net.woggioni.gbcs.cli.impl.converters to info.picocli;
opens net.woggioni.gbcs.cli.impl.commands to info.picocli;
opens net.woggioni.gbcs.cli.impl to info.picocli;
opens net.woggioni.gbcs.cli to info.picocli, net.woggioni.gbcs.common;
exports net.woggioni.gbcs.cli;
}

View File

@@ -1,16 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs-client:profiles xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs-client="urn:net.woggioni.gbcs.client"
xs:schemaLocation="urn:net.woggioni.gbcs.client jms://net.woggioni.gbcs.client/net/woggioni/gbcs/client/schema/gbcs-client.xsd"
>
<profile name="profile1" base-url="https://gbcs1.example.com/">
<tls-client-auth
key-store-file="keystore.pfx"
key-store-password="password"
key-alias="woggioni@c962475fa38"
key-password="key-password"/>
</profile>
<profile name="profile2" base-url="https://gbcs2.example.com/">
<basic-auth user="user" password="password"/>
</profile>
</gbcs-client:profiles>

View File

@@ -1,29 +0,0 @@
package net.woggioni.gbcs.common
import net.woggioni.jwo.JWO
import java.net.URI
import java.net.URL
import java.security.MessageDigest
object GBCS {
fun String.toUrl() : URL = URL.of(URI(this), null)
const val GBCS_NAMESPACE_URI: String = "urn:net.woggioni.gbcs.server"
const val GBCS_PREFIX: String = "gbcs"
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
fun digest(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): ByteArray {
md.update(data)
return md.digest()
}
fun digestString(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): String {
return JWO.bytesToHex(digest(data, md))
}
}

View File

@@ -1 +0,0 @@
net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory

View File

@@ -1,19 +0,0 @@
import net.woggioni.gbcs.api.CacheProvider;
module net.woggioni.gbcs.server.memcache {
requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.api;
requires net.woggioni.jwo;
requires java.xml;
requires kotlin.stdlib;
requires io.netty.transport;
requires io.netty.codec;
requires io.netty.codec.memcache;
requires io.netty.common;
requires io.netty.buffer;
requires org.slf4j;
provides CacheProvider with net.woggioni.gbcs.server.memcache.MemcacheCacheProvider;
opens net.woggioni.gbcs.server.memcache.schema;
}

View File

@@ -1,23 +0,0 @@
package net.woggioni.gbcs.server.memcache
import io.netty.buffer.ByteBuf
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.server.memcache.client.MemcacheClient
import java.nio.channels.ReadableByteChannel
import java.util.concurrent.CompletableFuture
class MemcacheCache(private val cfg : MemcacheCacheConfiguration) : Cache {
private val memcacheClient = MemcacheClient(cfg)
override fun get(key: String): CompletableFuture<ReadableByteChannel?> {
return memcacheClient.get(key)
}
override fun put(key: String, content: ByteBuf): CompletableFuture<Void> {
return memcacheClient.put(key, content, cfg.maxAge)
}
override fun close() {
memcacheClient.close()
}
}

View File

@@ -1,258 +0,0 @@
package net.woggioni.gbcs.server.memcache.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheObjectAggregator
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultFullBinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.ByteBufOutputStream
import net.woggioni.gbcs.common.GBCS.digest
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.memcache.MemcacheCacheConfiguration
import net.woggioni.gbcs.server.memcache.MemcacheException
import net.woggioni.jwo.JWO
import java.io.ByteArrayOutputStream
import java.net.InetSocketAddress
import java.nio.channels.Channels
import java.nio.channels.ReadableByteChannel
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import java.util.zip.InflaterInputStream
import io.netty.util.concurrent.Future as NettyFuture
class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseable {
private companion object {
@JvmStatic
private val log = contextLogger()
}
private val group: NioEventLoopGroup
private val connectionPool: MutableMap<HostAndPort, ChannelPool> = ConcurrentHashMap()
init {
group = NioEventLoopGroup()
}
private fun newConnectionPool(server: MemcacheCacheConfiguration.Server): FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
channel(NioSocketChannel::class.java)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
server.connectionTimeoutMillis?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec())
pipeline.addLast(BinaryMemcacheObjectAggregator(Integer.MAX_VALUE))
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
private fun sendRequest(request: FullBinaryMemcacheRequest): CompletableFuture<FullBinaryMemcacheResponse> {
val server = cfg.servers.let { servers ->
if (servers.size > 1) {
val key = request.key().duplicate()
var checksum = 0
while (key.readableBytes() > 4) {
val byte = key.readInt()
checksum = checksum xor byte
}
while (key.readableBytes() > 0) {
val byte = key.readByte()
checksum = checksum xor byte.toInt()
}
servers[checksum % servers.size]
} else {
servers.first()
}
}
val response = CompletableFuture<FullBinaryMemcacheResponse>()
// Custom handler for processing responses
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
}
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline()
.addLast("client-handler", object : SimpleChannelInboundHandler<FullBinaryMemcacheResponse>() {
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: FullBinaryMemcacheResponse
) {
pipeline.removeLast()
pool.release(channel)
msg.touch("The method's caller must remember to release this")
response.complete(msg.retain())
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val ex = when (cause) {
is DecoderException -> cause.cause!!
else -> cause
}
ctx.close()
pipeline.removeLast()
pool.release(channel)
response.completeExceptionally(ex)
}
})
request.touch()
channel.writeAndFlush(request)
} else {
response.completeExceptionally(channelFuture.cause())
}
}
})
return response
}
private fun encodeExpiry(expiry: Duration): Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
fun get(key: String): CompletableFuture<ReadableByteChannel?> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
DefaultFullBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), null).apply {
setOpcode(BinaryMemcacheOpcodes.GET)
}
}
return sendRequest(request).thenApply { response ->
try {
when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> {
val compressionMode = cfg.compressionMode
val content = response.content().retain()
content.touch()
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> {
GZIPInputStream(ByteBufInputStream(content))
}
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
InflaterInputStream(ByteBufInputStream(content))
}
}
} else {
ByteBufInputStream(content)
}.let(Channels::newChannel)
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
null
}
else -> throw MemcacheException(status)
}
} finally {
response.release()
}
}
}
fun put(key: String, content: ByteBuf, expiry: Duration, cas: Long? = null): CompletableFuture<Void> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
val extras = Unpooled.buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(expiry))
val compressionMode = cfg.compressionMode
content.retain()
val payload = if (compressionMode != null) {
val inputStream = ByteBufInputStream(content)
val buf = content.alloc().buffer()
buf.retain()
val outputStream = when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> {
GZIPOutputStream(ByteBufOutputStream(buf))
}
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
DeflaterOutputStream(ByteBufOutputStream(buf), Deflater(Deflater.DEFAULT_COMPRESSION, false))
}
}
inputStream.use { i ->
outputStream.use { o ->
JWO.copy(i, o)
}
}
buf
} else {
content
}
DefaultFullBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), extras, payload).apply {
setOpcode(BinaryMemcacheOpcodes.SET)
cas?.let(this::setCas)
}
}
return sendRequest(request).thenApply { response ->
try {
when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> null
else -> throw MemcacheException(status)
}
} finally {
response.release()
}
}
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}
override fun close() {
shutDown().sync()
}
}

View File

@@ -1 +0,0 @@
net.woggioni.gbcs.server.memcache.MemcacheCacheProvider

View File

@@ -1,130 +0,0 @@
package net.woggioni.gbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.GBCS.digestString
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.jwo.JWO
import net.woggioni.jwo.LockFile
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.nio.file.StandardOpenOption
import java.nio.file.attribute.BasicFileAttributes
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterInputStream
class FileSystemCache(
val root: Path,
val maxAge: Duration,
val digestAlgorithm: String?,
val compressionEnabled: Boolean,
val compressionLevel: Int
) : Cache {
private companion object {
@JvmStatic
private val log = contextLogger()
}
init {
Files.createDirectories(root)
}
private var nextGc = AtomicReference(Instant.now().plus(maxAge))
override fun get(key: String) = (digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
root.resolve(digest).takeIf(Files::exists)
?.let { file ->
file.takeIf(Files::exists)?.let { file ->
if (compressionEnabled) {
val inflater = Inflater()
Channels.newChannel(
InflaterInputStream(
Channels.newInputStream(
FileChannel.open(
file,
StandardOpenOption.READ
)
), inflater
)
)
} else {
FileChannel.open(file, StandardOpenOption.READ)
}
}
}.also {
gc()
}.let {
CompletableFuture.completedFuture(it)
}
}
override fun put(key: String, content: ByteBuf): CompletableFuture<Void> {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
val file = root.resolve(digest)
val tmpFile = Files.createTempFile(root, null, ".tmp")
try {
Files.newOutputStream(tmpFile).let {
if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
DeflaterOutputStream(it, deflater)
} else {
it
}
}.use {
JWO.copy(ByteBufInputStream(content), it)
}
Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE)
} catch (t: Throwable) {
Files.delete(tmpFile)
throw t
}
}.also {
gc()
}
return CompletableFuture.completedFuture(null)
}
private fun gc() {
val now = Instant.now()
val oldValue = nextGc.getAndSet(now.plus(maxAge))
if (oldValue < now) {
actualGc(now)
}
}
@Synchronized
private fun actualGc(now: Instant) {
Files.list(root).filter {
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
.creationTime()
.toInstant()
now > creationTimeStamp.plus(maxAge)
}.forEach { file ->
LockFile.acquire(file, false).use {
Files.delete(file)
}
}
}
override fun close() {}
}

View File

@@ -1,150 +0,0 @@
package net.woggioni.gbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.ByteBufOutputStream
import net.woggioni.gbcs.common.GBCS.digestString
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.jwo.JWO
import java.nio.channels.Channels
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.atomic.AtomicLong
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterInputStream
class InMemoryCache(
val maxAge: Duration,
val maxSize: Long,
val digestAlgorithm: String?,
val compressionEnabled: Boolean,
val compressionLevel: Int
) : Cache {
companion object {
@JvmStatic
private val log = contextLogger()
}
private val size = AtomicLong()
private val map = ConcurrentHashMap<String, ByteBuf>()
private class RemovalQueueElement(val key: String, val value : ByteBuf, val expiry : Instant) : Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
}
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
private var running = true
private val garbageCollector = Thread {
while(true) {
val el = removalQueue.take()
val buf = el.value
val now = Instant.now()
if(now > el.expiry) {
val removed = map.remove(el.key, buf)
if(removed) {
updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
}
//Decrease the reference count for removalQueue
buf.release()
} else {
removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
}
}
}.apply {
start()
}
private fun removeEldest() : Long {
while(true) {
val el = removalQueue.take()
val buf = el.value
val removed = map.remove(el.key, buf)
//Decrease the reference count for removalQueue
buf.release()
if(removed) {
val newSize = updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
return newSize
}
}
}
private fun updateSizeAfterRemoval(removed: ByteBuf) : Long {
return size.updateAndGet { currentSize : Long ->
currentSize - removed.readableBytes()
}
}
override fun close() {
running = false
garbageCollector.join()
}
override fun get(key: String) =
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key
).let { digest ->
map[digest]
?.let { value ->
val copy = value.retainedDuplicate()
copy.touch("This has to be released by the caller of the cache")
if (compressionEnabled) {
val inflater = Inflater()
Channels.newChannel(InflaterInputStream(ByteBufInputStream(copy), inflater))
} else {
Channels.newChannel(ByteBufInputStream(copy))
}
}
}.let {
CompletableFuture.completedFuture(it)
}
override fun put(key: String, content: ByteBuf) =
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
content.retain()
val value = if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
val buf = content.alloc().buffer()
buf.retain()
DeflaterOutputStream(ByteBufOutputStream(buf), deflater).use { outputStream ->
ByteBufInputStream(content).use { inputStream ->
JWO.copy(inputStream, outputStream)
}
}
buf
} else {
content
}
val old = map.put(digest, value)
val delta = value.readableBytes() - (old?.readableBytes() ?: 0)
var newSize = size.updateAndGet { currentSize : Long ->
currentSize + delta
}
removalQueue.put(RemovalQueueElement(digest, value.retain(), Instant.now().plus(maxAge)))
while(newSize > maxSize) {
newSize = removeEldest()
}
}.let {
CompletableFuture.completedFuture<Void>(null)
}
}

View File

@@ -1,161 +0,0 @@
package net.woggioni.gbcs.server.handler
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.DefaultFileRegion
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioStream
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.debug
import net.woggioni.gbcs.server.warn
import java.nio.channels.FileChannel
import java.nio.file.Path
@ChannelHandler.Sharable
class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
SimpleChannelInboundHandler<FullHttpRequest>() {
private val log = contextLogger()
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
val method = msg.method()
if (method === HttpMethod.GET) {
val path = Path.of(msg.uri())
val prefix = path.parent
val key = path.fileName?.toString() ?: let {
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
return
}
if (serverPrefix == prefix) {
cache.get(key).thenApply { channel ->
if(channel != null) {
log.debug(ctx) {
"Cache hit for key '$key'"
}
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
when (channel) {
is FileChannel -> {
val content = DefaultFileRegion(channel, 0, channel.size())
if (keepAlive) {
ctx.write(content)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
}
else -> {
val content = ChunkedNioStream(channel)
if (keepAlive) {
ctx.write(content).addListener {
content.close()
}
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
}
}
} else {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
}.whenComplete { _, ex -> ex?.let(ctx::fireExceptionCaught) }
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
} else if (method === HttpMethod.PUT) {
val path = Path.of(msg.uri())
val prefix = path.parent
val key = path.fileName.toString()
if (serverPrefix == prefix) {
log.debug(ctx) {
"Added value for key '$key' to build cache"
}
cache.put(key, msg.content()).thenRun {
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
}.whenComplete { _, ex ->
ctx.fireExceptionCaught(ex)
}
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
} else if(method == HttpMethod.TRACE) {
val replayedRequestHead = ctx.alloc().buffer()
replayedRequestHead.writeCharSequence("TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", Charsets.US_ASCII)
msg.headers().forEach { (key, value) ->
replayedRequestHead.apply {
writeCharSequence(key, Charsets.US_ASCII)
writeCharSequence(": ", Charsets.US_ASCII)
writeCharSequence(value, Charsets.UTF_8)
writeCharSequence("\r\n", Charsets.US_ASCII)
}
}
replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII)
val requestBody = msg.content()
requestBody.retain()
val responseBody = ctx.alloc().compositeBuffer(2).apply {
addComponents(true, replayedRequestHead)
addComponents(true, requestBody)
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK, responseBody)
response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, "message/http")
set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes())
}
ctx.writeAndFlush(response)
} else {
log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.METHOD_NOT_ALLOWED)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
}
}

View File

@@ -1,99 +0,0 @@
package net.woggioni.gbcs.server.throttling
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.GradleBuildCacheServer
import net.woggioni.jwo.Bucket
import net.woggioni.jwo.LongMath
import java.net.InetSocketAddress
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
@Sharable
class ThrottlingHandler(cfg: Configuration) :
ChannelInboundHandlerAdapter() {
private val log = contextLogger()
private val bucketManager = BucketManager.from(cfg)
private val connectionConfiguration = cfg.connection
/**
* If the suggested waiting time from the bucket is lower than this
* amount, then the server will simply wait by itself before sending a response
* instead of replying with 429
*/
private val waitThreshold = minOf(
connectionConfiguration.idleTimeout,
connectionConfiguration.readIdleTimeout,
connectionConfiguration.writeIdleTimeout
).dividedBy(2)
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
val buckets = mutableListOf<Bucket>()
val user = ctx.channel().attr(GradleBuildCacheServer.userAttribute).get()
if (user != null) {
bucketManager.getBucketByUser(user)?.let(buckets::addAll)
}
val groups = ctx.channel().attr(GradleBuildCacheServer.groupAttribute).get() ?: emptySet()
if (groups.isNotEmpty()) {
groups.forEach { group ->
bucketManager.getBucketByGroup(group)?.let(buckets::add)
}
}
if (user == null && groups.isEmpty()) {
bucketManager.getBucketByAddress(ctx.channel().remoteAddress() as InetSocketAddress)?.let(buckets::add)
}
if (buckets.isEmpty()) {
return super.channelRead(ctx, msg)
} else {
handleBuckets(buckets, ctx, msg, true)
}
}
private fun handleBuckets(buckets : List<Bucket>, ctx : ChannelHandlerContext, msg : Any, delayResponse : Boolean) {
var nextAttempt = -1L
for (bucket in buckets) {
val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
if (bucketNextAttempt > nextAttempt) {
nextAttempt = bucketNextAttempt
}
}
if(nextAttempt < 0) {
super.channelRead(ctx, msg)
return
}
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
if (delayResponse && waitDuration < waitThreshold) {
ctx.executor().schedule({
handleBuckets(buckets, ctx, msg, false)
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
} else {
sendThrottledResponse(ctx, waitDuration)
}
}
private fun sendThrottledResponse(ctx: ChannelHandlerContext, retryAfter: Duration) {
val response = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.TOO_MANY_REQUESTS
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
retryAfter.seconds.takeIf {
it > 0
}?.let {
response.headers()[HttpHeaderNames.RETRY_AFTER] = retryAfter.seconds
}
ctx.writeAndFlush(response)
}
}

View File

@@ -1,2 +0,0 @@
net.woggioni.gbcs.server.cache.FileSystemCacheProvider
net.woggioni.gbcs.server.cache.InMemoryCacheProvider

View File

@@ -2,11 +2,10 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true org.gradle.parallel=true
org.gradle.caching=true org.gradle.caching=true
gbcs.version = 0.1.2 rbcs.version = 0.1.6
lys.version = 2025.01.31 lys.version = 2025.02.08
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
docker.registry.url=gitea.woggioni.net docker.registry.url=gitea.woggioni.net
jpms-check.configurationName = runtimeClasspath

View File

@@ -0,0 +1,8 @@
module net.woggioni.rbcs.api {
requires static lombok;
requires java.xml;
requires io.netty.buffer;
exports net.woggioni.rbcs.api;
exports net.woggioni.rbcs.api.exception;
exports net.woggioni.rbcs.api.event;
}

View File

@@ -0,0 +1,17 @@
package net.woggioni.rbcs.api;
import io.netty.buffer.ByteBufAllocator;
import java.util.concurrent.CompletableFuture;
public interface Cache extends AutoCloseable {
default void get(String key, ResponseHandle responseHandle, ByteBufAllocator alloc) {
throw new UnsupportedOperationException();
}
default CompletableFuture<RequestHandle> put(String key, ResponseHandle responseHandle, ByteBufAllocator alloc) {
throw new UnsupportedOperationException();
}
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.api; package net.woggioni.rbcs.api;
import org.w3c.dom.Document; import org.w3c.dom.Document;
import org.w3c.dom.Element; import org.w3c.dom.Element;

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.api; package net.woggioni.rbcs.api;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
@@ -135,7 +135,7 @@ public class Configuration {
} }
public interface Cache { public interface Cache {
net.woggioni.gbcs.api.Cache materialize(); net.woggioni.rbcs.api.Cache materialize();
String getNamespaceURI(); String getNamespaceURI();
String getTypeName(); String getTypeName();
} }

View File

@@ -0,0 +1,8 @@
package net.woggioni.rbcs.api;
import net.woggioni.rbcs.api.event.RequestStreamingEvent;
@FunctionalInterface
public interface RequestHandle {
void handleEvent(RequestStreamingEvent evt);
}

View File

@@ -0,0 +1,8 @@
package net.woggioni.rbcs.api;
import net.woggioni.rbcs.api.event.ResponseStreamingEvent;
@FunctionalInterface
public interface ResponseHandle {
void handleEvent(ResponseStreamingEvent evt);
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.api; package net.woggioni.rbcs.api;
public enum Role { public enum Role {
Reader, Writer Reader, Writer

View File

@@ -0,0 +1,26 @@
package net.woggioni.rbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
public sealed interface RequestStreamingEvent {
@Getter
@RequiredArgsConstructor
non-sealed class ChunkReceived implements RequestStreamingEvent {
private final ByteBuf chunk;
}
final class LastChunkReceived extends ChunkReceived {
public LastChunkReceived(ByteBuf chunk) {
super(chunk);
}
}
@Getter
@RequiredArgsConstructor
final class ExceptionCaught implements RequestStreamingEvent {
private final Throwable exception;
}
}

View File

@@ -0,0 +1,42 @@
package net.woggioni.rbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.nio.channels.FileChannel;
public sealed interface ResponseStreamingEvent {
final class ResponseReceived implements ResponseStreamingEvent {
}
@Getter
@RequiredArgsConstructor
non-sealed class ChunkReceived implements ResponseStreamingEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
non-sealed class FileReceived implements ResponseStreamingEvent {
private final FileChannel file;
}
final class LastChunkReceived extends ChunkReceived {
public LastChunkReceived(ByteBuf chunk) {
super(chunk);
}
}
@Getter
@RequiredArgsConstructor
final class ExceptionCaught implements ResponseStreamingEvent {
private final Throwable exception;
}
final class NotFound implements ResponseStreamingEvent { }
NotFound NOT_FOUND = new NotFound();
ResponseReceived RESPONSE_RECEIVED = new ResponseReceived();
}

View File

@@ -1,6 +1,6 @@
package net.woggioni.gbcs.api.exception; package net.woggioni.rbcs.api.exception;
public class CacheException extends GbcsException { public class CacheException extends RbcsException {
public CacheException(String message, Throwable cause) { public CacheException(String message, Throwable cause) {
super(message, cause); super(message, cause);
} }

View File

@@ -1,6 +1,6 @@
package net.woggioni.gbcs.api.exception; package net.woggioni.rbcs.api.exception;
public class ConfigurationException extends GbcsException { public class ConfigurationException extends RbcsException {
public ConfigurationException(String message, Throwable cause) { public ConfigurationException(String message, Throwable cause) {
super(message, cause); super(message, cause);
} }

View File

@@ -1,6 +1,6 @@
package net.woggioni.gbcs.api.exception; package net.woggioni.rbcs.api.exception;
public class ContentTooLargeException extends GbcsException { public class ContentTooLargeException extends RbcsException {
public ContentTooLargeException(String message, Throwable cause) { public ContentTooLargeException(String message, Throwable cause) {
super(message, cause); super(message, cause);
} }

View File

@@ -0,0 +1,7 @@
package net.woggioni.rbcs.api.exception;
public class RbcsException extends RuntimeException {
public RbcsException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -17,9 +17,9 @@ import net.woggioni.gradle.graalvm.JlinkPlugin
import net.woggioni.gradle.graalvm.JlinkTask import net.woggioni.gradle.graalvm.JlinkTask
Property<String> mainModuleName = objects.property(String.class) Property<String> mainModuleName = objects.property(String.class)
mainModuleName.set('net.woggioni.gbcs.cli') mainModuleName.set('net.woggioni.rbcs.cli')
Property<String> mainClassName = objects.property(String.class) Property<String> mainClassName = objects.property(String.class)
mainClassName.set('net.woggioni.gbcs.cli.GradleBuildCacheServerCli') mainClassName.set('net.woggioni.rbcs.cli.RemoteBuildCacheServerCli')
tasks.named(JavaPlugin.COMPILE_JAVA_TASK_NAME, JavaCompile) { tasks.named(JavaPlugin.COMPILE_JAVA_TASK_NAME, JavaCompile) {
options.javaModuleMainClass = mainClassName options.javaModuleMainClass = mainClassName
@@ -44,11 +44,10 @@ envelopeJar {
dependencies { dependencies {
implementation catalog.jwo implementation catalog.jwo
implementation catalog.slf4j.api implementation catalog.slf4j.api
implementation catalog.netty.codec.http
implementation catalog.picocli implementation catalog.picocli
implementation project(':gbcs-client') implementation project(':rbcs-client')
implementation project(':gbcs-server') implementation project(':rbcs-server')
// runtimeOnly catalog.slf4j.jdk14 // runtimeOnly catalog.slf4j.jdk14
runtimeOnly catalog.logback.classic runtimeOnly catalog.logback.classic
@@ -56,10 +55,10 @@ dependencies {
} }
Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named('envelopeJar', EnvelopeJarTask.class) { Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named('envelopeJar', EnvelopeJarTask.class) {
// systemProperties['java.util.logging.config.class'] = 'net.woggioni.gbcs.LoggingConfig' // systemProperties['java.util.logging.config.class'] = 'net.woggioni.rbcs.LoggingConfig'
// systemProperties['log.config.source'] = 'net/woggioni/gbcs/cli/logging.properties' // systemProperties['log.config.source'] = 'net/woggioni/rbcs/cli/logging.properties'
// systemProperties['java.util.logging.config.file'] = 'classpath:net/woggioni/gbcs/cli/logging.properties' // systemProperties['java.util.logging.config.file'] = 'classpath:net/woggioni/rbcs/cli/logging.properties'
systemProperties['logback.configurationFile'] = 'classpath:net/woggioni/gbcs/cli/logback.xml' systemProperties['logback.configurationFile'] = 'classpath:net/woggioni/rbcs/cli/logback.xml'
systemProperties['io.netty.leakDetectionLevel'] = 'DISABLED' systemProperties['io.netty.leakDetectionLevel'] = 'DISABLED'
// systemProperties['org.slf4j.simpleLogger.showDateTime'] = 'true' // systemProperties['org.slf4j.simpleLogger.showDateTime'] = 'true'
@@ -83,7 +82,7 @@ tasks.named(NativeImagePlugin.NATIVE_IMAGE_TASK_NAME, NativeImageTask) {
tasks.named(JlinkPlugin.JLINK_TASK_NAME, JlinkTask) { tasks.named(JlinkPlugin.JLINK_TASK_NAME, JlinkTask) {
mainClass = mainClassName mainClass = mainClassName
mainModule = 'net.woggioni.gbcs.cli' mainModule = 'net.woggioni.rbcs.cli'
} }
artifacts { artifacts {

View File

@@ -0,0 +1,17 @@
module net.woggioni.rbcs.cli {
requires org.slf4j;
requires net.woggioni.rbcs.server;
requires info.picocli;
requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.client;
requires kotlin.stdlib;
requires net.woggioni.jwo;
requires net.woggioni.rbcs.api;
exports net.woggioni.rbcs.cli.impl.converters to info.picocli;
opens net.woggioni.rbcs.cli.impl.commands to info.picocli;
opens net.woggioni.rbcs.cli.impl to info.picocli;
opens net.woggioni.rbcs.cli to info.picocli, net.woggioni.rbcs.common;
exports net.woggioni.rbcs.cli;
}

View File

@@ -1,43 +1,43 @@
package net.woggioni.gbcs.cli package net.woggioni.rbcs.cli
import net.woggioni.gbcs.cli.impl.AbstractVersionProvider import net.woggioni.rbcs.cli.impl.AbstractVersionProvider
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand import net.woggioni.rbcs.cli.impl.commands.BenchmarkCommand
import net.woggioni.gbcs.cli.impl.commands.ClientCommand import net.woggioni.rbcs.cli.impl.commands.ClientCommand
import net.woggioni.gbcs.cli.impl.commands.GetCommand import net.woggioni.rbcs.cli.impl.commands.GetCommand
import net.woggioni.gbcs.cli.impl.commands.HealthCheckCommand import net.woggioni.rbcs.cli.impl.commands.HealthCheckCommand
import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand import net.woggioni.rbcs.cli.impl.commands.PasswordHashCommand
import net.woggioni.gbcs.cli.impl.commands.PutCommand import net.woggioni.rbcs.cli.impl.commands.PutCommand
import net.woggioni.gbcs.cli.impl.commands.ServerCommand import net.woggioni.rbcs.cli.impl.commands.ServerCommand
import net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory import net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory
import net.woggioni.gbcs.common.contextLogger import net.woggioni.rbcs.common.contextLogger
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import picocli.CommandLine import picocli.CommandLine
import picocli.CommandLine.Model.CommandSpec import picocli.CommandLine.Model.CommandSpec
@CommandLine.Command( @CommandLine.Command(
name = "gbcs", versionProvider = GradleBuildCacheServerCli.VersionProvider::class name = "rbcs", versionProvider = RemoteBuildCacheServerCli.VersionProvider::class
) )
class GradleBuildCacheServerCli : GbcsCommand() { class RemoteBuildCacheServerCli : RbcsCommand() {
class VersionProvider : AbstractVersionProvider() class VersionProvider : AbstractVersionProvider()
companion object { companion object {
@JvmStatic @JvmStatic
fun main(vararg args: String) { fun main(vararg args: String) {
val currentClassLoader = GradleBuildCacheServerCli::class.java.classLoader val currentClassLoader = RemoteBuildCacheServerCli::class.java.classLoader
Thread.currentThread().contextClassLoader = currentClassLoader Thread.currentThread().contextClassLoader = currentClassLoader
if(currentClassLoader.javaClass.name == "net.woggioni.envelope.loader.ModuleClassLoader") { if(currentClassLoader.javaClass.name == "net.woggioni.envelope.loader.ModuleClassLoader") {
//We're running in an envelope jar and custom URL protocols won't work //We're running in an envelope jar and custom URL protocols won't work
GbcsUrlStreamHandlerFactory.install() RbcsUrlStreamHandlerFactory.install()
} }
val log = contextLogger() val log = contextLogger()
val app = Application.builder("gbcs") val app = Application.builder("rbcs")
.configurationDirectoryEnvVar("GBCS_CONFIGURATION_DIR") .configurationDirectoryEnvVar("RBCS_CONFIGURATION_DIR")
.configurationDirectoryPropertyKey("net.woggioni.gbcs.conf.dir") .configurationDirectoryPropertyKey("net.woggioni.rbcs.conf.dir")
.build() .build()
val gbcsCli = GradleBuildCacheServerCli() val rbcsCli = RemoteBuildCacheServerCli()
val commandLine = CommandLine(gbcsCli) val commandLine = CommandLine(rbcsCli)
commandLine.setExecutionExceptionHandler { ex, cl, parseResult -> commandLine.setExecutionExceptionHandler { ex, cl, parseResult ->
log.error(ex.message, ex) log.error(ex.message, ex)
CommandLine.ExitCode.SOFTWARE CommandLine.ExitCode.SOFTWARE

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.cli.impl package net.woggioni.rbcs.cli.impl
import picocli.CommandLine import picocli.CommandLine
import java.util.jar.Attributes import java.util.jar.Attributes

View File

@@ -1,11 +1,11 @@
package net.woggioni.gbcs.cli.impl package net.woggioni.rbcs.cli.impl
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import picocli.CommandLine import picocli.CommandLine
import java.nio.file.Path import java.nio.file.Path
abstract class GbcsCommand : Runnable { abstract class RbcsCommand : Runnable {
@CommandLine.Option(names = ["-h", "--help"], usageHelp = true) @CommandLine.Option(names = ["-h", "--help"], usageHelp = true)
var usageHelp = false var usageHelp = false

View File

@@ -1,11 +1,13 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.gbcs.common.contextLogger import net.woggioni.rbcs.common.contextLogger
import net.woggioni.gbcs.common.error import net.woggioni.rbcs.common.error
import net.woggioni.gbcs.common.info import net.woggioni.rbcs.common.info
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import net.woggioni.jwo.LongMath
import net.woggioni.rbcs.common.debug
import picocli.CommandLine import picocli.CommandLine
import java.security.SecureRandom import java.security.SecureRandom
import java.time.Duration import java.time.Duration
@@ -20,7 +22,7 @@ import kotlin.random.Random
description = ["Run a load test against the server"], description = ["Run a load test against the server"],
showDefaultValues = true showDefaultValues = true
) )
class BenchmarkCommand : GbcsCommand() { class BenchmarkCommand : RbcsCommand() {
private val log = contextLogger() private val log = contextLogger()
@CommandLine.Spec @CommandLine.Spec
@@ -46,7 +48,8 @@ class BenchmarkCommand : GbcsCommand() {
clientCommand.configuration.profiles[profileName] clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration") ?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
} }
GradleBuildCacheClient(profile).use { client -> val progressThreshold = LongMath.ceilDiv(numberOfEntries.toLong(), 20)
RemoteBuildCacheClient(profile).use { client ->
val entryGenerator = sequence { val entryGenerator = sequence {
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong()) val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
@@ -79,7 +82,12 @@ class BenchmarkCommand : GbcsCommand() {
completionQueue.put(result) completionQueue.put(result)
} }
semaphore.release() semaphore.release()
completionCounter.incrementAndGet() val completed = completionCounter.incrementAndGet()
if(completed.mod(progressThreshold) == 0L) {
log.debug {
"Inserted $completed / $numberOfEntries"
}
}
} }
} else { } else {
Thread.sleep(0) Thread.sleep(0)
@@ -121,7 +129,12 @@ class BenchmarkCommand : GbcsCommand() {
} }
} }
future.whenComplete { _, _ -> future.whenComplete { _, _ ->
completionCounter.incrementAndGet() val completed = completionCounter.incrementAndGet()
if(completed.mod(progressThreshold) == 0L) {
log.debug {
"Retrieved $completed / ${entries.size}"
}
}
semaphore.release() semaphore.release()
} }
} else { } else {

View File

@@ -1,24 +1,24 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import picocli.CommandLine import picocli.CommandLine
import java.nio.file.Path import java.nio.file.Path
@CommandLine.Command( @CommandLine.Command(
name = "client", name = "client",
description = ["GBCS client"], description = ["RBCS client"],
showDefaultValues = true showDefaultValues = true
) )
class ClientCommand(app : Application) : GbcsCommand() { class ClientCommand(app : Application) : RbcsCommand() {
@CommandLine.Option( @CommandLine.Option(
names = ["-c", "--configuration"], names = ["-c", "--configuration"],
description = ["Path to the client configuration file"], description = ["Path to the client configuration file"],
paramLabel = "CONFIGURATION_FILE" paramLabel = "CONFIGURATION_FILE"
) )
private var configurationFile : Path = findConfigurationFile(app, "gbcs-client.xml") private var configurationFile : Path = findConfigurationFile(app, "rbcs-client.xml")
@CommandLine.Option( @CommandLine.Option(
names = ["-p", "--profile"], names = ["-p", "--profile"],
@@ -28,8 +28,8 @@ class ClientCommand(app : Application) : GbcsCommand() {
) )
var profileName : String? = null var profileName : String? = null
val configuration : GradleBuildCacheClient.Configuration by lazy { val configuration : RemoteBuildCacheClient.Configuration by lazy {
GradleBuildCacheClient.Configuration.parse(configurationFile) RemoteBuildCacheClient.Configuration.parse(configurationFile)
} }
override fun run() { override fun run() {

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.gbcs.common.contextLogger import net.woggioni.rbcs.common.contextLogger
import picocli.CommandLine import picocli.CommandLine
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
@@ -12,7 +12,7 @@ import java.nio.file.Path
description = ["Fetch a value from the cache with the specified key"], description = ["Fetch a value from the cache with the specified key"],
showDefaultValues = true showDefaultValues = true
) )
class GetCommand : GbcsCommand() { class GetCommand : RbcsCommand() {
private val log = contextLogger() private val log = contextLogger()
@CommandLine.Spec @CommandLine.Spec
@@ -38,7 +38,7 @@ class GetCommand : GbcsCommand() {
clientCommand.configuration.profiles[profileName] clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration") ?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
} }
GradleBuildCacheClient(profile).use { client -> RemoteBuildCacheClient(profile).use { client ->
client.get(key).thenApply { value -> client.get(key).thenApply { value ->
value?.let { value?.let {
(output?.let(Files::newOutputStream) ?: System.out).use { (output?.let(Files::newOutputStream) ?: System.out).use {

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.gbcs.common.contextLogger import net.woggioni.rbcs.common.contextLogger
import picocli.CommandLine import picocli.CommandLine
import java.security.SecureRandom import java.security.SecureRandom
import kotlin.random.Random import kotlin.random.Random
@@ -12,7 +12,7 @@ import kotlin.random.Random
description = ["Check server health"], description = ["Check server health"],
showDefaultValues = true showDefaultValues = true
) )
class HealthCheckCommand : GbcsCommand() { class HealthCheckCommand : RbcsCommand() {
private val log = contextLogger() private val log = contextLogger()
@CommandLine.Spec @CommandLine.Spec
@@ -24,7 +24,7 @@ class HealthCheckCommand : GbcsCommand() {
clientCommand.configuration.profiles[profileName] clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration") ?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
} }
GradleBuildCacheClient(profile).use { client -> RemoteBuildCacheClient(profile).use { client ->
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong()) val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
val nonce = ByteArray(0xa0) val nonce = ByteArray(0xa0)
random.nextBytes(nonce) random.nextBytes(nonce)

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.gbcs.cli.impl.converters.OutputStreamConverter import net.woggioni.rbcs.cli.impl.converters.OutputStreamConverter
import net.woggioni.gbcs.common.PasswordSecurity.hashPassword import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import net.woggioni.jwo.UncloseableOutputStream import net.woggioni.jwo.UncloseableOutputStream
import picocli.CommandLine import picocli.CommandLine
import java.io.OutputStream import java.io.OutputStream
@@ -12,10 +12,10 @@ import java.io.PrintWriter
@CommandLine.Command( @CommandLine.Command(
name = "password", name = "password",
description = ["Generate a password hash to add to GBCS configuration file"], description = ["Generate a password hash to add to RBCS configuration file"],
showDefaultValues = true showDefaultValues = true
) )
class PasswordHashCommand : GbcsCommand() { class PasswordHashCommand : RbcsCommand() {
@CommandLine.Option( @CommandLine.Option(
names = ["-o", "--output-file"], names = ["-o", "--output-file"],
description = ["Write the output to a file instead of stdout"], description = ["Write the output to a file instead of stdout"],

View File

@@ -1,9 +1,9 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.gbcs.cli.impl.converters.InputStreamConverter import net.woggioni.rbcs.cli.impl.converters.InputStreamConverter
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.gbcs.common.contextLogger import net.woggioni.rbcs.common.contextLogger
import picocli.CommandLine import picocli.CommandLine
import java.io.InputStream import java.io.InputStream
@@ -12,7 +12,7 @@ import java.io.InputStream
description = ["Add or replace a value to the cache with the specified key"], description = ["Add or replace a value to the cache with the specified key"],
showDefaultValues = true showDefaultValues = true
) )
class PutCommand : GbcsCommand() { class PutCommand : RbcsCommand() {
private val log = contextLogger() private val log = contextLogger()
@CommandLine.Spec @CommandLine.Spec
@@ -39,7 +39,7 @@ class PutCommand : GbcsCommand() {
clientCommand.configuration.profiles[profileName] clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration") ?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
} }
GradleBuildCacheClient(profile).use { client -> RemoteBuildCacheClient(profile).use { client ->
value.use { value.use {
client.put(key, it.readAllBytes()) client.put(key, it.readAllBytes())
}.get() }.get()

View File

@@ -1,13 +1,12 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import net.woggioni.gbcs.api.Configuration import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.rbcs.cli.impl.converters.DurationConverter
import net.woggioni.gbcs.cli.impl.converters.DurationConverter import net.woggioni.rbcs.common.contextLogger
import net.woggioni.gbcs.common.contextLogger import net.woggioni.rbcs.common.debug
import net.woggioni.gbcs.common.debug import net.woggioni.rbcs.common.info
import net.woggioni.gbcs.common.info import net.woggioni.rbcs.server.RemoteBuildCacheServer
import net.woggioni.gbcs.server.GradleBuildCacheServer import net.woggioni.rbcs.server.RemoteBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL
import net.woggioni.gbcs.server.GradleBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import picocli.CommandLine import picocli.CommandLine
@@ -18,10 +17,10 @@ import java.time.Duration
@CommandLine.Command( @CommandLine.Command(
name = "server", name = "server",
description = ["GBCS server"], description = ["RBCS server"],
showDefaultValues = true showDefaultValues = true
) )
class ServerCommand(app : Application) : GbcsCommand() { class ServerCommand(app : Application) : RbcsCommand() {
private val log = contextLogger() private val log = contextLogger()
@@ -50,7 +49,7 @@ class ServerCommand(app : Application) : GbcsCommand() {
description = ["Read the application configuration from this file"], description = ["Read the application configuration from this file"],
paramLabel = "CONFIG_FILE" paramLabel = "CONFIG_FILE"
) )
private var configurationFile: Path = findConfigurationFile(app, "gbcs-server.xml") private var configurationFile: Path = findConfigurationFile(app, "rbcs-server.xml")
override fun run() { override fun run() {
if (!Files.exists(configurationFile)) { if (!Files.exists(configurationFile)) {
@@ -58,15 +57,15 @@ class ServerCommand(app : Application) : GbcsCommand() {
createDefaultConfigurationFile(configurationFile) createDefaultConfigurationFile(configurationFile)
} }
val configuration = GradleBuildCacheServer.loadConfiguration(configurationFile) val configuration = RemoteBuildCacheServer.loadConfiguration(configurationFile)
log.debug { log.debug {
ByteArrayOutputStream().also { ByteArrayOutputStream().also {
GradleBuildCacheServer.dumpConfiguration(configuration, it) RemoteBuildCacheServer.dumpConfiguration(configuration, it)
}.let { }.let {
"Server configuration:\n${String(it.toByteArray())}" "Server configuration:\n${String(it.toByteArray())}"
} }
} }
val server = GradleBuildCacheServer(configuration) val server = RemoteBuildCacheServer(configuration)
server.run().use { server -> server.run().use { server ->
timeout?.let { timeout?.let {
Thread.sleep(it) Thread.sleep(it)

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.cli.impl.converters package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine import picocli.CommandLine
import java.time.Duration import java.time.Duration

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.cli.impl.converters package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine import picocli.CommandLine
import java.io.InputStream import java.io.InputStream

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.cli.impl.converters package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine import picocli.CommandLine
import java.io.OutputStream import java.io.OutputStream

View File

@@ -4,11 +4,13 @@ plugins {
} }
dependencies { dependencies {
implementation project(':gbcs-api') implementation project(':rbcs-api')
implementation project(':gbcs-common') implementation project(':rbcs-common')
implementation catalog.picocli
implementation catalog.slf4j.api implementation catalog.slf4j.api
implementation catalog.netty.buffer implementation catalog.netty.buffer
implementation catalog.netty.handler
implementation catalog.netty.transport
implementation catalog.netty.common
implementation catalog.netty.codec.http implementation catalog.netty.codec.http
testRuntimeOnly catalog.logback.classic testRuntimeOnly catalog.logback.classic

View File

@@ -1,4 +1,4 @@
module net.woggioni.gbcs.client { module net.woggioni.rbcs.client {
requires io.netty.handler; requires io.netty.handler;
requires io.netty.codec.http; requires io.netty.codec.http;
requires io.netty.transport; requires io.netty.transport;
@@ -6,12 +6,12 @@ module net.woggioni.gbcs.client {
requires io.netty.common; requires io.netty.common;
requires io.netty.buffer; requires io.netty.buffer;
requires java.xml; requires java.xml;
requires net.woggioni.gbcs.common; requires net.woggioni.rbcs.common;
requires net.woggioni.gbcs.api; requires net.woggioni.rbcs.api;
requires io.netty.codec; requires io.netty.codec;
requires org.slf4j; requires org.slf4j;
exports net.woggioni.gbcs.client; exports net.woggioni.rbcs.client;
opens net.woggioni.gbcs.client.schema; opens net.woggioni.rbcs.client.schema;
} }

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.client package net.woggioni.rbcs.client
import io.netty.bootstrap.Bootstrap import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
@@ -30,11 +30,11 @@ import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.stream.ChunkedWriteHandler import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.concurrent.Future import io.netty.util.concurrent.Future
import io.netty.util.concurrent.GenericFutureListener import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.client.impl.Parser import net.woggioni.rbcs.client.impl.Parser
import net.woggioni.gbcs.common.Xml import net.woggioni.rbcs.common.Xml
import net.woggioni.gbcs.common.contextLogger import net.woggioni.rbcs.common.contextLogger
import net.woggioni.gbcs.common.debug import net.woggioni.rbcs.common.debug
import net.woggioni.gbcs.common.trace import net.woggioni.rbcs.common.trace
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.net.URI import java.net.URI
import java.nio.file.Files import java.nio.file.Files
@@ -45,10 +45,10 @@ import java.time.Duration
import java.util.Base64 import java.util.Base64
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.random.Random
import io.netty.util.concurrent.Future as NettyFuture import io.netty.util.concurrent.Future as NettyFuture
class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable {
class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable {
private val group: NioEventLoopGroup private val group: NioEventLoopGroup
private var sslContext: SslContext private var sslContext: SslContext
private val log = contextLogger() private val log = contextLogger()
@@ -206,6 +206,7 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
retryPolicy.initialDelayMillis.toDouble(), retryPolicy.initialDelayMillis.toDouble(),
retryPolicy.exp, retryPolicy.exp,
outcomeHandler, outcomeHandler,
Random.Default,
operation operation
) )
} else { } else {

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.client package net.woggioni.rbcs.client
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus

View File

@@ -1,9 +1,9 @@
package net.woggioni.gbcs.client.impl package net.woggioni.rbcs.client.impl
import net.woggioni.gbcs.api.exception.ConfigurationException import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.gbcs.common.Xml.Companion.asIterable import net.woggioni.rbcs.common.Xml.Companion.asIterable
import net.woggioni.gbcs.common.Xml.Companion.renderAttribute import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document import org.w3c.dom.Document
import java.net.URI import java.net.URI
import java.nio.file.Files import java.nio.file.Files
@@ -15,9 +15,9 @@ import java.time.Duration
object Parser { object Parser {
fun parse(document: Document): GradleBuildCacheClient.Configuration { fun parse(document: Document): RemoteBuildCacheClient.Configuration {
val root = document.documentElement val root = document.documentElement
val profiles = mutableMapOf<String, GradleBuildCacheClient.Configuration.Profile>() val profiles = mutableMapOf<String, RemoteBuildCacheClient.Configuration.Profile>()
for (child in root.asIterable()) { for (child in root.asIterable()) {
val tagName = child.localName val tagName = child.localName
@@ -27,8 +27,8 @@ object Parser {
child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required") child.renderAttribute("name") ?: throw ConfigurationException("name attribute is required")
val uri = child.renderAttribute("base-url")?.let(::URI) val uri = child.renderAttribute("base-url")?.let(::URI)
?: throw ConfigurationException("base-url attribute is required") ?: throw ConfigurationException("base-url attribute is required")
var authentication: GradleBuildCacheClient.Configuration.Authentication? = null var authentication: RemoteBuildCacheClient.Configuration.Authentication? = null
var retryPolicy: GradleBuildCacheClient.Configuration.RetryPolicy? = null var retryPolicy: RemoteBuildCacheClient.Configuration.RetryPolicy? = null
for (gchild in child.asIterable()) { for (gchild in child.asIterable()) {
when (gchild.localName) { when (gchild.localName) {
"tls-client-auth" -> { "tls-client-auth" -> {
@@ -49,7 +49,7 @@ object Parser {
.toList() .toList()
.toTypedArray() .toTypedArray()
authentication = authentication =
GradleBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials( RemoteBuildCacheClient.Configuration.Authentication.TlsClientAuthenticationCredentials(
key, key,
certChain certChain
) )
@@ -61,7 +61,7 @@ object Parser {
val password = gchild.renderAttribute("password") val password = gchild.renderAttribute("password")
?: throw ConfigurationException("password attribute is required") ?: throw ConfigurationException("password attribute is required")
authentication = authentication =
GradleBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials( RemoteBuildCacheClient.Configuration.Authentication.BasicAuthenticationCredentials(
username, username,
password password
) )
@@ -80,7 +80,7 @@ object Parser {
gchild.renderAttribute("exp") gchild.renderAttribute("exp")
?.let(String::toDouble) ?.let(String::toDouble)
?: 2.0f ?: 2.0f
retryPolicy = GradleBuildCacheClient.Configuration.RetryPolicy( retryPolicy = RemoteBuildCacheClient.Configuration.RetryPolicy(
maxAttempts, maxAttempts,
initialDelay.toMillis(), initialDelay.toMillis(),
exp.toDouble() exp.toDouble()
@@ -93,7 +93,7 @@ object Parser {
?: 50 ?: 50
val connectionTimeout = child.renderAttribute("connection-timeout") val connectionTimeout = child.renderAttribute("connection-timeout")
?.let(Duration::parse) ?.let(Duration::parse)
profiles[name] = GradleBuildCacheClient.Configuration.Profile( profiles[name] = RemoteBuildCacheClient.Configuration.Profile(
uri, uri,
authentication, authentication,
connectionTimeout, connectionTimeout,
@@ -103,6 +103,6 @@ object Parser {
} }
} }
} }
return GradleBuildCacheClient.Configuration(profiles) return RemoteBuildCacheClient.Configuration(profiles)
} }
} }

View File

@@ -1,8 +1,10 @@
package net.woggioni.gbcs.client package net.woggioni.rbcs.client
import io.netty.util.concurrent.EventExecutorGroup import io.netty.util.concurrent.EventExecutorGroup
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kotlin.math.pow
import kotlin.random.Random
sealed class OperationOutcome<T> { sealed class OperationOutcome<T> {
class Success<T>(val result: T) : OperationOutcome<T>() class Success<T>(val result: T) : OperationOutcome<T>()
@@ -24,8 +26,10 @@ fun <T> executeWithRetry(
initialDelay: Double, initialDelay: Double,
exp: Double, exp: Double,
outcomeHandler: OutcomeHandler<T>, outcomeHandler: OutcomeHandler<T>,
randomizer : Random?,
cb: () -> CompletableFuture<T> cb: () -> CompletableFuture<T>
): CompletableFuture<T> { ): CompletableFuture<T> {
val finalResult = cb() val finalResult = cb()
var future = finalResult var future = finalResult
var shortCircuit = false var shortCircuit = false
@@ -46,7 +50,7 @@ fun <T> executeWithRetry(
is OutcomeHandlerResult.Retry -> { is OutcomeHandlerResult.Retry -> {
val res = CompletableFuture<T>() val res = CompletableFuture<T>()
val delay = run { val delay = run {
val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong() val scheduledDelay = (initialDelay * exp.pow(i.toDouble()) * (1.0 + (randomizer?.nextDouble(-0.5, 0.5) ?: 0.0))).toLong()
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
} }
eventExecutorGroup.schedule({ eventExecutorGroup.schedule({

View File

@@ -1,25 +1,25 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<xs:schema targetNamespace="urn:net.woggioni.gbcs.client" <xs:schema targetNamespace="urn:net.woggioni.rbcs.client"
xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:gbcs-client="urn:net.woggioni.gbcs.client" xmlns:rbcs-client="urn:net.woggioni.rbcs.client"
elementFormDefault="unqualified" elementFormDefault="unqualified"
> >
<xs:element name="profiles" type="gbcs-client:profilesType"/> <xs:element name="profiles" type="rbcs-client:profilesType"/>
<xs:complexType name="profilesType"> <xs:complexType name="profilesType">
<xs:sequence minOccurs="0"> <xs:sequence minOccurs="0">
<xs:element name="profile" type="gbcs-client:profileType" maxOccurs="unbounded"/> <xs:element name="profile" type="rbcs-client:profileType" maxOccurs="unbounded"/>
</xs:sequence> </xs:sequence>
</xs:complexType> </xs:complexType>
<xs:complexType name="profileType"> <xs:complexType name="profileType">
<xs:sequence> <xs:sequence>
<xs:choice> <xs:choice>
<xs:element name="no-auth" type="gbcs-client:noAuthType"/> <xs:element name="no-auth" type="rbcs-client:noAuthType"/>
<xs:element name="basic-auth" type="gbcs-client:basicAuthType"/> <xs:element name="basic-auth" type="rbcs-client:basicAuthType"/>
<xs:element name="tls-client-auth" type="gbcs-client:tlsClientAuthType"/> <xs:element name="tls-client-auth" type="rbcs-client:tlsClientAuthType"/>
</xs:choice> </xs:choice>
<xs:element name="retry-policy" type="gbcs-client:retryType" minOccurs="0"/> <xs:element name="retry-policy" type="rbcs-client:retryType" minOccurs="0"/>
</xs:sequence> </xs:sequence>
<xs:attribute name="name" type="xs:token" use="required"/> <xs:attribute name="name" type="xs:token" use="required"/>
<xs:attribute name="base-url" type="xs:anyURI" use="required"/> <xs:attribute name="base-url" type="xs:anyURI" use="required"/>

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.client package net.woggioni.rbcs.client
import io.netty.util.concurrent.DefaultEventExecutorGroup import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.gbcs.common.contextLogger import net.woggioni.rbcs.common.contextLogger
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.extension.ExtensionContext import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
@@ -89,7 +89,7 @@ class RetryTest {
val random = Random(testArgs.seed) val random = Random(testArgs.seed)
val future = val future =
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler) { executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler, null) {
val now = System.nanoTime() val now = System.nanoTime()
val result = CompletableFuture<Int>() val result = CompletableFuture<Int>()
executor.submit { executor.submit {
@@ -129,7 +129,7 @@ class RetryTest {
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6 previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
val actualTimestamp = timestamp val actualTimestamp = timestamp
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
Assertions.assertTrue(err < 1e-3) Assertions.assertTrue(err < 1e-2)
} }
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) { if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
/* /*

View File

@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<rbcs-client:profiles xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rbcs-client="urn:net.woggioni.rbcs.client"
xs:schemaLocation="urn:net.woggioni.rbcs.client jms://net.woggioni.rbcs.client/net/woggioni/rbcs/client/schema/rbcs-client.xsd"
>
<profile name="profile1" base-url="https://rbcs1.example.com/">
<tls-client-auth
key-store-file="keystore.pfx"
key-store-password="password"
key-alias="woggioni@c962475fa38"
key-password="key-password"/>
</profile>
<profile name="profile2" base-url="https://rbcs2.example.com/">
<basic-auth user="user" password="password"/>
</profile>
</rbcs-client:profiles>

View File

@@ -6,7 +6,7 @@ plugins {
} }
dependencies { dependencies {
implementation project(':gbcs-api') implementation project(':rbcs-api')
implementation catalog.slf4j.api implementation catalog.slf4j.api
implementation catalog.jwo implementation catalog.jwo
implementation catalog.netty.buffer implementation catalog.netty.buffer

View File

@@ -1,4 +1,4 @@
module net.woggioni.gbcs.common { module net.woggioni.rbcs.common {
requires java.xml; requires java.xml;
requires java.logging; requires java.logging;
requires org.slf4j; requires org.slf4j;
@@ -6,6 +6,6 @@ module net.woggioni.gbcs.common {
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires io.netty.buffer; requires io.netty.buffer;
provides java.net.spi.URLStreamHandlerProvider with net.woggioni.gbcs.common.GbcsUrlStreamHandlerFactory; provides java.net.spi.URLStreamHandlerProvider with net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory;
exports net.woggioni.gbcs.common; exports net.woggioni.rbcs.common;
} }

View File

@@ -0,0 +1,15 @@
package net.woggioni.rbcs.common
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
fun extractChunk(buf: CompositeByteBuf, alloc: ByteBufAllocator): ByteBuf {
val chunk = alloc.compositeBuffer()
for (component in buf.decompose(0, buf.readableBytes())) {
chunk.addComponent(true, component.retain())
}
buf.removeComponents(0, buf.numComponents())
buf.clear()
return chunk
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common package net.woggioni.rbcs.common
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import java.io.InputStream import java.io.InputStream

View File

@@ -1,7 +1,6 @@
package net.woggioni.gbcs.common package net.woggioni.rbcs.common
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import java.io.InputStream
import java.io.OutputStream import java.io.OutputStream
class ByteBufOutputStream(private val buf : ByteBuf) : OutputStream() { class ByteBufOutputStream(private val buf : ByteBuf) : OutputStream() {

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common package net.woggioni.rbcs.common
class ResourceNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) { class ResourceNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
} }

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common package net.woggioni.rbcs.common
data class HostAndPort(val host: String, val port: Int = 0) { data class HostAndPort(val host: String, val port: Int = 0) {

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common package net.woggioni.rbcs.common
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common package net.woggioni.rbcs.common
import java.security.SecureRandom import java.security.SecureRandom
import java.security.spec.KeySpec import java.security.spec.KeySpec

View File

@@ -0,0 +1,47 @@
package net.woggioni.rbcs.common
import net.woggioni.jwo.JWO
import java.net.URI
import java.net.URL
import java.security.MessageDigest
object RBCS {
fun String.toUrl() : URL = URL.of(URI(this), null)
const val RBCS_NAMESPACE_URI: String = "urn:net.woggioni.rbcs.server"
const val RBCS_PREFIX: String = "rbcs"
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
fun ByteArray.toInt(index : Int = 0) : Long {
if(index + 4 > size) throw IllegalArgumentException("Not enough bytes to decode a 32 bits integer")
var value : Long = 0
for (b in index until index + 4) {
value = (value shl 8) + (get(b).toInt() and 0xFF)
}
return value
}
fun ByteArray.toLong(index : Int = 0) : Long {
if(index + 8 > size) throw IllegalArgumentException("Not enough bytes to decode a 64 bits long integer")
var value : Long = 0
for (b in index until index + 8) {
value = (value shl 8) + (get(b).toInt() and 0xFF)
}
return value
}
fun digest(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): ByteArray {
md.update(data)
return md.digest()
}
fun digestString(
data: ByteArray,
md: MessageDigest = MessageDigest.getInstance("MD5")
): String {
return JWO.bytesToHex(digest(data, md))
}
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common package net.woggioni.rbcs.common
import java.io.IOException import java.io.IOException
import java.io.InputStream import java.io.InputStream
@@ -6,14 +6,13 @@ import java.net.URL
import java.net.URLConnection import java.net.URLConnection
import java.net.URLStreamHandler import java.net.URLStreamHandler
import java.net.spi.URLStreamHandlerProvider import java.net.spi.URLStreamHandlerProvider
import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.util.stream.Collectors import java.util.stream.Collectors
class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() { class RbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
private class ClasspathHandler(private val classLoader: ClassLoader = GbcsUrlStreamHandlerFactory::class.java.classLoader) : private class ClasspathHandler(private val classLoader: ClassLoader = RbcsUrlStreamHandlerFactory::class.java.classLoader) :
URLStreamHandler() { URLStreamHandler() {
override fun openConnection(u: URL): URLConnection? { override fun openConnection(u: URL): URLConnection? {
@@ -88,12 +87,12 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
private val installed = AtomicBoolean(false) private val installed = AtomicBoolean(false)
fun install() { fun install() {
if (!installed.getAndSet(true)) { if (!installed.getAndSet(true)) {
URL.setURLStreamHandlerFactory(GbcsUrlStreamHandlerFactory()) URL.setURLStreamHandlerFactory(RbcsUrlStreamHandlerFactory())
} }
} }
private val packageMap: Map<String, List<Module>> by lazy { private val packageMap: Map<String, List<Module>> by lazy {
GbcsUrlStreamHandlerFactory::class.java.module.layer RbcsUrlStreamHandlerFactory::class.java.module.layer
.modules() .modules()
.stream() .stream()
.flatMap { m: Module -> .flatMap { m: Module ->

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.common package net.woggioni.rbcs.common
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory

View File

@@ -0,0 +1 @@
net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory

View File

@@ -29,11 +29,12 @@ configurations {
} }
dependencies { dependencies {
implementation project(':gbcs-common') implementation project(':rbcs-common')
implementation project(':gbcs-api') implementation project(':rbcs-api')
implementation catalog.jwo implementation catalog.jwo
implementation catalog.slf4j.api implementation catalog.slf4j.api
implementation catalog.netty.common implementation catalog.netty.common
implementation catalog.netty.handler
implementation catalog.netty.codec.memcache implementation catalog.netty.codec.memcache
bundle catalog.netty.codec.memcache bundle catalog.netty.codec.memcache

View File

@@ -0,0 +1,20 @@
import net.woggioni.rbcs.api.CacheProvider;
module net.woggioni.rbcs.server.memcache {
requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.api;
requires net.woggioni.jwo;
requires java.xml;
requires kotlin.stdlib;
requires io.netty.transport;
requires io.netty.codec;
requires io.netty.codec.memcache;
requires io.netty.common;
requires io.netty.buffer;
requires io.netty.handler;
requires org.slf4j;
provides CacheProvider with net.woggioni.rbcs.server.memcache.MemcacheCacheProvider;
opens net.woggioni.rbcs.server.memcache.schema;
}

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.server.memcache package net.woggioni.rbcs.server.memcache
class MemcacheException(status : Short, msg : String? = null, cause : Throwable? = null) class MemcacheException(status : Short, msg : String? = null, cause : Throwable? = null)
: RuntimeException(msg ?: "Memcached status $status", cause) : RuntimeException(msg ?: "Memcached status $status", cause)

View File

@@ -0,0 +1,235 @@
package net.woggioni.rbcs.server.memcache
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.Unpooled
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.api.RequestHandle
import net.woggioni.rbcs.api.ResponseHandle
import net.woggioni.rbcs.api.event.RequestStreamingEvent
import net.woggioni.rbcs.api.event.ResponseStreamingEvent
import net.woggioni.rbcs.api.exception.ContentTooLargeException
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digest
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.extractChunk
import net.woggioni.rbcs.server.memcache.client.MemcacheClient
import net.woggioni.rbcs.server.memcache.client.MemcacheResponseHandle
import net.woggioni.rbcs.server.memcache.client.StreamingRequestEvent
import net.woggioni.rbcs.server.memcache.client.StreamingResponseEvent
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterOutputStream
class MemcacheCache(private val cfg: MemcacheCacheConfiguration) : Cache {
companion object {
@JvmStatic
private val log = contextLogger()
}
private val memcacheClient = MemcacheClient(cfg)
override fun get(key: String, responseHandle: ResponseHandle, alloc: ByteBufAllocator) {
val compressionMode = cfg.compressionMode
val buf = alloc.compositeBuffer()
val stream = ByteBufOutputStream(buf).let { outputStream ->
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
InflaterOutputStream(
outputStream,
Inflater()
)
}
}
} else {
outputStream
}
}
val memcacheResponseHandle = object : MemcacheResponseHandle {
override fun handleEvent(evt: StreamingResponseEvent) {
when (evt) {
is StreamingResponseEvent.ResponseReceived -> {
if (evt.response.status() == BinaryMemcacheResponseStatus.SUCCESS) {
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
} else if (evt.response.status() == BinaryMemcacheResponseStatus.KEY_ENOENT) {
responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND)
} else {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(MemcacheException(evt.response.status())))
}
}
is StreamingResponseEvent.LastContentReceived -> {
evt.content.content().let { content ->
content.readBytes(stream, content.readableBytes())
}
buf.retain()
stream.close()
val chunk = extractChunk(buf, alloc)
buf.release()
responseHandle.handleEvent(
ResponseStreamingEvent.LastChunkReceived(
chunk
)
)
}
is StreamingResponseEvent.ContentReceived -> {
evt.content.content().let { content ->
content.readBytes(stream, content.readableBytes())
}
if (buf.readableBytes() >= cfg.chunkSize) {
val chunk = extractChunk(buf, alloc)
responseHandle.handleEvent(ResponseStreamingEvent.ChunkReceived(chunk))
}
}
is StreamingResponseEvent.ExceptionCaught -> {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(evt.exception))
}
}
}
}
memcacheClient.sendRequest(Unpooled.wrappedBuffer(key.toByteArray()), memcacheResponseHandle)
.thenApply { memcacheRequestHandle ->
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)
).let { digest ->
DefaultBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest)).apply {
setOpcode(BinaryMemcacheOpcodes.GET)
}
}
memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendRequest(request))
}.exceptionally { ex ->
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex))
}
}
private fun encodeExpiry(expiry: Duration): Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
override fun put(
key: String,
responseHandle: ResponseHandle,
alloc: ByteBufAllocator
): CompletableFuture<RequestHandle> {
val memcacheResponseHandle = object : MemcacheResponseHandle {
override fun handleEvent(evt: StreamingResponseEvent) {
when (evt) {
is StreamingResponseEvent.ResponseReceived -> {
when (evt.response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> {
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND)
}
BinaryMemcacheResponseStatus.E2BIG -> {
responseHandle.handleEvent(
ResponseStreamingEvent.ExceptionCaught(
ContentTooLargeException("Request payload is too big", null)
)
)
}
else -> {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(MemcacheException(evt.response.status())))
}
}
}
is StreamingResponseEvent.LastContentReceived -> {
responseHandle.handleEvent(
ResponseStreamingEvent.LastChunkReceived(
evt.content.content().retain()
)
)
}
is StreamingResponseEvent.ContentReceived -> {
responseHandle.handleEvent(ResponseStreamingEvent.ChunkReceived(evt.content.content().retain()))
}
is StreamingResponseEvent.ExceptionCaught -> {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(evt.exception))
}
}
}
}
val result: CompletableFuture<RequestHandle> =
memcacheClient.sendRequest(Unpooled.wrappedBuffer(key.toByteArray()), memcacheResponseHandle)
.thenApply { memcacheRequestHandle ->
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
val extras = Unpooled.buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(cfg.maxAge))
DefaultBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), extras).apply {
setOpcode(BinaryMemcacheOpcodes.SET)
}
}
// memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendRequest(request))
val compressionMode = cfg.compressionMode
val buf = alloc.heapBuffer()
val stream = ByteBufOutputStream(buf).let { outputStream ->
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
DeflaterOutputStream(
outputStream,
Deflater(Deflater.DEFAULT_COMPRESSION, false)
)
}
}
} else {
outputStream
}
}
RequestHandle { evt ->
when (evt) {
is RequestStreamingEvent.LastChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
buf.retain()
stream.close()
request.setTotalBodyLength(buf.readableBytes() + request.keyLength() + request.extrasLength())
memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendRequest(request))
memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendLastChunk(buf))
}
is RequestStreamingEvent.ChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
}
is RequestStreamingEvent.ExceptionCaught -> {
stream.close()
}
}
}
}
return result
}
override fun close() {
memcacheClient.close()
}
}

View File

@@ -1,7 +1,7 @@
package net.woggioni.gbcs.server.memcache package net.woggioni.rbcs.server.memcache
import net.woggioni.gbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.gbcs.common.HostAndPort import net.woggioni.rbcs.common.HostAndPort
import java.time.Duration import java.time.Duration
data class MemcacheCacheConfiguration( data class MemcacheCacheConfiguration(
@@ -10,14 +10,10 @@ data class MemcacheCacheConfiguration(
val maxSize: Int = 0x100000, val maxSize: Int = 0x100000,
val digestAlgorithm: String? = null, val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = null, val compressionMode: CompressionMode? = null,
val chunkSize : Int
) : Configuration.Cache { ) : Configuration.Cache {
enum class CompressionMode { enum class CompressionMode {
/**
* Gzip mode
*/
GZIP,
/** /**
* Deflate mode * Deflate mode
*/ */
@@ -33,7 +29,7 @@ data class MemcacheCacheConfiguration(
override fun materialize() = MemcacheCache(this) override fun materialize() = MemcacheCache(this)
override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcache" override fun getNamespaceURI() = "urn:net.woggioni.rbcs.server.memcache"
override fun getTypeName() = "memcacheCacheType" override fun getTypeName() = "memcacheCacheType"
} }

View File

@@ -1,12 +1,12 @@
package net.woggioni.gbcs.server.memcache package net.woggioni.rbcs.server.memcache
import net.woggioni.gbcs.api.CacheProvider import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.gbcs.api.exception.ConfigurationException import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.gbcs.common.GBCS import net.woggioni.rbcs.common.RBCS
import net.woggioni.gbcs.common.HostAndPort import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.gbcs.common.Xml import net.woggioni.rbcs.common.Xml
import net.woggioni.gbcs.common.Xml.Companion.asIterable import net.woggioni.rbcs.common.Xml.Companion.asIterable
import net.woggioni.gbcs.common.Xml.Companion.renderAttribute import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document import org.w3c.dom.Document
import org.w3c.dom.Element import org.w3c.dom.Element
import java.time.Duration import java.time.Duration
@@ -14,14 +14,14 @@ import java.time.temporal.ChronoUnit
class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> { class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
override fun getXmlSchemaLocation() = "jpms://net.woggioni.gbcs.server.memcache/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd" override fun getXmlSchemaLocation() = "jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd"
override fun getXmlType() = "memcacheCacheType" override fun getXmlType() = "memcacheCacheType"
override fun getXmlNamespace() = "urn:net.woggioni.gbcs.server.memcache" override fun getXmlNamespace() = "urn:net.woggioni.rbcs.server.memcache"
val xmlNamespacePrefix : String val xmlNamespacePrefix : String
get() = "gbcs-memcache" get() = "rbcs-memcache"
override fun deserialize(el: Element): MemcacheCacheConfiguration { override fun deserialize(el: Element): MemcacheCacheConfiguration {
val servers = mutableListOf<MemcacheCacheConfiguration.Server>() val servers = mutableListOf<MemcacheCacheConfiguration.Server>()
@@ -29,12 +29,14 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
?.let(Duration::parse) ?.let(Duration::parse)
?: Duration.ofDays(1) ?: Duration.ofDays(1)
val maxSize = el.renderAttribute("max-size") val maxSize = el.renderAttribute("max-size")
?.let(String::toInt) ?.let(Integer::decode)
?: 0x100000 ?: 0x100000
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x4000
val compressionMode = el.renderAttribute("compression-mode") val compressionMode = el.renderAttribute("compression-mode")
?.let { ?.let {
when (it) { when (it) {
"gzip" -> MemcacheCacheConfiguration.CompressionMode.GZIP
"deflate" -> MemcacheCacheConfiguration.CompressionMode.DEFLATE "deflate" -> MemcacheCacheConfiguration.CompressionMode.DEFLATE
else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE
} }
@@ -63,6 +65,7 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
maxSize, maxSize,
digestAlgorithm, digestAlgorithm,
compressionMode, compressionMode,
chunkSize
) )
} }
@@ -70,8 +73,7 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
val result = doc.createElement("cache") val result = doc.createElement("cache")
Xml.of(doc, result) { Xml.of(doc, result) {
attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/") attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/")
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", RBCS.XML_SCHEMA_NAMESPACE_URI)
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI)
for (server in servers) { for (server in servers) {
node("server") { node("server") {
attr("host", server.endpoint.host) attr("host", server.endpoint.host)
@@ -84,13 +86,13 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
} }
attr("max-age", maxAge.toString()) attr("max-age", maxAge.toString())
attr("max-size", maxSize.toString()) attr("max-size", maxSize.toString())
attr("chunk-size", chunkSize.toString())
digestAlgorithm?.let { digestAlgorithm -> digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm) attr("digest", digestAlgorithm)
} }
compressionMode?.let { compressionMode -> compressionMode?.let { compressionMode ->
attr( attr(
"compression-mode", when (compressionMode) { "compression-mode", when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> "gzip"
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> "deflate" MemcacheCacheConfiguration.CompressionMode.DEFLATE -> "deflate"
} }
) )

View File

@@ -0,0 +1,30 @@
package net.woggioni.rbcs.server.memcache.client
import io.netty.buffer.ByteBuf
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
sealed interface StreamingRequestEvent {
class SendRequest(val request : BinaryMemcacheRequest) : StreamingRequestEvent
open class SendChunk(val chunk : ByteBuf) : StreamingRequestEvent
class SendLastChunk(chunk : ByteBuf) : SendChunk(chunk)
class ExceptionCaught(val exception : Throwable) : StreamingRequestEvent
}
sealed interface StreamingResponseEvent {
class ResponseReceived(val response : BinaryMemcacheResponse) : StreamingResponseEvent
open class ContentReceived(val content : MemcacheContent) : StreamingResponseEvent
class LastContentReceived(val lastContent : LastMemcacheContent) : ContentReceived(lastContent)
class ExceptionCaught(val exception : Throwable) : StreamingResponseEvent
}
interface MemcacheRequestHandle {
fun handleEvent(evt : StreamingRequestEvent)
}
interface MemcacheResponseHandle {
fun handleEvent(evt : StreamingResponseEvent)
}

View File

@@ -0,0 +1,183 @@
package net.woggioni.rbcs.server.memcache.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.MemcacheObject
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.logging.LoggingHandler
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration
import java.net.InetSocketAddress
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import io.netty.util.concurrent.Future as NettyFuture
class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseable {
private companion object {
@JvmStatic
private val log = contextLogger()
}
private val group: NioEventLoopGroup
private val connectionPool: MutableMap<HostAndPort, ChannelPool> = ConcurrentHashMap()
init {
group = NioEventLoopGroup()
}
private val counter = AtomicLong(0)
private fun newConnectionPool(server: MemcacheCacheConfiguration.Server): FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
channel(NioSocketChannel::class.java)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
server.connectionTimeoutMillis?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec())
pipeline.addLast(LoggingHandler())
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
fun sendRequest(key: ByteBuf, responseHandle: MemcacheResponseHandle): CompletableFuture<MemcacheRequestHandle> {
val server = cfg.servers.let { servers ->
if (servers.size > 1) {
var checksum = 0
while (key.readableBytes() > 4) {
val byte = key.readInt()
checksum = checksum xor byte
}
while (key.readableBytes() > 0) {
val byte = key.readByte()
checksum = checksum xor byte.toInt()
}
servers[checksum % servers.size]
} else {
servers.first()
}
}
val response = CompletableFuture<MemcacheRequestHandle>()
// Custom handler for processing responses
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
}
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
val handler = object : SimpleChannelInboundHandler<MemcacheObject>() {
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: MemcacheObject
) {
when (msg) {
is BinaryMemcacheResponse -> responseHandle.handleEvent(
StreamingResponseEvent.ResponseReceived(
msg
)
)
is LastMemcacheContent -> {
responseHandle.handleEvent(
StreamingResponseEvent.LastContentReceived(
msg
)
)
pipeline.removeLast()
pool.release(channel)
}
is MemcacheContent -> responseHandle.handleEvent(
StreamingResponseEvent.ContentReceived(
msg
)
)
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
responseHandle.handleEvent(StreamingResponseEvent.ExceptionCaught(cause))
ctx.close()
pipeline.removeLast()
pool.release(channel)
}
}
channel.pipeline()
.addLast("client-handler", handler)
response.complete(object : MemcacheRequestHandle {
override fun handleEvent(evt: StreamingRequestEvent) {
when (evt) {
is StreamingRequestEvent.SendRequest -> {
channel.writeAndFlush(evt.request)
}
is StreamingRequestEvent.SendLastChunk -> {
channel.writeAndFlush(DefaultLastMemcacheContent(evt.chunk))
val value = counter.incrementAndGet()
log.debug {
"Finished request counter: $value"
}
}
is StreamingRequestEvent.SendChunk -> {
channel.writeAndFlush(DefaultMemcacheContent(evt.chunk))
}
is StreamingRequestEvent.ExceptionCaught -> {
responseHandle.handleEvent(StreamingResponseEvent.ExceptionCaught(evt.exception))
channel.close()
pipeline.removeLast()
pool.release(channel)
}
}
}
})
} else {
response.completeExceptionally(channelFuture.cause())
}
}
})
return response
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}
override fun close() {
shutDown().sync()
}
}

View File

@@ -0,0 +1 @@
net.woggioni.rbcs.server.memcache.MemcacheCacheProvider

View File

@@ -1,10 +1,10 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<xs:schema targetNamespace="urn:net.woggioni.gbcs.server.memcache" <xs:schema targetNamespace="urn:net.woggioni.rbcs.server.memcache"
xmlns:gbcs-memcache="urn:net.woggioni.gbcs.server.memcache" xmlns:rbcs-memcache="urn:net.woggioni.rbcs.server.memcache"
xmlns:gbcs="urn:net.woggioni.gbcs.server" xmlns:rbcs="urn:net.woggioni.rbcs.server"
xmlns:xs="http://www.w3.org/2001/XMLSchema"> xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:import schemaLocation="jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd" namespace="urn:net.woggioni.gbcs.server"/> <xs:import schemaLocation="jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs.xsd" namespace="urn:net.woggioni.rbcs.server"/>
<xs:complexType name="memcacheServerType"> <xs:complexType name="memcacheServerType">
<xs:attribute name="host" type="xs:token" use="required"/> <xs:attribute name="host" type="xs:token" use="required"/>
@@ -15,14 +15,15 @@
<xs:complexType name="memcacheCacheType"> <xs:complexType name="memcacheCacheType">
<xs:complexContent> <xs:complexContent>
<xs:extension base="gbcs:cacheType"> <xs:extension base="rbcs:cacheType">
<xs:sequence maxOccurs="unbounded"> <xs:sequence maxOccurs="unbounded">
<xs:element name="server" type="gbcs-memcache:memcacheServerType"/> <xs:element name="server" type="rbcs-memcache:memcacheServerType"/>
</xs:sequence> </xs:sequence>
<xs:attribute name="max-age" type="xs:duration" default="P1D"/> <xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="max-size" type="xs:unsignedInt" default="1048576"/> <xs:attribute name="max-size" type="rbcs:byteSize" default="1048576"/>
<xs:attribute name="chunk-size" type="rbcs:byteSize" default="0x4000"/>
<xs:attribute name="digest" type="xs:token" /> <xs:attribute name="digest" type="xs:token" />
<xs:attribute name="compression-mode" type="gbcs-memcache:compressionType"/> <xs:attribute name="compression-mode" type="rbcs-memcache:compressionType"/>
</xs:extension> </xs:extension>
</xs:complexContent> </xs:complexContent>
</xs:complexType> </xs:complexType>
@@ -30,7 +31,6 @@
<xs:simpleType name="compressionType"> <xs:simpleType name="compressionType">
<xs:restriction base="xs:token"> <xs:restriction base="xs:token">
<xs:enumeration value="deflate"/> <xs:enumeration value="deflate"/>
<xs:enumeration value="gzip"/>
</xs:restriction> </xs:restriction>
</xs:simpleType> </xs:simpleType>

View File

@@ -9,9 +9,12 @@ dependencies {
implementation catalog.jwo implementation catalog.jwo
implementation catalog.slf4j.api implementation catalog.slf4j.api
implementation catalog.netty.codec.http implementation catalog.netty.codec.http
implementation catalog.netty.handler
implementation catalog.netty.buffer
implementation catalog.netty.transport
api project(':gbcs-common') api project(':rbcs-common')
api project(':gbcs-api') api project(':rbcs-api')
// runtimeOnly catalog.slf4j.jdk14 // runtimeOnly catalog.slf4j.jdk14
testRuntimeOnly catalog.logback.classic testRuntimeOnly catalog.logback.classic
@@ -19,7 +22,7 @@ dependencies {
testImplementation catalog.bcprov.jdk18on testImplementation catalog.bcprov.jdk18on
testImplementation catalog.bcpkix.jdk18on testImplementation catalog.bcpkix.jdk18on
testRuntimeOnly project(":gbcs-server-memcache") testRuntimeOnly project(":rbcs-server-memcache")
} }
test { test {
@@ -36,3 +39,4 @@ publishing {
} }

View File

@@ -1,8 +1,8 @@
import net.woggioni.gbcs.api.CacheProvider; import net.woggioni.rbcs.api.CacheProvider;
import net.woggioni.gbcs.server.cache.FileSystemCacheProvider; import net.woggioni.rbcs.server.cache.FileSystemCacheProvider;
import net.woggioni.gbcs.server.cache.InMemoryCacheProvider; import net.woggioni.rbcs.server.cache.InMemoryCacheProvider;
module net.woggioni.gbcs.server { module net.woggioni.rbcs.server {
requires java.sql; requires java.sql;
requires java.xml; requires java.xml;
requires java.logging; requires java.logging;
@@ -16,13 +16,13 @@ module net.woggioni.gbcs.server {
requires io.netty.codec; requires io.netty.codec;
requires org.slf4j; requires org.slf4j;
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires net.woggioni.gbcs.common; requires net.woggioni.rbcs.common;
requires net.woggioni.gbcs.api; requires net.woggioni.rbcs.api;
exports net.woggioni.gbcs.server; exports net.woggioni.rbcs.server;
opens net.woggioni.gbcs.server; opens net.woggioni.rbcs.server;
opens net.woggioni.gbcs.server.schema; opens net.woggioni.rbcs.server.schema;
uses CacheProvider; uses CacheProvider;
provides CacheProvider with FileSystemCacheProvider, InMemoryCacheProvider; provides CacheProvider with FileSystemCacheProvider, InMemoryCacheProvider;

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.server package net.woggioni.rbcs.server
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import org.slf4j.Logger import org.slf4j.Logger

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.server package net.woggioni.rbcs.server
import io.netty.bootstrap.ServerBootstrap import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
@@ -16,7 +16,6 @@ import io.netty.handler.codec.compression.CompressionOptions
import io.netty.handler.codec.http.DefaultHttpContent import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.HttpContentCompressor import io.netty.handler.codec.http.HttpContentCompressor
import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpServerCodec import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.ClientAuth
@@ -30,26 +29,26 @@ import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.AttributeKey import io.netty.util.AttributeKey
import io.netty.util.concurrent.DefaultEventExecutorGroup import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.api.exception.ConfigurationException
import net.woggioni.gbcs.common.GBCS.toUrl
import net.woggioni.gbcs.common.PasswordSecurity.decodePasswordHash
import net.woggioni.gbcs.common.PasswordSecurity.hashPassword
import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.debug
import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.server.auth.AbstractNettyHttpAuthenticator
import net.woggioni.gbcs.server.auth.Authorizer
import net.woggioni.gbcs.server.auth.ClientCertificateValidator
import net.woggioni.gbcs.server.auth.RoleAuthorizer
import net.woggioni.gbcs.server.configuration.Parser
import net.woggioni.gbcs.server.configuration.Serializer
import net.woggioni.gbcs.server.exception.ExceptionHandler
import net.woggioni.gbcs.server.handler.ServerHandler
import net.woggioni.gbcs.server.throttling.ThrottlingHandler
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import net.woggioni.jwo.Tuple2 import net.woggioni.jwo.Tuple2
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import net.woggioni.rbcs.common.RBCS.toUrl
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.info
import net.woggioni.rbcs.server.auth.AbstractNettyHttpAuthenticator
import net.woggioni.rbcs.server.auth.Authorizer
import net.woggioni.rbcs.server.auth.ClientCertificateValidator
import net.woggioni.rbcs.server.auth.RoleAuthorizer
import net.woggioni.rbcs.server.configuration.Parser
import net.woggioni.rbcs.server.configuration.Serializer
import net.woggioni.rbcs.server.exception.ExceptionHandler
import net.woggioni.rbcs.server.handler.ServerHandler
import net.woggioni.rbcs.server.throttling.ThrottlingHandler
import java.io.OutputStream import java.io.OutputStream
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.nio.file.Files import java.nio.file.Files
@@ -59,13 +58,14 @@ import java.security.PrivateKey
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import java.util.Arrays import java.util.Arrays
import java.util.Base64 import java.util.Base64
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.regex.Matcher import java.util.regex.Matcher
import java.util.regex.Pattern import java.util.regex.Pattern
import javax.naming.ldap.LdapName import javax.naming.ldap.LdapName
import javax.net.ssl.SSLPeerUnverifiedException import javax.net.ssl.SSLPeerUnverifiedException
class GradleBuildCacheServer(private val cfg: Configuration) { class RemoteBuildCacheServer(private val cfg: Configuration) {
private val log = contextLogger() private val log = contextLogger()
companion object { companion object {
@@ -73,7 +73,7 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
val userAttribute: AttributeKey<Configuration.User> = AttributeKey.valueOf("user") val userAttribute: AttributeKey<Configuration.User> = AttributeKey.valueOf("user")
val groupAttribute: AttributeKey<Set<Configuration.Group>> = AttributeKey.valueOf("group") val groupAttribute: AttributeKey<Set<Configuration.Group>> = AttributeKey.valueOf("group")
val DEFAULT_CONFIGURATION_URL by lazy { "classpath:net/woggioni/gbcs/gbcs-default.xml".toUrl() } val DEFAULT_CONFIGURATION_URL by lazy { "classpath:net/woggioni/rbcs/server/rbcs-default.xml".toUrl() }
private const val SSL_HANDLER_NAME = "sslHandler" private const val SSL_HANDLER_NAME = "sslHandler"
fun loadConfiguration(configurationFile: Path): Configuration { fun loadConfiguration(configurationFile: Path): Configuration {
@@ -128,7 +128,8 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
val clientCertificate = peerCertificates.first() as X509Certificate val clientCertificate = peerCertificates.first() as X509Certificate
val user = userExtractor?.extract(clientCertificate) val user = userExtractor?.extract(clientCertificate)
val group = groupExtractor?.extract(clientCertificate) val group = groupExtractor?.extract(clientCertificate)
val allGroups = ((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet() val allGroups =
((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
AuthenticationResult(user, allGroups) AuthenticationResult(user, allGroups)
} ?: anonymousUserGroups?.let { AuthenticationResult(null, it) } } ?: anonymousUserGroups?.let { AuthenticationResult(null, it) }
} catch (es: SSLPeerUnverifiedException) { } catch (es: SSLPeerUnverifiedException) {
@@ -191,7 +192,7 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
private class ServerInitializer( private class ServerInitializer(
private val cfg: Configuration, private val cfg: Configuration,
private val eventExecutorGroup: EventExecutorGroup private val eventExecutorGroup: EventExecutorGroup
) : ChannelInitializer<Channel>() { ) : ChannelInitializer<Channel>(), AutoCloseable {
companion object { companion object {
private fun createSslCtx(tls: Configuration.Tls): SslContext { private fun createSslCtx(tls: Configuration.Tls): SslContext {
@@ -245,14 +246,9 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
private val log = contextLogger() private val log = contextLogger()
private val serverHandler = let { private val cache = cfg.cache.materialize()
val cacheImplementation = cfg.cache.materialize()
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
ServerHandler(cacheImplementation, prefix)
}
private val exceptionHandler = ExceptionHandler() private val exceptionHandler = ExceptionHandler()
private val throttlingHandler = ThrottlingHandler(cfg)
private val authenticator = when (val auth = cfg.authentication) { private val authenticator = when (val auth = cfg.authentication) {
is Configuration.BasicAuthentication -> NettyHttpBasicAuthenticator(cfg.users, RoleAuthorizer()) is Configuration.BasicAuthentication -> NettyHttpBasicAuthenticator(cfg.users, RoleAuthorizer())
@@ -344,12 +340,15 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
IdleState.READER_IDLE -> log.debug { IdleState.READER_IDLE -> log.debug {
"Read timeout reached on channel ${ch.id().asShortText()}, closing the connection" "Read timeout reached on channel ${ch.id().asShortText()}, closing the connection"
} }
IdleState.WRITER_IDLE -> log.debug { IdleState.WRITER_IDLE -> log.debug {
"Write timeout reached on channel ${ch.id().asShortText()}, closing the connection" "Write timeout reached on channel ${ch.id().asShortText()}, closing the connection"
} }
IdleState.ALL_IDLE -> log.debug { IdleState.ALL_IDLE -> log.debug {
"Idle timeout reached on channel ${ch.id().asShortText()}, closing the connection" "Idle timeout reached on channel ${ch.id().asShortText()}, closing the connection"
} }
null -> throw IllegalStateException("This should never happen") null -> throw IllegalStateException("This should never happen")
} }
ctx.close() ctx.close()
@@ -362,38 +361,57 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
pipeline.addLast(HttpServerCodec()) pipeline.addLast(HttpServerCodec())
pipeline.addLast(HttpChunkContentCompressor(1024)) pipeline.addLast(HttpChunkContentCompressor(1024))
pipeline.addLast(ChunkedWriteHandler()) pipeline.addLast(ChunkedWriteHandler())
pipeline.addLast(HttpObjectAggregator(cfg.connection.maxRequestSize))
authenticator?.let { authenticator?.let {
pipeline.addLast(it) pipeline.addLast(it)
} }
pipeline.addLast(throttlingHandler) pipeline.addLast(ThrottlingHandler(cfg))
val serverHandler = let {
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
ServerHandler(cache, prefix)
}
pipeline.addLast(eventExecutorGroup, serverHandler) pipeline.addLast(eventExecutorGroup, serverHandler)
pipeline.addLast(exceptionHandler) pipeline.addLast(exceptionHandler)
} }
override fun close() {
cache.close()
}
} }
class ServerHandle( class ServerHandle(
httpChannelFuture: ChannelFuture, httpChannelFuture: ChannelFuture,
private val executorGroups: Iterable<EventExecutorGroup> private val executorGroups: Iterable<EventExecutorGroup>,
private val serverInitializer: AutoCloseable
) : AutoCloseable { ) : AutoCloseable {
private val httpChannel: Channel = httpChannelFuture.channel() private val httpChannel: Channel = httpChannelFuture.channel()
private val closeFuture: ChannelFuture = httpChannel.closeFuture() private val closeFuture: ChannelFuture = httpChannel.closeFuture()
private val log = contextLogger() private val log = contextLogger()
fun shutdown(): ChannelFuture { fun shutdown(): Future<Void> {
return httpChannel.close() return httpChannel.close()
} }
override fun close() { override fun close() {
try { try {
closeFuture.sync() closeFuture.sync()
} finally { } catch (ex: Throwable) {
log.error(ex.message, ex)
}
try {
serverInitializer.close()
} catch (ex: Throwable) {
log.error(ex.message, ex)
}
executorGroups.forEach { executorGroups.forEach {
try {
it.shutdownGracefully().sync() it.shutdownGracefully().sync()
} catch (ex: Throwable) {
log.error(ex.message, ex)
} }
} }
log.info { log.info {
"GradleBuildCacheServer has been gracefully shut down" "RemoteBuildCacheServer has been gracefully shut down"
} }
} }
} }
@@ -411,11 +429,12 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
} }
DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), threadFactory) DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), threadFactory)
} }
val serverInitializer = ServerInitializer(cfg, eventExecutorGroup)
val bootstrap = ServerBootstrap().apply { val bootstrap = ServerBootstrap().apply {
// Configure the server // Configure the server
group(bossGroup, workerGroup) group(bossGroup, workerGroup)
channel(serverSocketChannel) channel(serverSocketChannel)
childHandler(ServerInitializer(cfg, eventExecutorGroup)) childHandler(serverInitializer)
option(ChannelOption.SO_BACKLOG, cfg.incomingConnectionsBacklogSize) option(ChannelOption.SO_BACKLOG, cfg.incomingConnectionsBacklogSize)
childOption(ChannelOption.SO_KEEPALIVE, true) childOption(ChannelOption.SO_KEEPALIVE, true)
} }
@@ -425,8 +444,8 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
val bindAddress = InetSocketAddress(cfg.host, cfg.port) val bindAddress = InetSocketAddress(cfg.host, cfg.port)
val httpChannel = bootstrap.bind(bindAddress).sync() val httpChannel = bootstrap.bind(bindAddress).sync()
log.info { log.info {
"GradleBuildCacheServer is listening on ${cfg.host}:${cfg.port}" "RemoteBuildCacheServer is listening on ${cfg.host}:${cfg.port}"
} }
return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup)) return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup), serverInitializer)
} }
} }

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.server.auth package net.woggioni.rbcs.server.auth
import io.netty.buffer.Unpooled import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener import io.netty.channel.ChannelFutureListener
@@ -6,15 +6,16 @@ import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.codec.http.DefaultFullHttpResponse import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.FullHttpResponse import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion import io.netty.handler.codec.http.HttpVersion
import io.netty.util.ReferenceCountUtil import io.netty.util.ReferenceCountUtil
import net.woggioni.gbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.gbcs.api.Configuration.Group import net.woggioni.rbcs.api.Configuration.Group
import net.woggioni.gbcs.api.Role import net.woggioni.rbcs.api.Role
import net.woggioni.gbcs.server.GradleBuildCacheServer import net.woggioni.rbcs.server.RemoteBuildCacheServer
abstract class AbstractNettyHttpAuthenticator(private val authorizer: Authorizer) : ChannelInboundHandlerAdapter() { abstract class AbstractNettyHttpAuthenticator(private val authorizer: Authorizer) : ChannelInboundHandlerAdapter() {
@@ -40,8 +41,8 @@ abstract class AbstractNettyHttpAuthenticator(private val authorizer: Authorizer
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
if (msg is HttpRequest) { if (msg is HttpRequest) {
val result = authenticate(ctx, msg) ?: return authenticationFailure(ctx, msg) val result = authenticate(ctx, msg) ?: return authenticationFailure(ctx, msg)
ctx.channel().attr(GradleBuildCacheServer.userAttribute).set(result.user) ctx.channel().attr(RemoteBuildCacheServer.userAttribute).set(result.user)
ctx.channel().attr(GradleBuildCacheServer.groupAttribute).set(result.groups) ctx.channel().attr(RemoteBuildCacheServer.groupAttribute).set(result.groups)
val roles = ( val roles = (
(result.user?.let { user -> (result.user?.let { user ->
@@ -57,6 +58,8 @@ abstract class AbstractNettyHttpAuthenticator(private val authorizer: Authorizer
} else { } else {
authorizationFailure(ctx, msg) authorizationFailure(ctx, msg)
} }
} else if(msg is HttpContent) {
ctx.fireChannelRead(msg)
} }
} }

View File

@@ -1,7 +1,7 @@
package net.woggioni.gbcs.server.auth package net.woggioni.rbcs.server.auth
import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpRequest
import net.woggioni.gbcs.api.Role import net.woggioni.rbcs.api.Role
fun interface Authorizer { fun interface Authorizer {
fun authorize(roles : Set<Role>, request: HttpRequest) : Boolean fun authorize(roles : Set<Role>, request: HttpRequest) : Boolean

View File

@@ -1,4 +1,4 @@
package net.woggioni.gbcs.server.auth package net.woggioni.rbcs.server.auth
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.channel.ChannelInboundHandlerAdapter

View File

@@ -1,8 +1,8 @@
package net.woggioni.gbcs.server.auth package net.woggioni.rbcs.server.auth
import io.netty.handler.codec.http.HttpMethod import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpRequest
import net.woggioni.gbcs.api.Role import net.woggioni.rbcs.api.Role
class RoleAuthorizer : Authorizer { class RoleAuthorizer : Authorizer {

View File

@@ -0,0 +1,201 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBufAllocator
import net.woggioni.jwo.JWO
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.api.RequestHandle
import net.woggioni.rbcs.api.ResponseHandle
import net.woggioni.rbcs.api.event.RequestStreamingEvent
import net.woggioni.rbcs.api.event.ResponseStreamingEvent
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digestString
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.extractChunk
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.nio.file.StandardOpenOption
import java.nio.file.attribute.BasicFileAttributes
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterInputStream
class FileSystemCache(
val root: Path,
val maxAge: Duration,
val digestAlgorithm: String?,
val compressionEnabled: Boolean,
val compressionLevel: Int,
val chunkSize: Int
) : Cache {
private companion object {
@JvmStatic
private val log = contextLogger()
}
init {
Files.createDirectories(root)
}
@Volatile
private var running = true
private var nextGc = Instant.now()
override fun get(key: String, responseHandle: ResponseHandle, alloc: ByteBufAllocator) {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
root.resolve(digest).takeIf(Files::exists)
?.let { file ->
file.takeIf(Files::exists)?.let { file ->
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
if (compressionEnabled) {
val compositeBuffer = alloc.compositeBuffer()
ByteBufOutputStream(compositeBuffer).use { outputStream ->
InflaterInputStream(Files.newInputStream(file)).use { inputStream ->
val ioBuffer = alloc.buffer(chunkSize)
try {
while (true) {
val read = ioBuffer.writeBytes(inputStream, chunkSize)
val last = read < 0
if (read > 0) {
ioBuffer.readBytes(outputStream, read)
}
if (last) {
compositeBuffer.retain()
outputStream.close()
}
if (compositeBuffer.readableBytes() >= chunkSize || last) {
val chunk = extractChunk(compositeBuffer, alloc)
val evt = if (last) {
ResponseStreamingEvent.LastChunkReceived(chunk)
} else {
ResponseStreamingEvent.ChunkReceived(chunk)
}
responseHandle.handleEvent(evt)
}
if (last) break
}
} finally {
ioBuffer.release()
}
}
}
} else {
responseHandle.handleEvent(
ResponseStreamingEvent.FileReceived(
FileChannel.open(file, StandardOpenOption.READ)
)
)
}
}
} ?: responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND)
}
}
override fun put(
key: String,
responseHandle: ResponseHandle,
alloc: ByteBufAllocator
): CompletableFuture<RequestHandle> {
try {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
val file = root.resolve(digest)
val tmpFile = Files.createTempFile(root, null, ".tmp")
val stream = Files.newOutputStream(tmpFile).let {
if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
DeflaterOutputStream(it, deflater)
} else {
it
}
}
return CompletableFuture.completedFuture(object : RequestHandle {
override fun handleEvent(evt: RequestStreamingEvent) {
try {
when (evt) {
is RequestStreamingEvent.LastChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
stream.close()
Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE)
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
}
is RequestStreamingEvent.ChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
}
is RequestStreamingEvent.ExceptionCaught -> {
Files.delete(tmpFile)
stream.close()
}
}
} catch (ex: Throwable) {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex))
}
}
})
}
} catch (ex: Throwable) {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex))
return CompletableFuture.failedFuture(ex)
}
}
private val garbageCollector = Thread.ofVirtual().name("file-system-cache-gc").start {
while (running) {
gc()
}
}
private fun gc() {
val now = Instant.now()
if (nextGc < now) {
val oldestEntry = actualGc(now)
nextGc = (oldestEntry ?: now).plus(maxAge)
}
Thread.sleep(minOf(Duration.between(now, nextGc), Duration.ofSeconds(1)))
}
/**
* Returns the creation timestamp of the oldest cache entry (if any)
*/
private fun actualGc(now: Instant): Instant? {
var result: Instant? = null
Files.list(root)
.filter { path ->
JWO.splitExtension(path)
.map { it._2 }
.map { it != ".tmp" }
.orElse(true)
}
.filter {
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
.creationTime()
.toInstant()
if (result == null || creationTimeStamp < result) {
result = creationTimeStamp
}
now > creationTimeStamp.plus(maxAge)
}.forEach(Files::delete)
return result
}
override fun close() {
running = false
garbageCollector.join()
}
}

View File

@@ -1,7 +1,7 @@
package net.woggioni.gbcs.server.cache package net.woggioni.rbcs.server.cache
import net.woggioni.gbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.gbcs.common.GBCS import net.woggioni.rbcs.common.RBCS
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import java.nio.file.Path import java.nio.file.Path
import java.time.Duration import java.time.Duration
@@ -12,16 +12,18 @@ data class FileSystemCacheConfiguration(
val digestAlgorithm : String?, val digestAlgorithm : String?,
val compressionEnabled: Boolean, val compressionEnabled: Boolean,
val compressionLevel: Int, val compressionLevel: Int,
val chunkSize: Int,
) : Configuration.Cache { ) : Configuration.Cache {
override fun materialize() = FileSystemCache( override fun materialize() = FileSystemCache(
root ?: Application.builder("gbcs").build().computeCacheDirectory(), root ?: Application.builder("rbcs").build().computeCacheDirectory(),
maxAge, maxAge,
digestAlgorithm, digestAlgorithm,
compressionEnabled, compressionEnabled,
compressionLevel compressionLevel,
chunkSize,
) )
override fun getNamespaceURI() = GBCS.GBCS_NAMESPACE_URI override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI
override fun getTypeName() = "fileSystemCacheType" override fun getTypeName() = "fileSystemCacheType"
} }

View File

@@ -1,9 +1,9 @@
package net.woggioni.gbcs.server.cache package net.woggioni.rbcs.server.cache
import net.woggioni.gbcs.api.CacheProvider import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.gbcs.common.GBCS import net.woggioni.rbcs.common.RBCS
import net.woggioni.gbcs.common.Xml import net.woggioni.rbcs.common.Xml
import net.woggioni.gbcs.common.Xml.Companion.renderAttribute import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document import org.w3c.dom.Document
import org.w3c.dom.Element import org.w3c.dom.Element
import java.nio.file.Path import java.nio.file.Path
@@ -12,11 +12,11 @@ import java.util.zip.Deflater
class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> { class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
override fun getXmlSchemaLocation() = "classpath:net/woggioni/gbcs/server/schema/gbcs.xsd" override fun getXmlSchemaLocation() = "classpath:net/woggioni/rbcs/server/schema/rbcs.xsd"
override fun getXmlType() = "fileSystemCacheType" override fun getXmlType() = "fileSystemCacheType"
override fun getXmlNamespace() = "urn:net.woggioni.gbcs.server" override fun getXmlNamespace() = "urn:net.woggioni.rbcs.server"
override fun deserialize(el: Element): FileSystemCacheConfiguration { override fun deserialize(el: Element): FileSystemCacheConfiguration {
val path = el.renderAttribute("path") val path = el.renderAttribute("path")
@@ -31,21 +31,25 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
?.let(String::toInt) ?.let(String::toInt)
?: Deflater.DEFAULT_COMPRESSION ?: Deflater.DEFAULT_COMPRESSION
val digestAlgorithm = el.renderAttribute("digest") ?: "MD5" val digestAlgorithm = el.renderAttribute("digest") ?: "MD5"
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x4000
return FileSystemCacheConfiguration( return FileSystemCacheConfiguration(
path, path,
maxAge, maxAge,
digestAlgorithm, digestAlgorithm,
enableCompression, enableCompression,
compressionLevel compressionLevel,
chunkSize
) )
} }
override fun serialize(doc: Document, cache : FileSystemCacheConfiguration) = cache.run { override fun serialize(doc: Document, cache : FileSystemCacheConfiguration) = cache.run {
val result = doc.createElement("cache") val result = doc.createElement("cache")
Xml.of(doc, result) { Xml.of(doc, result) {
val prefix = doc.lookupPrefix(GBCS.GBCS_NAMESPACE_URI) val prefix = doc.lookupPrefix(RBCS.RBCS_NAMESPACE_URI)
attr("xs:type", "${prefix}:fileSystemCacheType", GBCS.XML_SCHEMA_NAMESPACE_URI) attr("xs:type", "${prefix}:fileSystemCacheType", RBCS.XML_SCHEMA_NAMESPACE_URI)
attr("path", root.toString()) attr("path", root.toString())
attr("max-age", maxAge.toString()) attr("max-age", maxAge.toString())
digestAlgorithm?.let { digestAlgorithm -> digestAlgorithm?.let { digestAlgorithm ->
@@ -57,6 +61,7 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
}?.let { }?.let {
attr("compression-level", it.toString()) attr("compression-level", it.toString())
} }
attr("chunk-size", chunkSize.toString())
} }
result result
} }

View File

@@ -0,0 +1,208 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.api.RequestHandle
import net.woggioni.rbcs.api.ResponseHandle
import net.woggioni.rbcs.api.event.RequestStreamingEvent
import net.woggioni.rbcs.api.event.ResponseStreamingEvent
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digestString
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.extractChunk
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterOutputStream
class InMemoryCache(
private val maxAge: Duration,
private val maxSize: Long,
private val digestAlgorithm: String?,
private val compressionEnabled: Boolean,
private val compressionLevel: Int,
private val chunkSize : Int
) : Cache {
companion object {
@JvmStatic
private val log = contextLogger()
}
private val size = AtomicLong()
private val map = ConcurrentHashMap<String, ByteBuf>()
private class RemovalQueueElement(val key: String, val value: ByteBuf, val expiry: Instant) :
Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
}
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
@Volatile
private var running = true
private val garbageCollector = Thread.ofVirtual().name("in-memory-cache-gc").start {
while (running) {
val el = removalQueue.poll(1, TimeUnit.SECONDS) ?: continue
val buf = el.value
val now = Instant.now()
if (now > el.expiry) {
val removed = map.remove(el.key, buf)
if (removed) {
updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
}
} else {
removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
}
}
}
private fun removeEldest(): Long {
while (true) {
val el = removalQueue.take()
val buf = el.value
val removed = map.remove(el.key, buf)
if (removed) {
val newSize = updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
return newSize
}
}
}
private fun updateSizeAfterRemoval(removed: ByteBuf): Long {
return size.updateAndGet { currentSize: Long ->
currentSize - removed.readableBytes()
}
}
override fun close() {
running = false
garbageCollector.join()
}
override fun get(key: String, responseHandle: ResponseHandle, alloc: ByteBufAllocator) {
try {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key
).let { digest ->
map[digest]
?.let { value ->
val copy = value.retainedDuplicate()
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
val output = alloc.compositeBuffer()
if (compressionEnabled) {
try {
val stream = ByteBufOutputStream(output).let {
val inflater = Inflater()
InflaterOutputStream(it, inflater)
}
stream.use { os ->
var readable = copy.readableBytes()
while (true) {
copy.readBytes(os, chunkSize.coerceAtMost(readable))
readable = copy.readableBytes()
val last = readable == 0
if (last) stream.flush()
if (output.readableBytes() >= chunkSize || last) {
val chunk = extractChunk(output, alloc)
val evt = if (last) {
ResponseStreamingEvent.LastChunkReceived(chunk)
} else {
ResponseStreamingEvent.ChunkReceived(chunk)
}
responseHandle.handleEvent(evt)
}
if (last) break
}
}
} finally {
copy.release()
}
} else {
responseHandle.handleEvent(
ResponseStreamingEvent.LastChunkReceived(copy)
)
}
} ?: responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND)
}
} catch (ex: Throwable) {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex))
}
}
override fun put(
key: String,
responseHandle: ResponseHandle,
alloc: ByteBufAllocator
): CompletableFuture<RequestHandle> {
return CompletableFuture.completedFuture(object : RequestHandle {
val buf = alloc.heapBuffer()
val stream = ByteBufOutputStream(buf).let {
if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
DeflaterOutputStream(it, deflater)
} else {
it
}
}
override fun handleEvent(evt: RequestStreamingEvent) {
when (evt) {
is RequestStreamingEvent.ChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
if (evt is RequestStreamingEvent.LastChunkReceived) {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key
).let { digest ->
val oldSize = map.put(digest, buf.retain())?.let { old ->
val result = old.readableBytes()
old.release()
result
} ?: 0
val delta = buf.readableBytes() - oldSize
var newSize = size.updateAndGet { currentSize : Long ->
currentSize + delta
}
removalQueue.put(RemovalQueueElement(digest, buf, Instant.now().plus(maxAge)))
while(newSize > maxSize) {
newSize = removeEldest()
}
stream.close()
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
}
}
}
is RequestStreamingEvent.ExceptionCaught -> {
stream.close()
}
else -> {
}
}
}
})
}
}

View File

@@ -1,7 +1,7 @@
package net.woggioni.gbcs.server.cache package net.woggioni.rbcs.server.cache
import net.woggioni.gbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.gbcs.common.GBCS import net.woggioni.rbcs.common.RBCS
import java.time.Duration import java.time.Duration
data class InMemoryCacheConfiguration( data class InMemoryCacheConfiguration(
@@ -10,16 +10,18 @@ data class InMemoryCacheConfiguration(
val digestAlgorithm : String?, val digestAlgorithm : String?,
val compressionEnabled: Boolean, val compressionEnabled: Boolean,
val compressionLevel: Int, val compressionLevel: Int,
val chunkSize : Int
) : Configuration.Cache { ) : Configuration.Cache {
override fun materialize() = InMemoryCache( override fun materialize() = InMemoryCache(
maxAge, maxAge,
maxSize, maxSize,
digestAlgorithm, digestAlgorithm,
compressionEnabled, compressionEnabled,
compressionLevel compressionLevel,
chunkSize
) )
override fun getNamespaceURI() = GBCS.GBCS_NAMESPACE_URI override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI
override fun getTypeName() = "inMemoryCacheType" override fun getTypeName() = "inMemoryCacheType"
} }

Some files were not shown because too many files have changed in this diff Show More