From c9390ea51df60f0880cad31e3f1b4770eb1b29f0 Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Mon, 23 Feb 2026 10:51:45 +0000 Subject: [PATCH] added experimental redis support --- docker/build.gradle | 1 + rbcs-server-redis/build.gradle | 69 +++ .../src/main/java/module-info.java | 20 + .../woggioni/rbcs/server/redis/Exception.kt | 4 + .../server/redis/RedisCacheConfiguration.kt | 107 +++++ .../rbcs/server/redis/RedisCacheHandler.kt | 438 ++++++++++++++++++ .../rbcs/server/redis/RedisCacheProvider.kt | 108 +++++ .../rbcs/server/redis/client/RedisClient.kt | 204 ++++++++ .../redis/client/RedisResponseHandler.kt | 10 + .../net.woggioni.rbcs.api.CacheProvider | 1 + .../rbcs/server/redis/schema/rbcs-redis.xsd | 52 +++ rbcs-server/build.gradle | 1 + .../rbcs/server/test/ConfigurationTest.kt | 2 + .../rbcs/server/test/valid/rbcs-redis-tls.xml | 53 +++ .../rbcs/server/test/valid/rbcs-redis.xml | 21 + settings.gradle | 1 + 16 files changed, 1092 insertions(+) create mode 100644 rbcs-server-redis/build.gradle create mode 100644 rbcs-server-redis/src/main/java/module-info.java create mode 100644 rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/Exception.kt create mode 100644 rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheConfiguration.kt create mode 100644 rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt create mode 100644 rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheProvider.kt create mode 100644 rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/client/RedisClient.kt create mode 100644 rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/client/RedisResponseHandler.kt create mode 100644 rbcs-server-redis/src/main/resources/META-INF/services/net.woggioni.rbcs.api.CacheProvider create mode 100644 rbcs-server-redis/src/main/resources/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd create mode 100644 rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-redis-tls.xml create mode 100644 rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-redis.xml diff --git a/docker/build.gradle b/docker/build.gradle index 5070a87..f08e848 100644 --- a/docker/build.gradle +++ b/docker/build.gradle @@ -20,6 +20,7 @@ configurations { dependencies { docker project(path: ':rbcs-cli', configuration: 'release') docker project(path: ':rbcs-server-memcache', configuration: 'release') + docker project(path: ':rbcs-server-redis', configuration: 'release') } Provider cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {} diff --git a/rbcs-server-redis/build.gradle b/rbcs-server-redis/build.gradle new file mode 100644 index 0000000..8177b48 --- /dev/null +++ b/rbcs-server-redis/build.gradle @@ -0,0 +1,69 @@ +plugins { + id 'java-library' + id 'maven-publish' + alias catalog.plugins.kotlin.jvm +} + +configurations { + bundle { + canBeResolved = true + canBeConsumed = false + visible = false + transitive = false + + resolutionStrategy { + dependencies { + exclude group: 'org.slf4j', module: 'slf4j-api' + exclude group: 'org.jetbrains.kotlin', module: 'kotlin-stdlib' + exclude group: 'org.jetbrains', module: 'annotations' + } + } + } + + release { + transitive = false + canBeConsumed = true + canBeResolved = true + visible = true + } +} + +dependencies { + implementation project(':rbcs-common') + implementation project(':rbcs-api') + implementation catalog.jwo + implementation catalog.slf4j.api + implementation catalog.netty.common + implementation catalog.netty.handler + implementation catalog.netty.codec.redis + + bundle catalog.netty.codec.redis + + testRuntimeOnly catalog.logback.classic +} + +tasks.named(JavaPlugin.TEST_TASK_NAME, Test) { + systemProperty("io.netty.leakDetectionLevel", "PARANOID") +} + +Provider bundleTask = tasks.register("bundle", Tar) { + from(tasks.named(JavaPlugin.JAR_TASK_NAME)) + from(configurations.bundle) + group = BasePlugin.BUILD_GROUP +} + +tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) { + dependsOn(bundleTask) +} + +artifacts { + release(bundleTask) +} + +publishing { + publications { + maven(MavenPublication) { + artifact bundleTask + } + } +} diff --git a/rbcs-server-redis/src/main/java/module-info.java b/rbcs-server-redis/src/main/java/module-info.java new file mode 100644 index 0000000..51e008a --- /dev/null +++ b/rbcs-server-redis/src/main/java/module-info.java @@ -0,0 +1,20 @@ +import net.woggioni.rbcs.api.CacheProvider; + +module net.woggioni.rbcs.server.redis { + requires net.woggioni.rbcs.common; + requires net.woggioni.rbcs.api; + requires net.woggioni.jwo; + requires java.xml; + requires kotlin.stdlib; + requires io.netty.transport; + requires io.netty.codec; + requires io.netty.codec.redis; + requires io.netty.common; + requires io.netty.buffer; + requires io.netty.handler; + requires org.slf4j; + + provides CacheProvider with net.woggioni.rbcs.server.redis.RedisCacheProvider; + + opens net.woggioni.rbcs.server.redis.schema; +} diff --git a/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/Exception.kt b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/Exception.kt new file mode 100644 index 0000000..9e55570 --- /dev/null +++ b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/Exception.kt @@ -0,0 +1,4 @@ +package net.woggioni.rbcs.server.redis + +class RedisException(msg: String, cause: Throwable? = null) + : RuntimeException(msg, cause) diff --git a/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheConfiguration.kt b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheConfiguration.kt new file mode 100644 index 0000000..3b25bde --- /dev/null +++ b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheConfiguration.kt @@ -0,0 +1,107 @@ +package net.woggioni.rbcs.server.redis + +import io.netty.channel.ChannelFactory +import io.netty.channel.EventLoopGroup +import io.netty.channel.pool.FixedChannelPool +import io.netty.channel.socket.DatagramChannel +import io.netty.channel.socket.SocketChannel + +import java.time.Duration +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference + +import net.woggioni.rbcs.api.CacheHandler +import net.woggioni.rbcs.api.CacheHandlerFactory +import net.woggioni.rbcs.api.Configuration +import net.woggioni.rbcs.common.HostAndPort +import net.woggioni.rbcs.common.createLogger +import net.woggioni.rbcs.server.redis.client.RedisClient + +data class RedisCacheConfiguration( + val servers: List, + val maxAge: Duration = Duration.ofDays(1), + val keyPrefix: String? = null, + val digestAlgorithm: String? = null, + val compressionMode: CompressionMode? = null, + val compressionLevel: Int, +) : Configuration.Cache { + + companion object { + private val log = createLogger() + } + + enum class CompressionMode { + /** + * Deflate mode + */ + DEFLATE + } + + data class Server( + val endpoint: HostAndPort, + val connectionTimeoutMillis: Int?, + val maxConnections: Int, + val password: String? = null, + ) + + override fun materialize() = object : CacheHandlerFactory { + + private val connectionPoolMap = ConcurrentHashMap() + + override fun newHandler( + cfg: Configuration, + eventLoop: EventLoopGroup, + socketChannelFactory: ChannelFactory, + datagramChannelFactory: ChannelFactory, + ): CacheHandler { + return RedisCacheHandler( + RedisClient( + this@RedisCacheConfiguration.servers, + cfg.connection.chunkSize, + eventLoop, + socketChannelFactory, + connectionPoolMap + ), + keyPrefix, + digestAlgorithm, + compressionMode != null, + compressionLevel, + cfg.connection.chunkSize, + maxAge + ) + } + + override fun asyncClose() = object : CompletableFuture() { + init { + val failure = AtomicReference(null) + val pools = connectionPoolMap.values.toList() + val npools = pools.size + val finished = AtomicInteger(0) + if (pools.isEmpty()) { + complete(null) + } else { + pools.forEach { pool -> + pool.closeAsync().addListener { + if (!it.isSuccess) { + failure.compareAndSet(null, it.cause()) + } + if (finished.incrementAndGet() == npools) { + when (val ex = failure.get()) { + null -> complete(null) + else -> completeExceptionally(ex) + } + } + } + } + } + } + } + + } + + override fun getNamespaceURI() = "urn:net.woggioni.rbcs.server.redis" + + override fun getTypeName() = "redisCacheType" +} diff --git a/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt new file mode 100644 index 0000000..584dd7f --- /dev/null +++ b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt @@ -0,0 +1,438 @@ +package net.woggioni.rbcs.server.redis + +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.CompositeByteBuf +import io.netty.channel.Channel as NettyChannel +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.redis.ArrayRedisMessage +import io.netty.handler.codec.redis.ErrorRedisMessage +import io.netty.handler.codec.redis.FullBulkStringRedisMessage +import io.netty.handler.codec.redis.RedisMessage +import io.netty.handler.codec.redis.SimpleStringRedisMessage + +import java.io.ByteArrayOutputStream +import java.io.ObjectInputStream +import java.io.ObjectOutputStream +import java.nio.ByteBuffer +import java.nio.channels.Channels +import java.nio.channels.FileChannel +import java.nio.channels.ReadableByteChannel +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.nio.file.StandardOpenOption +import java.time.Duration +import java.util.zip.Deflater +import java.util.zip.DeflaterOutputStream +import java.util.zip.InflaterOutputStream + +import net.woggioni.rbcs.api.CacheHandler +import net.woggioni.rbcs.api.CacheValueMetadata +import net.woggioni.rbcs.api.exception.ContentTooLargeException +import net.woggioni.rbcs.api.message.CacheMessage +import net.woggioni.rbcs.api.message.CacheMessage.CacheContent +import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest +import net.woggioni.rbcs.api.message.CacheMessage.CachePutRequest +import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse +import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse +import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse +import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent +import net.woggioni.rbcs.common.ByteBufInputStream +import net.woggioni.rbcs.common.ByteBufOutputStream +import net.woggioni.rbcs.common.RBCS.processCacheKey +import net.woggioni.rbcs.common.RBCS.toIntOrNull +import net.woggioni.rbcs.common.createLogger +import net.woggioni.rbcs.common.debug +import net.woggioni.rbcs.common.extractChunk +import net.woggioni.rbcs.common.trace +import net.woggioni.rbcs.common.warn +import net.woggioni.rbcs.server.redis.client.RedisClient +import net.woggioni.rbcs.server.redis.client.RedisResponseHandler + +class RedisCacheHandler( + private val client: RedisClient, + private val keyPrefix: String?, + private val digestAlgorithm: String?, + private val compressionEnabled: Boolean, + private val compressionLevel: Int, + private val chunkSize: Int, + private val maxAge: Duration, +) : CacheHandler() { + companion object { + private val log = createLogger() + } + + private interface InProgressRequest + + private inner class InProgressGetRequest( + val key: String, + private val ctx: ChannelHandlerContext, + ) : InProgressRequest { + private val chunk = ctx.alloc().compositeBuffer() + private val outputStream = ByteBufOutputStream(chunk).let { + if (compressionEnabled) { + InflaterOutputStream(it) + } else { + it + } + } + + fun processResponse(data: ByteBuf) { + if (data.readableBytes() < Int.SIZE_BYTES) { + log.debug(ctx) { + "Received empty or corrupt data from Redis for key $key" + } + sendMessageAndFlush(ctx, CacheValueNotFoundResponse(key)) + data.release() + return + } + + val metadataSize = data.readInt() + if (data.readableBytes() < metadataSize) { + log.debug(ctx) { + "Received incomplete metadata from Redis for key $key" + } + sendMessageAndFlush(ctx, CacheValueNotFoundResponse(key)) + data.release() + return + } + + val metadata = ObjectInputStream(ByteBufInputStream(data)).use { + data.retain() + it.readObject() as CacheValueMetadata + } + data.readerIndex(Int.SIZE_BYTES + metadataSize) + + log.trace(ctx) { + "Sending response from cache" + } + sendMessageAndFlush(ctx, CacheValueFoundResponse(key, metadata)) + + // Decompress and stream the remaining payload + data.readBytes(outputStream, data.readableBytes()) + data.release() + commit() + } + + private fun flush(last: Boolean) { + val toSend = extractChunk(chunk, ctx.alloc()) + val msg = if (last) { + log.trace(ctx) { + "Sending last chunk to client" + } + LastCacheContent(toSend) + } else { + log.trace(ctx) { + "Sending chunk to client" + } + CacheContent(toSend) + } + sendMessageAndFlush(ctx, msg) + } + + fun commit() { + chunk.retain() + outputStream.close() + flush(true) + chunk.release() + } + + fun rollback() { + outputStream.close() + } + } + + private inner class InProgressPutRequest( + private val ch: NettyChannel, + metadata: CacheValueMetadata, + val keyString: String, + val keyBytes: ByteBuf, + private val alloc: ByteBufAllocator, + ) : InProgressRequest { + private var totalSize = 0 + private var tmpFile: FileChannel? = null + private val accumulator = alloc.compositeBuffer() + private val stream = ByteBufOutputStream(accumulator).let { + if (compressionEnabled) { + DeflaterOutputStream(it, Deflater(compressionLevel)) + } else { + it + } + } + + init { + ByteArrayOutputStream().let { baos -> + ObjectOutputStream(baos).use { + it.writeObject(metadata) + } + val serializedBytes = baos.toByteArray() + accumulator.writeInt(serializedBytes.size) + accumulator.writeBytes(serializedBytes) + } + } + + fun write(buf: ByteBuf) { + totalSize += buf.readableBytes() + buf.readBytes(stream, buf.readableBytes()) + tmpFile?.let { + flushToDisk(it, accumulator) + } + if (accumulator.readableBytes() > 0x100000) { + log.debug(ch) { + "Entry is too big, buffering it into a file" + } + val opts = arrayOf( + StandardOpenOption.DELETE_ON_CLOSE, + StandardOpenOption.READ, + StandardOpenOption.WRITE, + StandardOpenOption.TRUNCATE_EXISTING + ) + FileChannel.open(Files.createTempFile("rbcs-server-redis", ".tmp"), *opts).let { fc -> + tmpFile = fc + flushToDisk(fc, accumulator) + } + } + } + + private fun flushToDisk(fc: FileChannel, buf: CompositeByteBuf) { + val chunk = extractChunk(buf, alloc) + fc.write(chunk.nioBuffer()) + chunk.release() + } + + fun commit(): Pair { + keyBytes.release() + accumulator.retain() + stream.close() + val fileChannel = tmpFile + return if (fileChannel != null) { + flushToDisk(fileChannel, accumulator) + accumulator.release() + fileChannel.position(0) + val fileSize = fileChannel.size().toIntOrNull() ?: let { + fileChannel.close() + throw ContentTooLargeException("Request body is too large", null) + } + fileSize to fileChannel + } else { + accumulator.readableBytes() to Channels.newChannel(ByteBufInputStream(accumulator)) + } + } + + fun rollback() { + stream.close() + keyBytes.release() + tmpFile?.close() + } + } + + private var inProgressRequest: InProgressRequest? = null + + override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) { + when (msg) { + is CacheGetRequest -> handleGetRequest(ctx, msg) + is CachePutRequest -> handlePutRequest(ctx, msg) + is LastCacheContent -> handleLastCacheContent(ctx, msg) + is CacheContent -> handleCacheContent(ctx, msg) + else -> ctx.fireChannelRead(msg) + } + } + + private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { + log.debug(ctx) { + "Fetching ${msg.key} from Redis" + } + val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm) + val keyString = String(keyBytes, StandardCharsets.UTF_8) + val responseHandler = object : RedisResponseHandler { + override fun responseReceived(response: RedisMessage) { + when (response) { + is FullBulkStringRedisMessage -> { + if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) { + log.debug(ctx) { + "Cache miss for key ${msg.key} on Redis" + } + sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key)) + } else { + log.debug(ctx) { + "Cache hit for key ${msg.key} on Redis" + } + val getRequest = InProgressGetRequest(msg.key, ctx) + inProgressRequest = getRequest + getRequest.processResponse(response.content()) + inProgressRequest = null + } + } + + is ErrorRedisMessage -> { + this@RedisCacheHandler.exceptionCaught( + ctx, RedisException("Redis error for GET ${msg.key}: ${response.content()}") + ) + } + + else -> { + log.warn(ctx) { + "Unexpected response type from Redis for key ${msg.key}: ${response.javaClass.name}" + } + sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key)) + } + } + } + + override fun exceptionCaught(ex: Throwable) { + this@RedisCacheHandler.exceptionCaught(ctx, ex) + } + } + client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel -> + log.trace(ctx) { + "Sending GET request for key ${msg.key} to Redis" + } + val cmd = buildRedisCommand(ctx.alloc(), "GET", keyString) + channel.writeAndFlush(cmd) + } + } + + private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { + val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm) + val keyBuf = ctx.alloc().buffer().also { + it.writeBytes(keyBytes) + } + inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, msg.key, keyBuf, ctx.alloc()) + } + + private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { + val request = inProgressRequest + when (request) { + is InProgressPutRequest -> { + log.trace(ctx) { + "Received chunk of ${msg.content().readableBytes()} bytes for Redis" + } + request.write(msg.content()) + } + + is InProgressGetRequest -> { + msg.release() + } + } + } + + private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) { + val request = inProgressRequest + when (request) { + is InProgressPutRequest -> { + inProgressRequest = null + log.trace(ctx) { + "Received last chunk of ${msg.content().readableBytes()} bytes for Redis" + } + request.write(msg.content()) + val keyBytes = processCacheKey(request.keyString, keyPrefix, digestAlgorithm) + val keyString = String(keyBytes, StandardCharsets.UTF_8) + val (payloadSize, payloadSource) = request.commit() + + // Read the entire payload into a single ByteBuf for the SET command + val valueBuf = ctx.alloc().buffer(payloadSize) + payloadSource.use { source -> + val bb = ByteBuffer.allocate(chunkSize) + while (true) { + val read = source.read(bb) + if (read < 0) break + bb.flip() + valueBuf.writeBytes(bb) + bb.clear() + } + } + + val expirySeconds = maxAge.toSeconds().toString() + + val responseHandler = object : RedisResponseHandler { + override fun responseReceived(response: RedisMessage) { + when (response) { + is SimpleStringRedisMessage -> { + log.debug(ctx) { + "Inserted key ${request.keyString} into Redis" + } + sendMessageAndFlush(ctx, CachePutResponse(request.keyString)) + } + + is ErrorRedisMessage -> { + this@RedisCacheHandler.exceptionCaught( + ctx, RedisException("Redis error for SET ${request.keyString}: ${response.content()}") + ) + } + + else -> { + this@RedisCacheHandler.exceptionCaught( + ctx, RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}") + ) + } + } + } + + override fun exceptionCaught(ex: Throwable) { + this@RedisCacheHandler.exceptionCaught(ctx, ex) + } + } + + // Use a ByteBuf key for server selection + client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel -> + log.trace(ctx) { + "Sending SET request to Redis" + } + // Build SET key value EX seconds + val cmd = buildRedisSetCommand(ctx.alloc(), keyString, valueBuf, expirySeconds) + channel.writeAndFlush(cmd) + }.whenComplete { _, ex -> + if (ex != null) { + valueBuf.release() + this@RedisCacheHandler.exceptionCaught(ctx, ex) + } + } + } + } + } + + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + val request = inProgressRequest + when (request) { + is InProgressPutRequest -> { + inProgressRequest = null + request.rollback() + } + + is InProgressGetRequest -> { + inProgressRequest = null + request.rollback() + } + } + super.exceptionCaught(ctx, cause) + } + + private fun buildRedisCommand(alloc: ByteBufAllocator, vararg args: String): ArrayRedisMessage { + val children = args.map { arg -> + FullBulkStringRedisMessage( + alloc.buffer(arg.toByteArray(StandardCharsets.UTF_8)) + ) + } + return ArrayRedisMessage(children) + } + + private fun ByteBufAllocator.buffer(bytes : ByteArray) = buffer().apply { + writeBytes(bytes) + } + + private fun buildRedisSetCommand( + alloc: ByteBufAllocator, + key: String, + value: ByteBuf, + expirySeconds: String, + ): ArrayRedisMessage { + val children = listOf( + FullBulkStringRedisMessage(alloc.buffer("SET".toByteArray(StandardCharsets.UTF_8))), + FullBulkStringRedisMessage(alloc.buffer(key.toByteArray(StandardCharsets.UTF_8))), + FullBulkStringRedisMessage(value), + FullBulkStringRedisMessage(alloc.buffer("EX".toByteArray(StandardCharsets.UTF_8))), + FullBulkStringRedisMessage(alloc.buffer(expirySeconds.toByteArray(StandardCharsets.UTF_8))), + ) + return ArrayRedisMessage(children) + } +} diff --git a/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheProvider.kt b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheProvider.kt new file mode 100644 index 0000000..bc4a5b8 --- /dev/null +++ b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheProvider.kt @@ -0,0 +1,108 @@ +package net.woggioni.rbcs.server.redis + +import java.time.Duration +import java.time.temporal.ChronoUnit + +import net.woggioni.rbcs.api.CacheProvider +import net.woggioni.rbcs.api.exception.ConfigurationException +import net.woggioni.rbcs.common.HostAndPort +import net.woggioni.rbcs.common.RBCS +import net.woggioni.rbcs.common.Xml +import net.woggioni.rbcs.common.Xml.Companion.asIterable +import net.woggioni.rbcs.common.Xml.Companion.renderAttribute + +import org.w3c.dom.Document +import org.w3c.dom.Element + + +class RedisCacheProvider : CacheProvider { + override fun getXmlSchemaLocation() = "jpms://net.woggioni.rbcs.server.redis/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd" + + override fun getXmlType() = "redisCacheType" + + override fun getXmlNamespace() = "urn:net.woggioni.rbcs.server.redis" + + val xmlNamespacePrefix: String + get() = "rbcs-redis" + + override fun deserialize(el: Element): RedisCacheConfiguration { + val servers = mutableListOf() + val maxAge = el.renderAttribute("max-age") + ?.let(Duration::parse) + ?: Duration.ofDays(1) + val compressionLevel = el.renderAttribute("compression-level") + ?.let(Integer::decode) + ?: -1 + val compressionMode = el.renderAttribute("compression-mode") + ?.let { + when (it) { + "deflate" -> RedisCacheConfiguration.CompressionMode.DEFLATE + else -> RedisCacheConfiguration.CompressionMode.DEFLATE + } + } + val keyPrefix = el.renderAttribute("key-prefix") + val digestAlgorithm = el.renderAttribute("digest") + for (child in el.asIterable()) { + when (child.nodeName) { + "server" -> { + val host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required") + val port = child.renderAttribute("port")?.toInt() ?: throw ConfigurationException("port attribute is required") + val maxConnections = child.renderAttribute("max-connections")?.toInt() ?: 1 + val connectionTimeout = child.renderAttribute("connection-timeout") + ?.let(Duration::parse) + ?.let(Duration::toMillis) + ?.let(Long::toInt) + ?: 10000 + val password = child.renderAttribute("password") + servers.add(RedisCacheConfiguration.Server(HostAndPort(host, port), connectionTimeout, maxConnections, password)) + } + } + } + return RedisCacheConfiguration( + servers, + maxAge, + keyPrefix, + digestAlgorithm, + compressionMode, + compressionLevel + ) + } + + override fun serialize(doc: Document, cache: RedisCacheConfiguration) = cache.run { + val result = doc.createElement("cache") + Xml.of(doc, result) { + attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/") + attr("xs:type", "${xmlNamespacePrefix}:$xmlType", RBCS.XML_SCHEMA_NAMESPACE_URI) + for (server in servers) { + node("server") { + attr("host", server.endpoint.host) + attr("port", server.endpoint.port.toString()) + server.connectionTimeoutMillis?.let { connectionTimeoutMillis -> + attr("connection-timeout", Duration.of(connectionTimeoutMillis.toLong(), ChronoUnit.MILLIS).toString()) + } + attr("max-connections", server.maxConnections.toString()) + server.password?.let { password -> + attr("password", password) + } + } + + } + attr("max-age", maxAge.toString()) + keyPrefix?.let { + attr("key-prefix", it) + } + digestAlgorithm?.let { digestAlgorithm -> + attr("digest", digestAlgorithm) + } + compressionMode?.let { compressionMode -> + attr( + "compression-mode", when (compressionMode) { + RedisCacheConfiguration.CompressionMode.DEFLATE -> "deflate" + } + ) + } + attr("compression-level", compressionLevel.toString()) + } + result + } +} diff --git a/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/client/RedisClient.kt b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/client/RedisClient.kt new file mode 100644 index 0000000..d683377 --- /dev/null +++ b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/client/RedisClient.kt @@ -0,0 +1,204 @@ +package net.woggioni.rbcs.server.redis.client + +import io.netty.bootstrap.Bootstrap +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.Unpooled +import io.netty.channel.Channel +import io.netty.channel.ChannelFactory +import io.netty.channel.ChannelFutureListener +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelOption +import io.netty.channel.ChannelPipeline +import io.netty.channel.EventLoopGroup +import io.netty.channel.SimpleChannelInboundHandler +import io.netty.channel.pool.AbstractChannelPoolHandler +import io.netty.channel.pool.FixedChannelPool +import io.netty.channel.socket.SocketChannel +import io.netty.handler.codec.redis.ArrayRedisMessage +import io.netty.handler.codec.redis.ErrorRedisMessage +import io.netty.handler.codec.redis.FullBulkStringRedisMessage +import io.netty.handler.codec.redis.RedisArrayAggregator +import io.netty.handler.codec.redis.RedisBulkStringAggregator +import io.netty.handler.codec.redis.RedisDecoder +import io.netty.handler.codec.redis.RedisEncoder +import io.netty.handler.codec.redis.RedisMessage +import io.netty.util.concurrent.Future as NettyFuture +import io.netty.util.concurrent.GenericFutureListener + +import java.io.IOException +import java.net.InetSocketAddress +import java.nio.charset.StandardCharsets +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap + +import net.woggioni.rbcs.common.HostAndPort +import net.woggioni.rbcs.common.createLogger +import net.woggioni.rbcs.common.trace +import net.woggioni.rbcs.server.redis.RedisCacheConfiguration +import net.woggioni.rbcs.server.redis.RedisCacheHandler + + +class RedisClient( + private val servers: List, + private val chunkSize: Int, + private val group: EventLoopGroup, + private val channelFactory: ChannelFactory, + private val connectionPool: ConcurrentHashMap, +) : AutoCloseable { + + private companion object { + private val log = createLogger() + } + + private fun newConnectionPool(server: RedisCacheConfiguration.Server): FixedChannelPool { + val bootstrap = Bootstrap().apply { + group(group) + channelFactory(channelFactory) + option(ChannelOption.SO_KEEPALIVE, true) + remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port)) + server.connectionTimeoutMillis?.let { + option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it) + } + } + val channelPoolHandler = object : AbstractChannelPoolHandler() { + + override fun channelCreated(ch: Channel) { + val pipeline: ChannelPipeline = ch.pipeline() + pipeline.addLast(RedisEncoder()) + pipeline.addLast(RedisDecoder()) + pipeline.addLast(RedisBulkStringAggregator()) + pipeline.addLast(RedisArrayAggregator()) + server.password?.let { password -> + // Send AUTH command synchronously on new connections + val authCmd = buildCommand("AUTH", password) + ch.writeAndFlush(authCmd).addListener(ChannelFutureListener { future -> + if (!future.isSuccess) { + ch.close() + } + }) + // Install a one-shot handler to consume the AUTH response + pipeline.addLast(object : SimpleChannelInboundHandler() { + override fun channelRead0(ctx: ChannelHandlerContext, msg: RedisMessage) { + when (msg) { + is ErrorRedisMessage -> { + ctx.close() + } + else -> { + // AUTH succeeded, remove this one-shot handler + ctx.pipeline().remove(this) + } + } + } + }) + } + } + } + return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections) + } + + private fun buildCommand(vararg args: String): ArrayRedisMessage { + val children = args.map { arg -> + FullBulkStringRedisMessage( + Unpooled.wrappedBuffer(arg.toByteArray(StandardCharsets.UTF_8)) + ) + } + return ArrayRedisMessage(children) + } + + fun sendCommand( + key: ByteArray, + alloc: ByteBufAllocator, + responseHandler: RedisResponseHandler, + ): CompletableFuture { + val server = if (servers.size > 1) { + val keyBuffer = alloc.buffer(key.size) + keyBuffer.writeBytes(key) + var checksum = 0 + while (keyBuffer.readableBytes() > 4) { + val byte = keyBuffer.readInt() + checksum = checksum xor byte + } + while (keyBuffer.readableBytes() > 0) { + val byte = keyBuffer.readByte() + checksum = checksum xor byte.toInt() + } + keyBuffer.release() + servers[Math.floorMod(checksum, servers.size)] + } else { + servers.first() + } + + val response = CompletableFuture() + val pool = connectionPool.computeIfAbsent(server.endpoint) { + newConnectionPool(server) + } + pool.acquire().addListener(object : GenericFutureListener> { + override fun operationComplete(channelFuture: NettyFuture) { + if (channelFuture.isSuccess) { + val channel = channelFuture.now + var connectionClosedByTheRemoteServer = true + val closeCallback = { + if (connectionClosedByTheRemoteServer) { + val ex = IOException("The Redis server closed the connection") + val completed = response.completeExceptionally(ex) + if (!completed) responseHandler.exceptionCaught(ex) + } + } + val closeListener = ChannelFutureListener { + closeCallback() + } + channel.closeFuture().addListener(closeListener) + val pipeline = channel.pipeline() + val handler = object : SimpleChannelInboundHandler(false) { + + override fun handlerAdded(ctx: ChannelHandlerContext) { + channel.closeFuture().removeListener(closeListener) + } + + override fun channelRead0( + ctx: ChannelHandlerContext, + msg: RedisMessage, + ) { + pipeline.remove(this) + pool.release(channel) + log.trace(channel) { + "Channel released" + } + responseHandler.responseReceived(msg) + } + + override fun channelInactive(ctx: ChannelHandlerContext) { + closeCallback() + ctx.fireChannelInactive() + } + + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + connectionClosedByTheRemoteServer = false + pipeline.remove(this) + ctx.close() + pool.release(channel) + log.trace(channel) { + "Channel released after exception" + } + responseHandler.exceptionCaught(cause) + } + } + + channel.pipeline().addLast(handler) + response.complete(channel) + } else { + response.completeExceptionally(channelFuture.cause()) + } + } + }) + return response + } + + fun shutDown(): NettyFuture<*> { + return group.shutdownGracefully() + } + + override fun close() { + shutDown().sync() + } +} diff --git a/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/client/RedisResponseHandler.kt b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/client/RedisResponseHandler.kt new file mode 100644 index 0000000..0aa6fb0 --- /dev/null +++ b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/client/RedisResponseHandler.kt @@ -0,0 +1,10 @@ +package net.woggioni.rbcs.server.redis.client + +import io.netty.handler.codec.redis.RedisMessage + +interface RedisResponseHandler { + + fun responseReceived(response: RedisMessage) + + fun exceptionCaught(ex: Throwable) +} diff --git a/rbcs-server-redis/src/main/resources/META-INF/services/net.woggioni.rbcs.api.CacheProvider b/rbcs-server-redis/src/main/resources/META-INF/services/net.woggioni.rbcs.api.CacheProvider new file mode 100644 index 0000000..20dc9e6 --- /dev/null +++ b/rbcs-server-redis/src/main/resources/META-INF/services/net.woggioni.rbcs.api.CacheProvider @@ -0,0 +1 @@ +net.woggioni.rbcs.server.redis.RedisCacheProvider diff --git a/rbcs-server-redis/src/main/resources/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd b/rbcs-server-redis/src/main/resources/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd new file mode 100644 index 0000000..3ee4dc8 --- /dev/null +++ b/rbcs-server-redis/src/main/resources/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd @@ -0,0 +1,52 @@ + + + + + + + + + + + + + + Password for Redis AUTH command, used when the Redis server requires authentication + + + + + + + + + + + + + + + + + Prepend this string to all the keys inserted in Redis, + useful in case the caching backend is shared with other applications + + + + + + + + + + + + + + + + + diff --git a/rbcs-server/build.gradle b/rbcs-server/build.gradle index d2ea421..91e5eb9 100644 --- a/rbcs-server/build.gradle +++ b/rbcs-server/build.gradle @@ -24,6 +24,7 @@ dependencies { testImplementation catalog.bcpkix.jdk18on testRuntimeOnly project(":rbcs-server-memcache") + testRuntimeOnly project(":rbcs-server-redis") } test { diff --git a/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/ConfigurationTest.kt b/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/ConfigurationTest.kt index 4c6d366..17ab632 100644 --- a/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/ConfigurationTest.kt +++ b/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/ConfigurationTest.kt @@ -21,6 +21,8 @@ class ConfigurationTest { "classpath:net/woggioni/rbcs/server/test/valid/rbcs-memcached.xml", "classpath:net/woggioni/rbcs/server/test/valid/rbcs-tls.xml", "classpath:net/woggioni/rbcs/server/test/valid/rbcs-memcached-tls.xml", + "classpath:net/woggioni/rbcs/server/test/valid/rbcs-redis.xml", + "classpath:net/woggioni/rbcs/server/test/valid/rbcs-redis-tls.xml", ] ) @ParameterizedTest diff --git a/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-redis-tls.xml b/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-redis-tls.xml new file mode 100644 index 0000000..7cd5c41 --- /dev/null +++ b/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-redis-tls.xml @@ -0,0 +1,53 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-redis.xml b/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-redis.xml new file mode 100644 index 0000000..eb7995d --- /dev/null +++ b/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-redis.xml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + diff --git a/settings.gradle b/settings.gradle index 1d41b7c..7186d54 100644 --- a/settings.gradle +++ b/settings.gradle @@ -28,6 +28,7 @@ rootProject.name = 'rbcs' include 'rbcs-api' include 'rbcs-common' include 'rbcs-server-memcache' +include 'rbcs-server-redis' include 'rbcs-cli' include 'rbcs-client' include 'rbcs-server'