Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
6a2e53bc00
|
|||
|
c9390ea51d
|
@@ -46,6 +46,18 @@ jobs:
|
||||
tags: |
|
||||
gitea.woggioni.net/woggioni/rbcs:memcache-dev
|
||||
target: release-memcache
|
||||
-
|
||||
name: Build rbcs redis Docker image
|
||||
uses: docker/build-push-action@v5.3.0
|
||||
with:
|
||||
builder: "multiplatform-builder"
|
||||
context: "docker/build/docker"
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: true
|
||||
pull: true
|
||||
tags: |
|
||||
gitea.woggioni.net/woggioni/rbcs:redis-dev
|
||||
target: release-redis
|
||||
-
|
||||
name: Build rbcs native Docker image
|
||||
uses: docker/build-push-action@v5.3.0
|
||||
|
||||
@@ -50,6 +50,21 @@ jobs:
|
||||
gitea.woggioni.net/woggioni/rbcs:memcache
|
||||
gitea.woggioni.net/woggioni/rbcs:memcache-${{ steps.retrieve-version.outputs.VERSION }}
|
||||
target: release-memcache
|
||||
-
|
||||
name: Build rbcs redis Docker image
|
||||
uses: docker/build-push-action@v5.3.0
|
||||
with:
|
||||
builder: "multiplatform-builder"
|
||||
context: "docker/build/docker"
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push: true
|
||||
pull: true
|
||||
tags: |
|
||||
gitea.woggioni.net/woggioni/rbcs:latest
|
||||
gitea.woggioni.net/woggioni/rbcs:${{ steps.retrieve-version.outputs.VERSION }}
|
||||
gitea.woggioni.net/woggioni/rbcs:redis
|
||||
gitea.woggioni.net/woggioni/rbcs:redis-${{ steps.retrieve-version.outputs.VERSION }}
|
||||
target: release-redis
|
||||
-
|
||||
name: Build rbcs native Docker image
|
||||
uses: docker/build-push-action@v5.3.0
|
||||
|
||||
@@ -16,6 +16,15 @@ WORKDIR /home/luser
|
||||
ADD logback.xml .
|
||||
ENTRYPOINT ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=70", "-XX:GCTimeRatio=24", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar"]
|
||||
|
||||
FROM base-release AS release-redis
|
||||
ADD --chown=luser:luser rbcs-cli-envelope-*.jar rbcs.jar
|
||||
RUN mkdir plugins
|
||||
WORKDIR /home/luser/plugins
|
||||
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-server-redis*.tar
|
||||
WORKDIR /home/luser
|
||||
ADD logback.xml .
|
||||
ENTRYPOINT ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=70", "-XX:GCTimeRatio=24", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar"]
|
||||
|
||||
FROM busybox:musl AS base-native
|
||||
RUN mkdir -p /var/lib/rbcs /etc/rbcs
|
||||
RUN adduser -D -u 1000 rbcs -h /var/lib/rbcs
|
||||
|
||||
@@ -20,6 +20,7 @@ configurations {
|
||||
dependencies {
|
||||
docker project(path: ':rbcs-cli', configuration: 'release')
|
||||
docker project(path: ':rbcs-server-memcache', configuration: 'release')
|
||||
docker project(path: ':rbcs-server-redis', configuration: 'release')
|
||||
}
|
||||
|
||||
Provider<Task> cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {}
|
||||
|
||||
@@ -50,6 +50,7 @@ configurations {
|
||||
dependencies {
|
||||
configureNativeImageImplementation project
|
||||
configureNativeImageImplementation project(':rbcs-server-memcache')
|
||||
configureNativeImageImplementation project(':rbcs-server-redis')
|
||||
|
||||
implementation catalog.jwo
|
||||
implementation catalog.slf4j.api
|
||||
@@ -62,6 +63,7 @@ dependencies {
|
||||
runtimeOnly catalog.logback.classic
|
||||
// runtimeOnly catalog.slf4j.simple
|
||||
nativeImage project(':rbcs-server-memcache')
|
||||
nativeImage project(':rbcs-server-redis')
|
||||
|
||||
}
|
||||
|
||||
@@ -138,6 +140,7 @@ Provider<JlinkTask> jlinkTaskProvider = tasks.named(JlinkPlugin.JLINK_TASK_NAME,
|
||||
)
|
||||
additionalModules = [
|
||||
'net.woggioni.rbcs.server.memcache',
|
||||
'net.woggioni.rbcs.server.redis',
|
||||
'ch.qos.logback.classic',
|
||||
'jdk.crypto.ec'
|
||||
]
|
||||
|
||||
53
rbcs-cli/conf/rbcs-server-redis.xml
Normal file
53
rbcs-cli/conf/rbcs-server-redis.xml
Normal file
@@ -0,0 +1,53 @@
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:rbcs="urn:net.woggioni.rbcs.server"
|
||||
xmlns:rbcs-redis="urn:net.woggioni.rbcs.server.redis"
|
||||
xs:schemaLocation="urn:net.woggioni.rbcs.server.redis jpms://net.woggioni.rbcs.server.redis/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd urn:net.woggioni.rbcs.server.memcache jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
|
||||
>
|
||||
<bind host="127.0.0.1" port="8080" incoming-connections-backlog-size="1024"/>
|
||||
<connection
|
||||
max-request-size="67108864"
|
||||
idle-timeout="PT10S"
|
||||
read-idle-timeout="PT20S"
|
||||
write-idle-timeout="PT20S"/>
|
||||
<event-executor use-virtual-threads="true"/>
|
||||
<cache xs:type="rbcs-redis:redisCacheType" max-age="P7D" digest="MD5">
|
||||
<server host="127.0.0.1" port="6379" max-connections="256"/>
|
||||
</cache>
|
||||
<!--cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" enable-compression="false" max-size="0x10000000" /-->
|
||||
<!--cache xs:type="rbcs:fileSystemCacheType" max-age="P7D" enable-compression="false" /-->
|
||||
<authorization>
|
||||
<users>
|
||||
<user name="woggioni" password="II+qeNLft2pZ/JVNo9F7jpjM/BqEcfsJW27NZ6dPVs8tAwHbxrJppKYsbL7J/SMl">
|
||||
<quota calls="100" period="PT1S"/>
|
||||
</user>
|
||||
<user name="gitea" password="v6T9+q6/VNpvLknji3ixPiyz2YZCQMXj2FN7hvzbfc2Ig+IzAHO0iiBCH9oWuBDq"/>
|
||||
<anonymous>
|
||||
<quota calls="10" period="PT60S" initial-available-calls="10" max-available-calls="10"/>
|
||||
</anonymous>
|
||||
</users>
|
||||
<groups>
|
||||
<group name="readers">
|
||||
<users>
|
||||
<anonymous/>
|
||||
</users>
|
||||
<roles>
|
||||
<reader/>
|
||||
</roles>
|
||||
</group>
|
||||
<group name="writers">
|
||||
<users>
|
||||
<user ref="woggioni"/>
|
||||
<user ref="gitea"/>
|
||||
</users>
|
||||
<roles>
|
||||
<reader/>
|
||||
<writer/>
|
||||
</roles>
|
||||
</group>
|
||||
</groups>
|
||||
</authorization>
|
||||
<authentication>
|
||||
<none/>
|
||||
</authentication>
|
||||
</rbcs:server>
|
||||
@@ -27,16 +27,27 @@ import net.woggioni.rbcs.server.cache.FileSystemCacheConfiguration
|
||||
import net.woggioni.rbcs.server.cache.InMemoryCacheConfiguration
|
||||
import net.woggioni.rbcs.server.configuration.Parser
|
||||
import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration
|
||||
import net.woggioni.rbcs.server.redis.RedisCacheConfiguration
|
||||
|
||||
object GraalNativeImageConfiguration {
|
||||
@JvmStatic
|
||||
fun main(vararg args : String) {
|
||||
|
||||
val serverURL = URI.create("file:conf/rbcs-server.xml").toURL()
|
||||
val serverDoc = serverURL.openStream().use {
|
||||
Xml.parseXml(serverURL, it)
|
||||
let {
|
||||
val serverURL = URI.create("file:conf/rbcs-server.xml").toURL()
|
||||
val serverDoc = serverURL.openStream().use {
|
||||
Xml.parseXml(serverURL, it)
|
||||
}
|
||||
Parser.parse(serverDoc)
|
||||
}
|
||||
|
||||
let {
|
||||
val serverURL = URI.create("file:conf/rbcs-server-redis.xml").toURL()
|
||||
val serverDoc = serverURL.openStream().use {
|
||||
Xml.parseXml(serverURL, it)
|
||||
}
|
||||
Parser.parse(serverDoc)
|
||||
}
|
||||
Parser.parse(serverDoc)
|
||||
|
||||
val url = URI.create("file:conf/rbcs-client.xml").toURL()
|
||||
val clientDoc = url.openStream().use {
|
||||
@@ -90,6 +101,18 @@ object GraalNativeImageConfiguration {
|
||||
"MD5",
|
||||
null,
|
||||
1,
|
||||
),
|
||||
RedisCacheConfiguration(
|
||||
listOf(RedisCacheConfiguration.Server(
|
||||
HostAndPort("127.0.0.1", 6379),
|
||||
1000,
|
||||
4)
|
||||
),
|
||||
Duration.ofSeconds(60),
|
||||
"someCustomPrefix",
|
||||
"MD5",
|
||||
null,
|
||||
1,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
69
rbcs-server-redis/build.gradle
Normal file
69
rbcs-server-redis/build.gradle
Normal file
@@ -0,0 +1,69 @@
|
||||
plugins {
|
||||
id 'java-library'
|
||||
id 'maven-publish'
|
||||
alias catalog.plugins.kotlin.jvm
|
||||
}
|
||||
|
||||
configurations {
|
||||
bundle {
|
||||
canBeResolved = true
|
||||
canBeConsumed = false
|
||||
visible = false
|
||||
transitive = false
|
||||
|
||||
resolutionStrategy {
|
||||
dependencies {
|
||||
exclude group: 'org.slf4j', module: 'slf4j-api'
|
||||
exclude group: 'org.jetbrains.kotlin', module: 'kotlin-stdlib'
|
||||
exclude group: 'org.jetbrains', module: 'annotations'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
release {
|
||||
transitive = false
|
||||
canBeConsumed = true
|
||||
canBeResolved = true
|
||||
visible = true
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation project(':rbcs-common')
|
||||
implementation project(':rbcs-api')
|
||||
implementation catalog.jwo
|
||||
implementation catalog.slf4j.api
|
||||
implementation catalog.netty.common
|
||||
implementation catalog.netty.handler
|
||||
implementation catalog.netty.codec.redis
|
||||
|
||||
bundle catalog.netty.codec.redis
|
||||
|
||||
testRuntimeOnly catalog.logback.classic
|
||||
}
|
||||
|
||||
tasks.named(JavaPlugin.TEST_TASK_NAME, Test) {
|
||||
systemProperty("io.netty.leakDetectionLevel", "PARANOID")
|
||||
}
|
||||
|
||||
Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
|
||||
from(tasks.named(JavaPlugin.JAR_TASK_NAME))
|
||||
from(configurations.bundle)
|
||||
group = BasePlugin.BUILD_GROUP
|
||||
}
|
||||
|
||||
tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) {
|
||||
dependsOn(bundleTask)
|
||||
}
|
||||
|
||||
artifacts {
|
||||
release(bundleTask)
|
||||
}
|
||||
|
||||
publishing {
|
||||
publications {
|
||||
maven(MavenPublication) {
|
||||
artifact bundleTask
|
||||
}
|
||||
}
|
||||
}
|
||||
20
rbcs-server-redis/src/main/java/module-info.java
Normal file
20
rbcs-server-redis/src/main/java/module-info.java
Normal file
@@ -0,0 +1,20 @@
|
||||
import net.woggioni.rbcs.api.CacheProvider;
|
||||
|
||||
module net.woggioni.rbcs.server.redis {
|
||||
requires net.woggioni.rbcs.common;
|
||||
requires net.woggioni.rbcs.api;
|
||||
requires net.woggioni.jwo;
|
||||
requires java.xml;
|
||||
requires kotlin.stdlib;
|
||||
requires io.netty.transport;
|
||||
requires io.netty.codec;
|
||||
requires io.netty.codec.redis;
|
||||
requires io.netty.common;
|
||||
requires io.netty.buffer;
|
||||
requires io.netty.handler;
|
||||
requires org.slf4j;
|
||||
|
||||
provides CacheProvider with net.woggioni.rbcs.server.redis.RedisCacheProvider;
|
||||
|
||||
opens net.woggioni.rbcs.server.redis.schema;
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
package net.woggioni.rbcs.server.redis
|
||||
|
||||
class RedisException(msg: String, cause: Throwable? = null)
|
||||
: RuntimeException(msg, cause)
|
||||
@@ -0,0 +1,107 @@
|
||||
package net.woggioni.rbcs.server.redis
|
||||
|
||||
import io.netty.channel.ChannelFactory
|
||||
import io.netty.channel.EventLoopGroup
|
||||
import io.netty.channel.pool.FixedChannelPool
|
||||
import io.netty.channel.socket.DatagramChannel
|
||||
import io.netty.channel.socket.SocketChannel
|
||||
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import net.woggioni.rbcs.api.CacheHandler
|
||||
import net.woggioni.rbcs.api.CacheHandlerFactory
|
||||
import net.woggioni.rbcs.api.Configuration
|
||||
import net.woggioni.rbcs.common.HostAndPort
|
||||
import net.woggioni.rbcs.common.createLogger
|
||||
import net.woggioni.rbcs.server.redis.client.RedisClient
|
||||
|
||||
data class RedisCacheConfiguration(
|
||||
val servers: List<Server>,
|
||||
val maxAge: Duration = Duration.ofDays(1),
|
||||
val keyPrefix: String? = null,
|
||||
val digestAlgorithm: String? = null,
|
||||
val compressionMode: CompressionMode? = null,
|
||||
val compressionLevel: Int,
|
||||
) : Configuration.Cache {
|
||||
|
||||
companion object {
|
||||
private val log = createLogger<RedisCacheConfiguration>()
|
||||
}
|
||||
|
||||
enum class CompressionMode {
|
||||
/**
|
||||
* Deflate mode
|
||||
*/
|
||||
DEFLATE
|
||||
}
|
||||
|
||||
data class Server(
|
||||
val endpoint: HostAndPort,
|
||||
val connectionTimeoutMillis: Int?,
|
||||
val maxConnections: Int,
|
||||
val password: String? = null,
|
||||
)
|
||||
|
||||
override fun materialize() = object : CacheHandlerFactory {
|
||||
|
||||
private val connectionPoolMap = ConcurrentHashMap<HostAndPort, FixedChannelPool>()
|
||||
|
||||
override fun newHandler(
|
||||
cfg: Configuration,
|
||||
eventLoop: EventLoopGroup,
|
||||
socketChannelFactory: ChannelFactory<SocketChannel>,
|
||||
datagramChannelFactory: ChannelFactory<DatagramChannel>,
|
||||
): CacheHandler {
|
||||
return RedisCacheHandler(
|
||||
RedisClient(
|
||||
this@RedisCacheConfiguration.servers,
|
||||
cfg.connection.chunkSize,
|
||||
eventLoop,
|
||||
socketChannelFactory,
|
||||
connectionPoolMap
|
||||
),
|
||||
keyPrefix,
|
||||
digestAlgorithm,
|
||||
compressionMode != null,
|
||||
compressionLevel,
|
||||
cfg.connection.chunkSize,
|
||||
maxAge
|
||||
)
|
||||
}
|
||||
|
||||
override fun asyncClose() = object : CompletableFuture<Void>() {
|
||||
init {
|
||||
val failure = AtomicReference<Throwable>(null)
|
||||
val pools = connectionPoolMap.values.toList()
|
||||
val npools = pools.size
|
||||
val finished = AtomicInteger(0)
|
||||
if (pools.isEmpty()) {
|
||||
complete(null)
|
||||
} else {
|
||||
pools.forEach { pool ->
|
||||
pool.closeAsync().addListener {
|
||||
if (!it.isSuccess) {
|
||||
failure.compareAndSet(null, it.cause())
|
||||
}
|
||||
if (finished.incrementAndGet() == npools) {
|
||||
when (val ex = failure.get()) {
|
||||
null -> complete(null)
|
||||
else -> completeExceptionally(ex)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
override fun getNamespaceURI() = "urn:net.woggioni.rbcs.server.redis"
|
||||
|
||||
override fun getTypeName() = "redisCacheType"
|
||||
}
|
||||
@@ -0,0 +1,438 @@
|
||||
package net.woggioni.rbcs.server.redis
|
||||
|
||||
import io.netty.buffer.ByteBuf
|
||||
import io.netty.buffer.ByteBufAllocator
|
||||
import io.netty.buffer.CompositeByteBuf
|
||||
import io.netty.channel.Channel as NettyChannel
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.handler.codec.redis.ArrayRedisMessage
|
||||
import io.netty.handler.codec.redis.ErrorRedisMessage
|
||||
import io.netty.handler.codec.redis.FullBulkStringRedisMessage
|
||||
import io.netty.handler.codec.redis.RedisMessage
|
||||
import io.netty.handler.codec.redis.SimpleStringRedisMessage
|
||||
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.ObjectInputStream
|
||||
import java.io.ObjectOutputStream
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.channels.Channels
|
||||
import java.nio.channels.FileChannel
|
||||
import java.nio.channels.ReadableByteChannel
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.StandardOpenOption
|
||||
import java.time.Duration
|
||||
import java.util.zip.Deflater
|
||||
import java.util.zip.DeflaterOutputStream
|
||||
import java.util.zip.InflaterOutputStream
|
||||
|
||||
import net.woggioni.rbcs.api.CacheHandler
|
||||
import net.woggioni.rbcs.api.CacheValueMetadata
|
||||
import net.woggioni.rbcs.api.exception.ContentTooLargeException
|
||||
import net.woggioni.rbcs.api.message.CacheMessage
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CachePutRequest
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
|
||||
import net.woggioni.rbcs.common.ByteBufInputStream
|
||||
import net.woggioni.rbcs.common.ByteBufOutputStream
|
||||
import net.woggioni.rbcs.common.RBCS.processCacheKey
|
||||
import net.woggioni.rbcs.common.RBCS.toIntOrNull
|
||||
import net.woggioni.rbcs.common.createLogger
|
||||
import net.woggioni.rbcs.common.debug
|
||||
import net.woggioni.rbcs.common.extractChunk
|
||||
import net.woggioni.rbcs.common.trace
|
||||
import net.woggioni.rbcs.common.warn
|
||||
import net.woggioni.rbcs.server.redis.client.RedisClient
|
||||
import net.woggioni.rbcs.server.redis.client.RedisResponseHandler
|
||||
|
||||
class RedisCacheHandler(
|
||||
private val client: RedisClient,
|
||||
private val keyPrefix: String?,
|
||||
private val digestAlgorithm: String?,
|
||||
private val compressionEnabled: Boolean,
|
||||
private val compressionLevel: Int,
|
||||
private val chunkSize: Int,
|
||||
private val maxAge: Duration,
|
||||
) : CacheHandler() {
|
||||
companion object {
|
||||
private val log = createLogger<RedisCacheHandler>()
|
||||
}
|
||||
|
||||
private interface InProgressRequest
|
||||
|
||||
private inner class InProgressGetRequest(
|
||||
val key: String,
|
||||
private val ctx: ChannelHandlerContext,
|
||||
) : InProgressRequest {
|
||||
private val chunk = ctx.alloc().compositeBuffer()
|
||||
private val outputStream = ByteBufOutputStream(chunk).let {
|
||||
if (compressionEnabled) {
|
||||
InflaterOutputStream(it)
|
||||
} else {
|
||||
it
|
||||
}
|
||||
}
|
||||
|
||||
fun processResponse(data: ByteBuf) {
|
||||
if (data.readableBytes() < Int.SIZE_BYTES) {
|
||||
log.debug(ctx) {
|
||||
"Received empty or corrupt data from Redis for key $key"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(key))
|
||||
data.release()
|
||||
return
|
||||
}
|
||||
|
||||
val metadataSize = data.readInt()
|
||||
if (data.readableBytes() < metadataSize) {
|
||||
log.debug(ctx) {
|
||||
"Received incomplete metadata from Redis for key $key"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(key))
|
||||
data.release()
|
||||
return
|
||||
}
|
||||
|
||||
val metadata = ObjectInputStream(ByteBufInputStream(data)).use {
|
||||
data.retain()
|
||||
it.readObject() as CacheValueMetadata
|
||||
}
|
||||
data.readerIndex(Int.SIZE_BYTES + metadataSize)
|
||||
|
||||
log.trace(ctx) {
|
||||
"Sending response from cache"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheValueFoundResponse(key, metadata))
|
||||
|
||||
// Decompress and stream the remaining payload
|
||||
data.readBytes(outputStream, data.readableBytes())
|
||||
data.release()
|
||||
commit()
|
||||
}
|
||||
|
||||
private fun flush(last: Boolean) {
|
||||
val toSend = extractChunk(chunk, ctx.alloc())
|
||||
val msg = if (last) {
|
||||
log.trace(ctx) {
|
||||
"Sending last chunk to client"
|
||||
}
|
||||
LastCacheContent(toSend)
|
||||
} else {
|
||||
log.trace(ctx) {
|
||||
"Sending chunk to client"
|
||||
}
|
||||
CacheContent(toSend)
|
||||
}
|
||||
sendMessageAndFlush(ctx, msg)
|
||||
}
|
||||
|
||||
fun commit() {
|
||||
chunk.retain()
|
||||
outputStream.close()
|
||||
flush(true)
|
||||
chunk.release()
|
||||
}
|
||||
|
||||
fun rollback() {
|
||||
outputStream.close()
|
||||
}
|
||||
}
|
||||
|
||||
private inner class InProgressPutRequest(
|
||||
private val ch: NettyChannel,
|
||||
metadata: CacheValueMetadata,
|
||||
val keyString: String,
|
||||
val keyBytes: ByteBuf,
|
||||
private val alloc: ByteBufAllocator,
|
||||
) : InProgressRequest {
|
||||
private var totalSize = 0
|
||||
private var tmpFile: FileChannel? = null
|
||||
private val accumulator = alloc.compositeBuffer()
|
||||
private val stream = ByteBufOutputStream(accumulator).let {
|
||||
if (compressionEnabled) {
|
||||
DeflaterOutputStream(it, Deflater(compressionLevel))
|
||||
} else {
|
||||
it
|
||||
}
|
||||
}
|
||||
|
||||
init {
|
||||
ByteArrayOutputStream().let { baos ->
|
||||
ObjectOutputStream(baos).use {
|
||||
it.writeObject(metadata)
|
||||
}
|
||||
val serializedBytes = baos.toByteArray()
|
||||
accumulator.writeInt(serializedBytes.size)
|
||||
accumulator.writeBytes(serializedBytes)
|
||||
}
|
||||
}
|
||||
|
||||
fun write(buf: ByteBuf) {
|
||||
totalSize += buf.readableBytes()
|
||||
buf.readBytes(stream, buf.readableBytes())
|
||||
tmpFile?.let {
|
||||
flushToDisk(it, accumulator)
|
||||
}
|
||||
if (accumulator.readableBytes() > 0x100000) {
|
||||
log.debug(ch) {
|
||||
"Entry is too big, buffering it into a file"
|
||||
}
|
||||
val opts = arrayOf(
|
||||
StandardOpenOption.DELETE_ON_CLOSE,
|
||||
StandardOpenOption.READ,
|
||||
StandardOpenOption.WRITE,
|
||||
StandardOpenOption.TRUNCATE_EXISTING
|
||||
)
|
||||
FileChannel.open(Files.createTempFile("rbcs-server-redis", ".tmp"), *opts).let { fc ->
|
||||
tmpFile = fc
|
||||
flushToDisk(fc, accumulator)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun flushToDisk(fc: FileChannel, buf: CompositeByteBuf) {
|
||||
val chunk = extractChunk(buf, alloc)
|
||||
fc.write(chunk.nioBuffer())
|
||||
chunk.release()
|
||||
}
|
||||
|
||||
fun commit(): Pair<Int, ReadableByteChannel> {
|
||||
keyBytes.release()
|
||||
accumulator.retain()
|
||||
stream.close()
|
||||
val fileChannel = tmpFile
|
||||
return if (fileChannel != null) {
|
||||
flushToDisk(fileChannel, accumulator)
|
||||
accumulator.release()
|
||||
fileChannel.position(0)
|
||||
val fileSize = fileChannel.size().toIntOrNull() ?: let {
|
||||
fileChannel.close()
|
||||
throw ContentTooLargeException("Request body is too large", null)
|
||||
}
|
||||
fileSize to fileChannel
|
||||
} else {
|
||||
accumulator.readableBytes() to Channels.newChannel(ByteBufInputStream(accumulator))
|
||||
}
|
||||
}
|
||||
|
||||
fun rollback() {
|
||||
stream.close()
|
||||
keyBytes.release()
|
||||
tmpFile?.close()
|
||||
}
|
||||
}
|
||||
|
||||
private var inProgressRequest: InProgressRequest? = null
|
||||
|
||||
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
|
||||
when (msg) {
|
||||
is CacheGetRequest -> handleGetRequest(ctx, msg)
|
||||
is CachePutRequest -> handlePutRequest(ctx, msg)
|
||||
is LastCacheContent -> handleLastCacheContent(ctx, msg)
|
||||
is CacheContent -> handleCacheContent(ctx, msg)
|
||||
else -> ctx.fireChannelRead(msg)
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
|
||||
log.debug(ctx) {
|
||||
"Fetching ${msg.key} from Redis"
|
||||
}
|
||||
val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm)
|
||||
val keyString = String(keyBytes, StandardCharsets.UTF_8)
|
||||
val responseHandler = object : RedisResponseHandler {
|
||||
override fun responseReceived(response: RedisMessage) {
|
||||
when (response) {
|
||||
is FullBulkStringRedisMessage -> {
|
||||
if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) {
|
||||
log.debug(ctx) {
|
||||
"Cache miss for key ${msg.key} on Redis"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
||||
} else {
|
||||
log.debug(ctx) {
|
||||
"Cache hit for key ${msg.key} on Redis"
|
||||
}
|
||||
val getRequest = InProgressGetRequest(msg.key, ctx)
|
||||
inProgressRequest = getRequest
|
||||
getRequest.processResponse(response.content())
|
||||
inProgressRequest = null
|
||||
}
|
||||
}
|
||||
|
||||
is ErrorRedisMessage -> {
|
||||
this@RedisCacheHandler.exceptionCaught(
|
||||
ctx, RedisException("Redis error for GET ${msg.key}: ${response.content()}")
|
||||
)
|
||||
}
|
||||
|
||||
else -> {
|
||||
log.warn(ctx) {
|
||||
"Unexpected response type from Redis for key ${msg.key}: ${response.javaClass.name}"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ex: Throwable) {
|
||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||
}
|
||||
}
|
||||
client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel ->
|
||||
log.trace(ctx) {
|
||||
"Sending GET request for key ${msg.key} to Redis"
|
||||
}
|
||||
val cmd = buildRedisCommand(ctx.alloc(), "GET", keyString)
|
||||
channel.writeAndFlush(cmd)
|
||||
}
|
||||
}
|
||||
|
||||
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
|
||||
val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm)
|
||||
val keyBuf = ctx.alloc().buffer().also {
|
||||
it.writeBytes(keyBytes)
|
||||
}
|
||||
inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, msg.key, keyBuf, ctx.alloc())
|
||||
}
|
||||
|
||||
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
|
||||
val request = inProgressRequest
|
||||
when (request) {
|
||||
is InProgressPutRequest -> {
|
||||
log.trace(ctx) {
|
||||
"Received chunk of ${msg.content().readableBytes()} bytes for Redis"
|
||||
}
|
||||
request.write(msg.content())
|
||||
}
|
||||
|
||||
is InProgressGetRequest -> {
|
||||
msg.release()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
|
||||
val request = inProgressRequest
|
||||
when (request) {
|
||||
is InProgressPutRequest -> {
|
||||
inProgressRequest = null
|
||||
log.trace(ctx) {
|
||||
"Received last chunk of ${msg.content().readableBytes()} bytes for Redis"
|
||||
}
|
||||
request.write(msg.content())
|
||||
val keyBytes = processCacheKey(request.keyString, keyPrefix, digestAlgorithm)
|
||||
val keyString = String(keyBytes, StandardCharsets.UTF_8)
|
||||
val (payloadSize, payloadSource) = request.commit()
|
||||
|
||||
// Read the entire payload into a single ByteBuf for the SET command
|
||||
val valueBuf = ctx.alloc().buffer(payloadSize)
|
||||
payloadSource.use { source ->
|
||||
val bb = ByteBuffer.allocate(chunkSize)
|
||||
while (true) {
|
||||
val read = source.read(bb)
|
||||
if (read < 0) break
|
||||
bb.flip()
|
||||
valueBuf.writeBytes(bb)
|
||||
bb.clear()
|
||||
}
|
||||
}
|
||||
|
||||
val expirySeconds = maxAge.toSeconds().toString()
|
||||
|
||||
val responseHandler = object : RedisResponseHandler {
|
||||
override fun responseReceived(response: RedisMessage) {
|
||||
when (response) {
|
||||
is SimpleStringRedisMessage -> {
|
||||
log.debug(ctx) {
|
||||
"Inserted key ${request.keyString} into Redis"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CachePutResponse(request.keyString))
|
||||
}
|
||||
|
||||
is ErrorRedisMessage -> {
|
||||
this@RedisCacheHandler.exceptionCaught(
|
||||
ctx, RedisException("Redis error for SET ${request.keyString}: ${response.content()}")
|
||||
)
|
||||
}
|
||||
|
||||
else -> {
|
||||
this@RedisCacheHandler.exceptionCaught(
|
||||
ctx, RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}")
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ex: Throwable) {
|
||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||
}
|
||||
}
|
||||
|
||||
// Use a ByteBuf key for server selection
|
||||
client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel ->
|
||||
log.trace(ctx) {
|
||||
"Sending SET request to Redis"
|
||||
}
|
||||
// Build SET key value EX seconds
|
||||
val cmd = buildRedisSetCommand(ctx.alloc(), keyString, valueBuf, expirySeconds)
|
||||
channel.writeAndFlush(cmd)
|
||||
}.whenComplete { _, ex ->
|
||||
if (ex != null) {
|
||||
valueBuf.release()
|
||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
val request = inProgressRequest
|
||||
when (request) {
|
||||
is InProgressPutRequest -> {
|
||||
inProgressRequest = null
|
||||
request.rollback()
|
||||
}
|
||||
|
||||
is InProgressGetRequest -> {
|
||||
inProgressRequest = null
|
||||
request.rollback()
|
||||
}
|
||||
}
|
||||
super.exceptionCaught(ctx, cause)
|
||||
}
|
||||
|
||||
private fun buildRedisCommand(alloc: ByteBufAllocator, vararg args: String): ArrayRedisMessage {
|
||||
val children = args.map { arg ->
|
||||
FullBulkStringRedisMessage(
|
||||
alloc.buffer(arg.toByteArray(StandardCharsets.UTF_8))
|
||||
)
|
||||
}
|
||||
return ArrayRedisMessage(children)
|
||||
}
|
||||
|
||||
private fun ByteBufAllocator.buffer(bytes : ByteArray) = buffer().apply {
|
||||
writeBytes(bytes)
|
||||
}
|
||||
|
||||
private fun buildRedisSetCommand(
|
||||
alloc: ByteBufAllocator,
|
||||
key: String,
|
||||
value: ByteBuf,
|
||||
expirySeconds: String,
|
||||
): ArrayRedisMessage {
|
||||
val children = listOf(
|
||||
FullBulkStringRedisMessage(alloc.buffer("SET".toByteArray(StandardCharsets.UTF_8))),
|
||||
FullBulkStringRedisMessage(alloc.buffer(key.toByteArray(StandardCharsets.UTF_8))),
|
||||
FullBulkStringRedisMessage(value),
|
||||
FullBulkStringRedisMessage(alloc.buffer("EX".toByteArray(StandardCharsets.UTF_8))),
|
||||
FullBulkStringRedisMessage(alloc.buffer(expirySeconds.toByteArray(StandardCharsets.UTF_8))),
|
||||
)
|
||||
return ArrayRedisMessage(children)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,108 @@
|
||||
package net.woggioni.rbcs.server.redis
|
||||
|
||||
import java.time.Duration
|
||||
import java.time.temporal.ChronoUnit
|
||||
|
||||
import net.woggioni.rbcs.api.CacheProvider
|
||||
import net.woggioni.rbcs.api.exception.ConfigurationException
|
||||
import net.woggioni.rbcs.common.HostAndPort
|
||||
import net.woggioni.rbcs.common.RBCS
|
||||
import net.woggioni.rbcs.common.Xml
|
||||
import net.woggioni.rbcs.common.Xml.Companion.asIterable
|
||||
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
|
||||
|
||||
import org.w3c.dom.Document
|
||||
import org.w3c.dom.Element
|
||||
|
||||
|
||||
class RedisCacheProvider : CacheProvider<RedisCacheConfiguration> {
|
||||
override fun getXmlSchemaLocation() = "jpms://net.woggioni.rbcs.server.redis/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd"
|
||||
|
||||
override fun getXmlType() = "redisCacheType"
|
||||
|
||||
override fun getXmlNamespace() = "urn:net.woggioni.rbcs.server.redis"
|
||||
|
||||
val xmlNamespacePrefix: String
|
||||
get() = "rbcs-redis"
|
||||
|
||||
override fun deserialize(el: Element): RedisCacheConfiguration {
|
||||
val servers = mutableListOf<RedisCacheConfiguration.Server>()
|
||||
val maxAge = el.renderAttribute("max-age")
|
||||
?.let(Duration::parse)
|
||||
?: Duration.ofDays(1)
|
||||
val compressionLevel = el.renderAttribute("compression-level")
|
||||
?.let(Integer::decode)
|
||||
?: -1
|
||||
val compressionMode = el.renderAttribute("compression-mode")
|
||||
?.let {
|
||||
when (it) {
|
||||
"deflate" -> RedisCacheConfiguration.CompressionMode.DEFLATE
|
||||
else -> RedisCacheConfiguration.CompressionMode.DEFLATE
|
||||
}
|
||||
}
|
||||
val keyPrefix = el.renderAttribute("key-prefix")
|
||||
val digestAlgorithm = el.renderAttribute("digest")
|
||||
for (child in el.asIterable()) {
|
||||
when (child.nodeName) {
|
||||
"server" -> {
|
||||
val host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required")
|
||||
val port = child.renderAttribute("port")?.toInt() ?: throw ConfigurationException("port attribute is required")
|
||||
val maxConnections = child.renderAttribute("max-connections")?.toInt() ?: 1
|
||||
val connectionTimeout = child.renderAttribute("connection-timeout")
|
||||
?.let(Duration::parse)
|
||||
?.let(Duration::toMillis)
|
||||
?.let(Long::toInt)
|
||||
?: 10000
|
||||
val password = child.renderAttribute("password")
|
||||
servers.add(RedisCacheConfiguration.Server(HostAndPort(host, port), connectionTimeout, maxConnections, password))
|
||||
}
|
||||
}
|
||||
}
|
||||
return RedisCacheConfiguration(
|
||||
servers,
|
||||
maxAge,
|
||||
keyPrefix,
|
||||
digestAlgorithm,
|
||||
compressionMode,
|
||||
compressionLevel
|
||||
)
|
||||
}
|
||||
|
||||
override fun serialize(doc: Document, cache: RedisCacheConfiguration) = cache.run {
|
||||
val result = doc.createElement("cache")
|
||||
Xml.of(doc, result) {
|
||||
attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/")
|
||||
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", RBCS.XML_SCHEMA_NAMESPACE_URI)
|
||||
for (server in servers) {
|
||||
node("server") {
|
||||
attr("host", server.endpoint.host)
|
||||
attr("port", server.endpoint.port.toString())
|
||||
server.connectionTimeoutMillis?.let { connectionTimeoutMillis ->
|
||||
attr("connection-timeout", Duration.of(connectionTimeoutMillis.toLong(), ChronoUnit.MILLIS).toString())
|
||||
}
|
||||
attr("max-connections", server.maxConnections.toString())
|
||||
server.password?.let { password ->
|
||||
attr("password", password)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
attr("max-age", maxAge.toString())
|
||||
keyPrefix?.let {
|
||||
attr("key-prefix", it)
|
||||
}
|
||||
digestAlgorithm?.let { digestAlgorithm ->
|
||||
attr("digest", digestAlgorithm)
|
||||
}
|
||||
compressionMode?.let { compressionMode ->
|
||||
attr(
|
||||
"compression-mode", when (compressionMode) {
|
||||
RedisCacheConfiguration.CompressionMode.DEFLATE -> "deflate"
|
||||
}
|
||||
)
|
||||
}
|
||||
attr("compression-level", compressionLevel.toString())
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,204 @@
|
||||
package net.woggioni.rbcs.server.redis.client
|
||||
|
||||
import io.netty.bootstrap.Bootstrap
|
||||
import io.netty.buffer.ByteBufAllocator
|
||||
import io.netty.buffer.Unpooled
|
||||
import io.netty.channel.Channel
|
||||
import io.netty.channel.ChannelFactory
|
||||
import io.netty.channel.ChannelFutureListener
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.ChannelOption
|
||||
import io.netty.channel.ChannelPipeline
|
||||
import io.netty.channel.EventLoopGroup
|
||||
import io.netty.channel.SimpleChannelInboundHandler
|
||||
import io.netty.channel.pool.AbstractChannelPoolHandler
|
||||
import io.netty.channel.pool.FixedChannelPool
|
||||
import io.netty.channel.socket.SocketChannel
|
||||
import io.netty.handler.codec.redis.ArrayRedisMessage
|
||||
import io.netty.handler.codec.redis.ErrorRedisMessage
|
||||
import io.netty.handler.codec.redis.FullBulkStringRedisMessage
|
||||
import io.netty.handler.codec.redis.RedisArrayAggregator
|
||||
import io.netty.handler.codec.redis.RedisBulkStringAggregator
|
||||
import io.netty.handler.codec.redis.RedisDecoder
|
||||
import io.netty.handler.codec.redis.RedisEncoder
|
||||
import io.netty.handler.codec.redis.RedisMessage
|
||||
import io.netty.util.concurrent.Future as NettyFuture
|
||||
import io.netty.util.concurrent.GenericFutureListener
|
||||
|
||||
import java.io.IOException
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import net.woggioni.rbcs.common.HostAndPort
|
||||
import net.woggioni.rbcs.common.createLogger
|
||||
import net.woggioni.rbcs.common.trace
|
||||
import net.woggioni.rbcs.server.redis.RedisCacheConfiguration
|
||||
import net.woggioni.rbcs.server.redis.RedisCacheHandler
|
||||
|
||||
|
||||
class RedisClient(
|
||||
private val servers: List<RedisCacheConfiguration.Server>,
|
||||
private val chunkSize: Int,
|
||||
private val group: EventLoopGroup,
|
||||
private val channelFactory: ChannelFactory<SocketChannel>,
|
||||
private val connectionPool: ConcurrentHashMap<HostAndPort, FixedChannelPool>,
|
||||
) : AutoCloseable {
|
||||
|
||||
private companion object {
|
||||
private val log = createLogger<RedisCacheHandler>()
|
||||
}
|
||||
|
||||
private fun newConnectionPool(server: RedisCacheConfiguration.Server): FixedChannelPool {
|
||||
val bootstrap = Bootstrap().apply {
|
||||
group(group)
|
||||
channelFactory(channelFactory)
|
||||
option(ChannelOption.SO_KEEPALIVE, true)
|
||||
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
|
||||
server.connectionTimeoutMillis?.let {
|
||||
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
|
||||
}
|
||||
}
|
||||
val channelPoolHandler = object : AbstractChannelPoolHandler() {
|
||||
|
||||
override fun channelCreated(ch: Channel) {
|
||||
val pipeline: ChannelPipeline = ch.pipeline()
|
||||
pipeline.addLast(RedisEncoder())
|
||||
pipeline.addLast(RedisDecoder())
|
||||
pipeline.addLast(RedisBulkStringAggregator())
|
||||
pipeline.addLast(RedisArrayAggregator())
|
||||
server.password?.let { password ->
|
||||
// Send AUTH command synchronously on new connections
|
||||
val authCmd = buildCommand("AUTH", password)
|
||||
ch.writeAndFlush(authCmd).addListener(ChannelFutureListener { future ->
|
||||
if (!future.isSuccess) {
|
||||
ch.close()
|
||||
}
|
||||
})
|
||||
// Install a one-shot handler to consume the AUTH response
|
||||
pipeline.addLast(object : SimpleChannelInboundHandler<RedisMessage>() {
|
||||
override fun channelRead0(ctx: ChannelHandlerContext, msg: RedisMessage) {
|
||||
when (msg) {
|
||||
is ErrorRedisMessage -> {
|
||||
ctx.close()
|
||||
}
|
||||
else -> {
|
||||
// AUTH succeeded, remove this one-shot handler
|
||||
ctx.pipeline().remove(this)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
|
||||
}
|
||||
|
||||
private fun buildCommand(vararg args: String): ArrayRedisMessage {
|
||||
val children = args.map { arg ->
|
||||
FullBulkStringRedisMessage(
|
||||
Unpooled.wrappedBuffer(arg.toByteArray(StandardCharsets.UTF_8))
|
||||
)
|
||||
}
|
||||
return ArrayRedisMessage(children)
|
||||
}
|
||||
|
||||
fun sendCommand(
|
||||
key: ByteArray,
|
||||
alloc: ByteBufAllocator,
|
||||
responseHandler: RedisResponseHandler,
|
||||
): CompletableFuture<Channel> {
|
||||
val server = if (servers.size > 1) {
|
||||
val keyBuffer = alloc.buffer(key.size)
|
||||
keyBuffer.writeBytes(key)
|
||||
var checksum = 0
|
||||
while (keyBuffer.readableBytes() > 4) {
|
||||
val byte = keyBuffer.readInt()
|
||||
checksum = checksum xor byte
|
||||
}
|
||||
while (keyBuffer.readableBytes() > 0) {
|
||||
val byte = keyBuffer.readByte()
|
||||
checksum = checksum xor byte.toInt()
|
||||
}
|
||||
keyBuffer.release()
|
||||
servers[Math.floorMod(checksum, servers.size)]
|
||||
} else {
|
||||
servers.first()
|
||||
}
|
||||
|
||||
val response = CompletableFuture<Channel>()
|
||||
val pool = connectionPool.computeIfAbsent(server.endpoint) {
|
||||
newConnectionPool(server)
|
||||
}
|
||||
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
|
||||
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
|
||||
if (channelFuture.isSuccess) {
|
||||
val channel = channelFuture.now
|
||||
var connectionClosedByTheRemoteServer = true
|
||||
val closeCallback = {
|
||||
if (connectionClosedByTheRemoteServer) {
|
||||
val ex = IOException("The Redis server closed the connection")
|
||||
val completed = response.completeExceptionally(ex)
|
||||
if (!completed) responseHandler.exceptionCaught(ex)
|
||||
}
|
||||
}
|
||||
val closeListener = ChannelFutureListener {
|
||||
closeCallback()
|
||||
}
|
||||
channel.closeFuture().addListener(closeListener)
|
||||
val pipeline = channel.pipeline()
|
||||
val handler = object : SimpleChannelInboundHandler<RedisMessage>(false) {
|
||||
|
||||
override fun handlerAdded(ctx: ChannelHandlerContext) {
|
||||
channel.closeFuture().removeListener(closeListener)
|
||||
}
|
||||
|
||||
override fun channelRead0(
|
||||
ctx: ChannelHandlerContext,
|
||||
msg: RedisMessage,
|
||||
) {
|
||||
pipeline.remove(this)
|
||||
pool.release(channel)
|
||||
log.trace(channel) {
|
||||
"Channel released"
|
||||
}
|
||||
responseHandler.responseReceived(msg)
|
||||
}
|
||||
|
||||
override fun channelInactive(ctx: ChannelHandlerContext) {
|
||||
closeCallback()
|
||||
ctx.fireChannelInactive()
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
connectionClosedByTheRemoteServer = false
|
||||
pipeline.remove(this)
|
||||
ctx.close()
|
||||
pool.release(channel)
|
||||
log.trace(channel) {
|
||||
"Channel released after exception"
|
||||
}
|
||||
responseHandler.exceptionCaught(cause)
|
||||
}
|
||||
}
|
||||
|
||||
channel.pipeline().addLast(handler)
|
||||
response.complete(channel)
|
||||
} else {
|
||||
response.completeExceptionally(channelFuture.cause())
|
||||
}
|
||||
}
|
||||
})
|
||||
return response
|
||||
}
|
||||
|
||||
fun shutDown(): NettyFuture<*> {
|
||||
return group.shutdownGracefully()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
shutDown().sync()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package net.woggioni.rbcs.server.redis.client
|
||||
|
||||
import io.netty.handler.codec.redis.RedisMessage
|
||||
|
||||
interface RedisResponseHandler {
|
||||
|
||||
fun responseReceived(response: RedisMessage)
|
||||
|
||||
fun exceptionCaught(ex: Throwable)
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
net.woggioni.rbcs.server.redis.RedisCacheProvider
|
||||
@@ -0,0 +1,52 @@
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||
<xs:schema targetNamespace="urn:net.woggioni.rbcs.server.redis"
|
||||
xmlns:rbcs-redis="urn:net.woggioni.rbcs.server.redis"
|
||||
xmlns:rbcs="urn:net.woggioni.rbcs.server"
|
||||
xmlns:xs="http://www.w3.org/2001/XMLSchema">
|
||||
|
||||
<xs:import schemaLocation="jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd" namespace="urn:net.woggioni.rbcs.server"/>
|
||||
|
||||
<xs:complexType name="redisServerType">
|
||||
<xs:attribute name="host" type="xs:token" use="required"/>
|
||||
<xs:attribute name="port" type="xs:positiveInteger" use="required"/>
|
||||
<xs:attribute name="connection-timeout" type="xs:duration"/>
|
||||
<xs:attribute name="max-connections" type="xs:positiveInteger" default="1"/>
|
||||
<xs:attribute name="password" type="xs:string" use="optional">
|
||||
<xs:annotation>
|
||||
<xs:documentation>
|
||||
Password for Redis AUTH command, used when the Redis server requires authentication
|
||||
</xs:documentation>
|
||||
</xs:annotation>
|
||||
</xs:attribute>
|
||||
</xs:complexType>
|
||||
|
||||
<xs:complexType name="redisCacheType">
|
||||
<xs:complexContent>
|
||||
<xs:extension base="rbcs:cacheType">
|
||||
<xs:sequence maxOccurs="unbounded">
|
||||
<xs:element name="server" type="rbcs-redis:redisServerType"/>
|
||||
</xs:sequence>
|
||||
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
|
||||
<xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000"/>
|
||||
<xs:attribute name="key-prefix" type="xs:string" use="optional">
|
||||
<xs:annotation>
|
||||
<xs:documentation>
|
||||
Prepend this string to all the keys inserted in Redis,
|
||||
useful in case the caching backend is shared with other applications
|
||||
</xs:documentation>
|
||||
</xs:annotation>
|
||||
</xs:attribute>
|
||||
<xs:attribute name="digest" type="xs:token"/>
|
||||
<xs:attribute name="compression-mode" type="rbcs-redis:compressionType"/>
|
||||
<xs:attribute name="compression-level" type="rbcs:compressionLevelType" default="-1"/>
|
||||
</xs:extension>
|
||||
</xs:complexContent>
|
||||
</xs:complexType>
|
||||
|
||||
<xs:simpleType name="compressionType">
|
||||
<xs:restriction base="xs:token">
|
||||
<xs:enumeration value="deflate"/>
|
||||
</xs:restriction>
|
||||
</xs:simpleType>
|
||||
|
||||
</xs:schema>
|
||||
@@ -24,6 +24,7 @@ dependencies {
|
||||
testImplementation catalog.bcpkix.jdk18on
|
||||
|
||||
testRuntimeOnly project(":rbcs-server-memcache")
|
||||
testRuntimeOnly project(":rbcs-server-redis")
|
||||
}
|
||||
|
||||
test {
|
||||
|
||||
@@ -21,6 +21,8 @@ class ConfigurationTest {
|
||||
"classpath:net/woggioni/rbcs/server/test/valid/rbcs-memcached.xml",
|
||||
"classpath:net/woggioni/rbcs/server/test/valid/rbcs-tls.xml",
|
||||
"classpath:net/woggioni/rbcs/server/test/valid/rbcs-memcached-tls.xml",
|
||||
"classpath:net/woggioni/rbcs/server/test/valid/rbcs-redis.xml",
|
||||
"classpath:net/woggioni/rbcs/server/test/valid/rbcs-redis-tls.xml",
|
||||
]
|
||||
)
|
||||
@ParameterizedTest
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:rbcs="urn:net.woggioni.rbcs.server"
|
||||
xmlns:rbcs-redis="urn:net.woggioni.rbcs.server.redis"
|
||||
xs:schemaLocation="urn:net.woggioni.rbcs.server.redis jpms://net.woggioni.rbcs.server.redis/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
|
||||
>
|
||||
<bind host="0.0.0.0" port="8443" incoming-connections-backlog-size="4096"/>
|
||||
<connection
|
||||
max-request-size="67108864"
|
||||
idle-timeout="PT30S"
|
||||
read-idle-timeout="PT60S"
|
||||
write-idle-timeout="PT60S"
|
||||
chunk-size="123"/>
|
||||
<event-executor use-virtual-threads="true"/>
|
||||
<rate-limiter delay-response="false" message-buffer-size="12000" max-queued-messages="53"/>
|
||||
<cache xs:type="rbcs-redis:redisCacheType" max-age="P7D" key-prefix="some-prefix-string">
|
||||
<server host="redis-server" port="6379" password="secret123"/>
|
||||
</cache>
|
||||
<authorization>
|
||||
<users>
|
||||
<user name="woggioni">
|
||||
<quota calls="1000" period="PT1S"/>
|
||||
</user>
|
||||
<user name="gitea">
|
||||
<quota calls="10" period="PT1S" initial-available-calls="100" max-available-calls="100"/>
|
||||
</user>
|
||||
<anonymous>
|
||||
<quota calls="2" period="PT5S"/>
|
||||
</anonymous>
|
||||
</users>
|
||||
<groups>
|
||||
<group name="writers">
|
||||
<users>
|
||||
<user ref="woggioni"/>
|
||||
<user ref="gitea"/>
|
||||
</users>
|
||||
<roles>
|
||||
<reader/>
|
||||
<writer/>
|
||||
</roles>
|
||||
</group>
|
||||
</groups>
|
||||
</authorization>
|
||||
<authentication>
|
||||
<client-certificate>
|
||||
<user-extractor attribute-name="CN" pattern="(.*)"/>
|
||||
</client-certificate>
|
||||
</authentication>
|
||||
<tls>
|
||||
<keystore file="/home/luser/ssl/rbcs.woggioni.net.pfx" key-alias="rbcs.woggioni.net" password="KEYSTORE_PASSWOR" key-password="KEY_PASSWORD"/>
|
||||
<truststore file="/home/luser/ssl/woggioni.net.pfx" check-certificate-status="false" password="TRUSTSTORE_PASSWORD"/>
|
||||
</tls>
|
||||
</rbcs:server>
|
||||
@@ -0,0 +1,21 @@
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:rbcs="urn:net.woggioni.rbcs.server"
|
||||
xmlns:rbcs-redis="urn:net.woggioni.rbcs.server.redis"
|
||||
xs:schemaLocation="urn:net.woggioni.rbcs.server.redis jpms://net.woggioni.rbcs.server.redis/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd">
|
||||
<bind host="127.0.0.1" port="11443" incoming-connections-backlog-size="50"/>
|
||||
<connection
|
||||
read-idle-timeout="PT10M"
|
||||
write-idle-timeout="PT11M"
|
||||
idle-timeout="PT30M"
|
||||
max-request-size="101325"
|
||||
chunk-size="456"/>
|
||||
<event-executor use-virtual-threads="false"/>
|
||||
<rate-limiter delay-response="true" message-buffer-size="65432" max-queued-messages="21"/>
|
||||
<cache xs:type="rbcs-redis:redisCacheType" max-age="P7D" key-prefix="some-prefix-string" digest="SHA-256" compression-mode="deflate" compression-level="7">
|
||||
<server host="127.0.0.1" port="6379" max-connections="10" connection-timeout="PT20S"/>
|
||||
</cache>
|
||||
<authentication>
|
||||
<none/>
|
||||
</authentication>
|
||||
</rbcs:server>
|
||||
@@ -28,6 +28,7 @@ rootProject.name = 'rbcs'
|
||||
include 'rbcs-api'
|
||||
include 'rbcs-common'
|
||||
include 'rbcs-server-memcache'
|
||||
include 'rbcs-server-redis'
|
||||
include 'rbcs-cli'
|
||||
include 'rbcs-client'
|
||||
include 'rbcs-server'
|
||||
|
||||
Reference in New Issue
Block a user