Compare commits

...

5 Commits

Author SHA1 Message Date
woggioni fcfb93c7b5 test
CI / build (push) Successful in 3m40s
2026-04-13 16:41:06 +08:00
woggioni 742c025fa5 Update netty to 4.2.12
CI / build (push) Successful in 3m32s
2026-03-26 20:23:44 +08:00
woggioni e3a3f21721 renamed docker image tags 2026-03-26 20:18:38 +08:00
woggioni a696eebbf9 added redis-enabled docker image 2026-03-26 20:03:34 +08:00
woggioni c9390ea51d added experimental redis support
CI / build (push) Failing after 4m7s
2026-03-03 02:59:48 +08:00
29 changed files with 1395 additions and 14 deletions
+12
View File
@@ -46,6 +46,18 @@ jobs:
tags: | tags: |
gitea.woggioni.net/woggioni/rbcs:memcache-dev gitea.woggioni.net/woggioni/rbcs:memcache-dev
target: release-memcache target: release-memcache
-
name: Build rbcs redis Docker image
uses: docker/build-push-action@v5.3.0
with:
builder: "multiplatform-builder"
context: "docker/build/docker"
platforms: linux/amd64,linux/arm64
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/rbcs:redis-dev
target: release-redis
- -
name: Build rbcs native Docker image name: Build rbcs native Docker image
uses: docker/build-push-action@v5.3.0 uses: docker/build-push-action@v5.3.0
+18 -7
View File
@@ -32,8 +32,8 @@ jobs:
push: true push: true
pull: true pull: true
tags: | tags: |
gitea.woggioni.net/woggioni/rbcs:vanilla gitea.woggioni.net/woggioni/rbcs:latest
gitea.woggioni.net/woggioni/rbcs:vanilla-${{ steps.retrieve-version.outputs.VERSION }} gitea.woggioni.net/woggioni/rbcs:${{ steps.retrieve-version.outputs.VERSION }}
target: release-vanilla target: release-vanilla
- -
name: Build rbcs memcache Docker image name: Build rbcs memcache Docker image
@@ -45,11 +45,22 @@ jobs:
push: true push: true
pull: true pull: true
tags: | tags: |
gitea.woggioni.net/woggioni/rbcs:latest
gitea.woggioni.net/woggioni/rbcs:${{ steps.retrieve-version.outputs.VERSION }}
gitea.woggioni.net/woggioni/rbcs:memcache gitea.woggioni.net/woggioni/rbcs:memcache
gitea.woggioni.net/woggioni/rbcs:memcache-${{ steps.retrieve-version.outputs.VERSION }} gitea.woggioni.net/woggioni/rbcs:${{ steps.retrieve-version.outputs.VERSION }}-memcache
target: release-memcache target: release-memcache
-
name: Build rbcs redis Docker image
uses: docker/build-push-action@v5.3.0
with:
builder: "multiplatform-builder"
context: "docker/build/docker"
platforms: linux/amd64,linux/arm64
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/rbcs:redis
gitea.woggioni.net/woggioni/rbcs:${{ steps.retrieve-version.outputs.VERSION }}-redis
target: release-redis
- -
name: Build rbcs native Docker image name: Build rbcs native Docker image
uses: docker/build-push-action@v5.3.0 uses: docker/build-push-action@v5.3.0
@@ -61,7 +72,7 @@ jobs:
pull: true pull: true
tags: | tags: |
gitea.woggioni.net/woggioni/rbcs:native gitea.woggioni.net/woggioni/rbcs:native
gitea.woggioni.net/woggioni/rbcs:native-${{ steps.retrieve-version.outputs.VERSION }} gitea.woggioni.net/woggioni/rbcs:${{ steps.retrieve-version.outputs.VERSION }}-native
target: release-native target: release-native
- -
name: Build rbcs jlink Docker image name: Build rbcs jlink Docker image
@@ -74,7 +85,7 @@ jobs:
pull: true pull: true
tags: | tags: |
gitea.woggioni.net/woggioni/rbcs:jlink gitea.woggioni.net/woggioni/rbcs:jlink
gitea.woggioni.net/woggioni/rbcs:jlink-${{ steps.retrieve-version.outputs.VERSION }}-jlink gitea.woggioni.net/woggioni/rbcs:${{ steps.retrieve-version.outputs.VERSION }}-jlink
target: release-jlink target: release-jlink
- name: Publish artifacts - name: Publish artifacts
env: env:
+9
View File
@@ -16,6 +16,15 @@ WORKDIR /home/luser
ADD logback.xml . ADD logback.xml .
ENTRYPOINT ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=70", "-XX:GCTimeRatio=24", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar"] ENTRYPOINT ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=70", "-XX:GCTimeRatio=24", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar"]
FROM base-release AS release-redis
ADD --chown=luser:luser rbcs-cli-envelope-*.jar rbcs.jar
RUN mkdir plugins
WORKDIR /home/luser/plugins
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-server-redis*.tar
WORKDIR /home/luser
ADD logback.xml .
ENTRYPOINT ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=70", "-XX:GCTimeRatio=24", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar"]
FROM busybox:musl AS base-native FROM busybox:musl AS base-native
RUN mkdir -p /var/lib/rbcs /etc/rbcs RUN mkdir -p /var/lib/rbcs /etc/rbcs
RUN adduser -D -u 1000 rbcs -h /var/lib/rbcs RUN adduser -D -u 1000 rbcs -h /var/lib/rbcs
+1
View File
@@ -20,6 +20,7 @@ configurations {
dependencies { dependencies {
docker project(path: ':rbcs-cli', configuration: 'release') docker project(path: ':rbcs-cli', configuration: 'release')
docker project(path: ':rbcs-server-memcache', configuration: 'release') docker project(path: ':rbcs-server-memcache', configuration: 'release')
docker project(path: ':rbcs-server-redis', configuration: 'release')
} }
Provider<Task> cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {} Provider<Task> cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {}
+2 -2
View File
@@ -2,9 +2,9 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true org.gradle.parallel=true
org.gradle.caching=true org.gradle.caching=true
rbcs.version = 0.3.7 rbcs.version = 0.4.0
lys.version = 2026.02.19 lys.version = 2026.03.26
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
docker.registry.url=gitea.woggioni.net docker.registry.url=gitea.woggioni.net
@@ -136,6 +136,13 @@ public class Configuration {
TlsCertificateExtractor groupExtractor; TlsCertificateExtractor groupExtractor;
} }
@Value
public static class ForwardedClientCertificateAuthentication implements Authentication {
String headerName;
TlsCertificateExtractor userExtractor;
TlsCertificateExtractor groupExtractor;
}
public interface Cache { public interface Cache {
CacheHandlerFactory materialize(); CacheHandlerFactory materialize();
String getNamespaceURI(); String getNamespaceURI();
+3
View File
@@ -50,6 +50,7 @@ configurations {
dependencies { dependencies {
configureNativeImageImplementation project configureNativeImageImplementation project
configureNativeImageImplementation project(':rbcs-server-memcache') configureNativeImageImplementation project(':rbcs-server-memcache')
configureNativeImageImplementation project(':rbcs-server-redis')
implementation catalog.jwo implementation catalog.jwo
implementation catalog.slf4j.api implementation catalog.slf4j.api
@@ -62,6 +63,7 @@ dependencies {
runtimeOnly catalog.logback.classic runtimeOnly catalog.logback.classic
// runtimeOnly catalog.slf4j.simple // runtimeOnly catalog.slf4j.simple
nativeImage project(':rbcs-server-memcache') nativeImage project(':rbcs-server-memcache')
nativeImage project(':rbcs-server-redis')
} }
@@ -138,6 +140,7 @@ Provider<JlinkTask> jlinkTaskProvider = tasks.named(JlinkPlugin.JLINK_TASK_NAME,
) )
additionalModules = [ additionalModules = [
'net.woggioni.rbcs.server.memcache', 'net.woggioni.rbcs.server.memcache',
'net.woggioni.rbcs.server.redis',
'ch.qos.logback.classic', 'ch.qos.logback.classic',
'jdk.crypto.ec' 'jdk.crypto.ec'
] ]
+53
View File
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
xmlns:rbcs-redis="urn:net.woggioni.rbcs.server.redis"
xs:schemaLocation="urn:net.woggioni.rbcs.server.redis jpms://net.woggioni.rbcs.server.redis/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd urn:net.woggioni.rbcs.server.memcache jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
>
<bind host="127.0.0.1" port="8080" incoming-connections-backlog-size="1024"/>
<connection
max-request-size="67108864"
idle-timeout="PT10S"
read-idle-timeout="PT20S"
write-idle-timeout="PT20S"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="rbcs-redis:redisCacheType" max-age="P7D" digest="MD5">
<server host="127.0.0.1" port="6379" max-connections="256"/>
</cache>
<!--cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" enable-compression="false" max-size="0x10000000" /-->
<!--cache xs:type="rbcs:fileSystemCacheType" max-age="P7D" enable-compression="false" /-->
<authorization>
<users>
<user name="woggioni" password="II+qeNLft2pZ/JVNo9F7jpjM/BqEcfsJW27NZ6dPVs8tAwHbxrJppKYsbL7J/SMl">
<quota calls="100" period="PT1S"/>
</user>
<user name="gitea" password="v6T9+q6/VNpvLknji3ixPiyz2YZCQMXj2FN7hvzbfc2Ig+IzAHO0iiBCH9oWuBDq"/>
<anonymous>
<quota calls="10" period="PT60S" initial-available-calls="10" max-available-calls="10"/>
</anonymous>
</users>
<groups>
<group name="readers">
<users>
<anonymous/>
</users>
<roles>
<reader/>
</roles>
</group>
<group name="writers">
<users>
<user ref="woggioni"/>
<user ref="gitea"/>
</users>
<roles>
<reader/>
<writer/>
</roles>
</group>
</groups>
</authorization>
<authentication>
<none/>
</authentication>
</rbcs:server>
@@ -27,16 +27,27 @@ import net.woggioni.rbcs.server.cache.FileSystemCacheConfiguration
import net.woggioni.rbcs.server.cache.InMemoryCacheConfiguration import net.woggioni.rbcs.server.cache.InMemoryCacheConfiguration
import net.woggioni.rbcs.server.configuration.Parser import net.woggioni.rbcs.server.configuration.Parser
import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration
import net.woggioni.rbcs.server.redis.RedisCacheConfiguration
object GraalNativeImageConfiguration { object GraalNativeImageConfiguration {
@JvmStatic @JvmStatic
fun main(vararg args : String) { fun main(vararg args : String) {
val serverURL = URI.create("file:conf/rbcs-server.xml").toURL() let {
val serverDoc = serverURL.openStream().use { val serverURL = URI.create("file:conf/rbcs-server.xml").toURL()
Xml.parseXml(serverURL, it) val serverDoc = serverURL.openStream().use {
Xml.parseXml(serverURL, it)
}
Parser.parse(serverDoc)
}
let {
val serverURL = URI.create("file:conf/rbcs-server-redis.xml").toURL()
val serverDoc = serverURL.openStream().use {
Xml.parseXml(serverURL, it)
}
Parser.parse(serverDoc)
} }
Parser.parse(serverDoc)
val url = URI.create("file:conf/rbcs-client.xml").toURL() val url = URI.create("file:conf/rbcs-client.xml").toURL()
val clientDoc = url.openStream().use { val clientDoc = url.openStream().use {
@@ -90,6 +101,18 @@ object GraalNativeImageConfiguration {
"MD5", "MD5",
null, null,
1, 1,
),
RedisCacheConfiguration(
listOf(RedisCacheConfiguration.Server(
HostAndPort("127.0.0.1", 6379),
1000,
4)
),
Duration.ofSeconds(60),
"someCustomPrefix",
"MD5",
null,
1,
) )
) )
+69
View File
@@ -0,0 +1,69 @@
plugins {
id 'java-library'
id 'maven-publish'
alias catalog.plugins.kotlin.jvm
}
configurations {
bundle {
canBeResolved = true
canBeConsumed = false
visible = false
transitive = false
resolutionStrategy {
dependencies {
exclude group: 'org.slf4j', module: 'slf4j-api'
exclude group: 'org.jetbrains.kotlin', module: 'kotlin-stdlib'
exclude group: 'org.jetbrains', module: 'annotations'
}
}
}
release {
transitive = false
canBeConsumed = true
canBeResolved = true
visible = true
}
}
dependencies {
implementation project(':rbcs-common')
implementation project(':rbcs-api')
implementation catalog.jwo
implementation catalog.slf4j.api
implementation catalog.netty.common
implementation catalog.netty.handler
implementation catalog.netty.codec.redis
bundle catalog.netty.codec.redis
testRuntimeOnly catalog.logback.classic
}
tasks.named(JavaPlugin.TEST_TASK_NAME, Test) {
systemProperty("io.netty.leakDetectionLevel", "PARANOID")
}
Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
from(tasks.named(JavaPlugin.JAR_TASK_NAME))
from(configurations.bundle)
group = BasePlugin.BUILD_GROUP
}
tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) {
dependsOn(bundleTask)
}
artifacts {
release(bundleTask)
}
publishing {
publications {
maven(MavenPublication) {
artifact bundleTask
}
}
}
@@ -0,0 +1,20 @@
import net.woggioni.rbcs.api.CacheProvider;
module net.woggioni.rbcs.server.redis {
requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.api;
requires net.woggioni.jwo;
requires java.xml;
requires kotlin.stdlib;
requires io.netty.transport;
requires io.netty.codec;
requires io.netty.codec.redis;
requires io.netty.common;
requires io.netty.buffer;
requires io.netty.handler;
requires org.slf4j;
provides CacheProvider with net.woggioni.rbcs.server.redis.RedisCacheProvider;
opens net.woggioni.rbcs.server.redis.schema;
}
@@ -0,0 +1,4 @@
package net.woggioni.rbcs.server.redis
class RedisException(msg: String, cause: Throwable? = null)
: RuntimeException(msg, cause)
@@ -0,0 +1,107 @@
package net.woggioni.rbcs.server.redis
import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.server.redis.client.RedisClient
data class RedisCacheConfiguration(
val servers: List<Server>,
val maxAge: Duration = Duration.ofDays(1),
val keyPrefix: String? = null,
val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = null,
val compressionLevel: Int,
) : Configuration.Cache {
companion object {
private val log = createLogger<RedisCacheConfiguration>()
}
enum class CompressionMode {
/**
* Deflate mode
*/
DEFLATE
}
data class Server(
val endpoint: HostAndPort,
val connectionTimeoutMillis: Int?,
val maxConnections: Int,
val password: String? = null,
)
override fun materialize() = object : CacheHandlerFactory {
private val connectionPoolMap = ConcurrentHashMap<HostAndPort, FixedChannelPool>()
override fun newHandler(
cfg: Configuration,
eventLoop: EventLoopGroup,
socketChannelFactory: ChannelFactory<SocketChannel>,
datagramChannelFactory: ChannelFactory<DatagramChannel>,
): CacheHandler {
return RedisCacheHandler(
RedisClient(
this@RedisCacheConfiguration.servers,
cfg.connection.chunkSize,
eventLoop,
socketChannelFactory,
connectionPoolMap
),
keyPrefix,
digestAlgorithm,
compressionMode != null,
compressionLevel,
cfg.connection.chunkSize,
maxAge
)
}
override fun asyncClose() = object : CompletableFuture<Void>() {
init {
val failure = AtomicReference<Throwable>(null)
val pools = connectionPoolMap.values.toList()
val npools = pools.size
val finished = AtomicInteger(0)
if (pools.isEmpty()) {
complete(null)
} else {
pools.forEach { pool ->
pool.closeAsync().addListener {
if (!it.isSuccess) {
failure.compareAndSet(null, it.cause())
}
if (finished.incrementAndGet() == npools) {
when (val ex = failure.get()) {
null -> complete(null)
else -> completeExceptionally(ex)
}
}
}
}
}
}
}
}
override fun getNamespaceURI() = "urn:net.woggioni.rbcs.server.redis"
override fun getTypeName() = "redisCacheType"
}
@@ -0,0 +1,438 @@
package net.woggioni.rbcs.server.redis
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
import io.netty.channel.Channel as NettyChannel
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.redis.ArrayRedisMessage
import io.netty.handler.codec.redis.ErrorRedisMessage
import io.netty.handler.codec.redis.FullBulkStringRedisMessage
import io.netty.handler.codec.redis.RedisMessage
import io.netty.handler.codec.redis.SimpleStringRedisMessage
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.channels.ReadableByteChannel
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.StandardOpenOption
import java.time.Duration
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.exception.ContentTooLargeException
import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.processCacheKey
import net.woggioni.rbcs.common.RBCS.toIntOrNull
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.extractChunk
import net.woggioni.rbcs.common.trace
import net.woggioni.rbcs.common.warn
import net.woggioni.rbcs.server.redis.client.RedisClient
import net.woggioni.rbcs.server.redis.client.RedisResponseHandler
class RedisCacheHandler(
private val client: RedisClient,
private val keyPrefix: String?,
private val digestAlgorithm: String?,
private val compressionEnabled: Boolean,
private val compressionLevel: Int,
private val chunkSize: Int,
private val maxAge: Duration,
) : CacheHandler() {
companion object {
private val log = createLogger<RedisCacheHandler>()
}
private interface InProgressRequest
private inner class InProgressGetRequest(
val key: String,
private val ctx: ChannelHandlerContext,
) : InProgressRequest {
private val chunk = ctx.alloc().compositeBuffer()
private val outputStream = ByteBufOutputStream(chunk).let {
if (compressionEnabled) {
InflaterOutputStream(it)
} else {
it
}
}
fun processResponse(data: ByteBuf) {
if (data.readableBytes() < Int.SIZE_BYTES) {
log.debug(ctx) {
"Received empty or corrupt data from Redis for key $key"
}
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(key))
data.release()
return
}
val metadataSize = data.readInt()
if (data.readableBytes() < metadataSize) {
log.debug(ctx) {
"Received incomplete metadata from Redis for key $key"
}
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(key))
data.release()
return
}
val metadata = ObjectInputStream(ByteBufInputStream(data)).use {
data.retain()
it.readObject() as CacheValueMetadata
}
data.readerIndex(Int.SIZE_BYTES + metadataSize)
log.trace(ctx) {
"Sending response from cache"
}
sendMessageAndFlush(ctx, CacheValueFoundResponse(key, metadata))
// Decompress and stream the remaining payload
data.readBytes(outputStream, data.readableBytes())
data.release()
commit()
}
private fun flush(last: Boolean) {
val toSend = extractChunk(chunk, ctx.alloc())
val msg = if (last) {
log.trace(ctx) {
"Sending last chunk to client"
}
LastCacheContent(toSend)
} else {
log.trace(ctx) {
"Sending chunk to client"
}
CacheContent(toSend)
}
sendMessageAndFlush(ctx, msg)
}
fun commit() {
chunk.retain()
outputStream.close()
flush(true)
chunk.release()
}
fun rollback() {
outputStream.close()
}
}
private inner class InProgressPutRequest(
private val ch: NettyChannel,
metadata: CacheValueMetadata,
val keyString: String,
val keyBytes: ByteBuf,
private val alloc: ByteBufAllocator,
) : InProgressRequest {
private var totalSize = 0
private var tmpFile: FileChannel? = null
private val accumulator = alloc.compositeBuffer()
private val stream = ByteBufOutputStream(accumulator).let {
if (compressionEnabled) {
DeflaterOutputStream(it, Deflater(compressionLevel))
} else {
it
}
}
init {
ByteArrayOutputStream().let { baos ->
ObjectOutputStream(baos).use {
it.writeObject(metadata)
}
val serializedBytes = baos.toByteArray()
accumulator.writeInt(serializedBytes.size)
accumulator.writeBytes(serializedBytes)
}
}
fun write(buf: ByteBuf) {
totalSize += buf.readableBytes()
buf.readBytes(stream, buf.readableBytes())
tmpFile?.let {
flushToDisk(it, accumulator)
}
if (accumulator.readableBytes() > 0x100000) {
log.debug(ch) {
"Entry is too big, buffering it into a file"
}
val opts = arrayOf(
StandardOpenOption.DELETE_ON_CLOSE,
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING
)
FileChannel.open(Files.createTempFile("rbcs-server-redis", ".tmp"), *opts).let { fc ->
tmpFile = fc
flushToDisk(fc, accumulator)
}
}
}
private fun flushToDisk(fc: FileChannel, buf: CompositeByteBuf) {
val chunk = extractChunk(buf, alloc)
fc.write(chunk.nioBuffer())
chunk.release()
}
fun commit(): Pair<Int, ReadableByteChannel> {
keyBytes.release()
accumulator.retain()
stream.close()
val fileChannel = tmpFile
return if (fileChannel != null) {
flushToDisk(fileChannel, accumulator)
accumulator.release()
fileChannel.position(0)
val fileSize = fileChannel.size().toIntOrNull() ?: let {
fileChannel.close()
throw ContentTooLargeException("Request body is too large", null)
}
fileSize to fileChannel
} else {
accumulator.readableBytes() to Channels.newChannel(ByteBufInputStream(accumulator))
}
}
fun rollback() {
stream.close()
keyBytes.release()
tmpFile?.close()
}
}
private var inProgressRequest: InProgressRequest? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
when (msg) {
is CacheGetRequest -> handleGetRequest(ctx, msg)
is CachePutRequest -> handlePutRequest(ctx, msg)
is LastCacheContent -> handleLastCacheContent(ctx, msg)
is CacheContent -> handleCacheContent(ctx, msg)
else -> ctx.fireChannelRead(msg)
}
}
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
log.debug(ctx) {
"Fetching ${msg.key} from Redis"
}
val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm)
val keyString = String(keyBytes, StandardCharsets.UTF_8)
val responseHandler = object : RedisResponseHandler {
override fun responseReceived(response: RedisMessage) {
when (response) {
is FullBulkStringRedisMessage -> {
if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) {
log.debug(ctx) {
"Cache miss for key ${msg.key} on Redis"
}
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
} else {
log.debug(ctx) {
"Cache hit for key ${msg.key} on Redis"
}
val getRequest = InProgressGetRequest(msg.key, ctx)
inProgressRequest = getRequest
getRequest.processResponse(response.content())
inProgressRequest = null
}
}
is ErrorRedisMessage -> {
this@RedisCacheHandler.exceptionCaught(
ctx, RedisException("Redis error for GET ${msg.key}: ${response.content()}")
)
}
else -> {
log.warn(ctx) {
"Unexpected response type from Redis for key ${msg.key}: ${response.javaClass.name}"
}
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
}
}
}
override fun exceptionCaught(ex: Throwable) {
this@RedisCacheHandler.exceptionCaught(ctx, ex)
}
}
client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel ->
log.trace(ctx) {
"Sending GET request for key ${msg.key} to Redis"
}
val cmd = buildRedisCommand(ctx.alloc(), "GET", keyString)
channel.writeAndFlush(cmd)
}
}
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm)
val keyBuf = ctx.alloc().buffer().also {
it.writeBytes(keyBytes)
}
inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, msg.key, keyBuf, ctx.alloc())
}
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
val request = inProgressRequest
when (request) {
is InProgressPutRequest -> {
log.trace(ctx) {
"Received chunk of ${msg.content().readableBytes()} bytes for Redis"
}
request.write(msg.content())
}
is InProgressGetRequest -> {
msg.release()
}
}
}
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
val request = inProgressRequest
when (request) {
is InProgressPutRequest -> {
inProgressRequest = null
log.trace(ctx) {
"Received last chunk of ${msg.content().readableBytes()} bytes for Redis"
}
request.write(msg.content())
val keyBytes = processCacheKey(request.keyString, keyPrefix, digestAlgorithm)
val keyString = String(keyBytes, StandardCharsets.UTF_8)
val (payloadSize, payloadSource) = request.commit()
// Read the entire payload into a single ByteBuf for the SET command
val valueBuf = ctx.alloc().buffer(payloadSize)
payloadSource.use { source ->
val bb = ByteBuffer.allocate(chunkSize)
while (true) {
val read = source.read(bb)
if (read < 0) break
bb.flip()
valueBuf.writeBytes(bb)
bb.clear()
}
}
val expirySeconds = maxAge.toSeconds().toString()
val responseHandler = object : RedisResponseHandler {
override fun responseReceived(response: RedisMessage) {
when (response) {
is SimpleStringRedisMessage -> {
log.debug(ctx) {
"Inserted key ${request.keyString} into Redis"
}
sendMessageAndFlush(ctx, CachePutResponse(request.keyString))
}
is ErrorRedisMessage -> {
this@RedisCacheHandler.exceptionCaught(
ctx, RedisException("Redis error for SET ${request.keyString}: ${response.content()}")
)
}
else -> {
this@RedisCacheHandler.exceptionCaught(
ctx, RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}")
)
}
}
}
override fun exceptionCaught(ex: Throwable) {
this@RedisCacheHandler.exceptionCaught(ctx, ex)
}
}
// Use a ByteBuf key for server selection
client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel ->
log.trace(ctx) {
"Sending SET request to Redis"
}
// Build SET key value EX seconds
val cmd = buildRedisSetCommand(ctx.alloc(), keyString, valueBuf, expirySeconds)
channel.writeAndFlush(cmd)
}.whenComplete { _, ex ->
if (ex != null) {
valueBuf.release()
this@RedisCacheHandler.exceptionCaught(ctx, ex)
}
}
}
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val request = inProgressRequest
when (request) {
is InProgressPutRequest -> {
inProgressRequest = null
request.rollback()
}
is InProgressGetRequest -> {
inProgressRequest = null
request.rollback()
}
}
super.exceptionCaught(ctx, cause)
}
private fun buildRedisCommand(alloc: ByteBufAllocator, vararg args: String): ArrayRedisMessage {
val children = args.map { arg ->
FullBulkStringRedisMessage(
alloc.buffer(arg.toByteArray(StandardCharsets.UTF_8))
)
}
return ArrayRedisMessage(children)
}
private fun ByteBufAllocator.buffer(bytes : ByteArray) = buffer().apply {
writeBytes(bytes)
}
private fun buildRedisSetCommand(
alloc: ByteBufAllocator,
key: String,
value: ByteBuf,
expirySeconds: String,
): ArrayRedisMessage {
val children = listOf(
FullBulkStringRedisMessage(alloc.buffer("SET".toByteArray(StandardCharsets.UTF_8))),
FullBulkStringRedisMessage(alloc.buffer(key.toByteArray(StandardCharsets.UTF_8))),
FullBulkStringRedisMessage(value),
FullBulkStringRedisMessage(alloc.buffer("EX".toByteArray(StandardCharsets.UTF_8))),
FullBulkStringRedisMessage(alloc.buffer(expirySeconds.toByteArray(StandardCharsets.UTF_8))),
)
return ArrayRedisMessage(children)
}
}
@@ -0,0 +1,108 @@
package net.woggioni.rbcs.server.redis
import java.time.Duration
import java.time.temporal.ChronoUnit
import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.RBCS
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.Xml.Companion.asIterable
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import org.w3c.dom.Element
class RedisCacheProvider : CacheProvider<RedisCacheConfiguration> {
override fun getXmlSchemaLocation() = "jpms://net.woggioni.rbcs.server.redis/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd"
override fun getXmlType() = "redisCacheType"
override fun getXmlNamespace() = "urn:net.woggioni.rbcs.server.redis"
val xmlNamespacePrefix: String
get() = "rbcs-redis"
override fun deserialize(el: Element): RedisCacheConfiguration {
val servers = mutableListOf<RedisCacheConfiguration.Server>()
val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse)
?: Duration.ofDays(1)
val compressionLevel = el.renderAttribute("compression-level")
?.let(Integer::decode)
?: -1
val compressionMode = el.renderAttribute("compression-mode")
?.let {
when (it) {
"deflate" -> RedisCacheConfiguration.CompressionMode.DEFLATE
else -> RedisCacheConfiguration.CompressionMode.DEFLATE
}
}
val keyPrefix = el.renderAttribute("key-prefix")
val digestAlgorithm = el.renderAttribute("digest")
for (child in el.asIterable()) {
when (child.nodeName) {
"server" -> {
val host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required")
val port = child.renderAttribute("port")?.toInt() ?: throw ConfigurationException("port attribute is required")
val maxConnections = child.renderAttribute("max-connections")?.toInt() ?: 1
val connectionTimeout = child.renderAttribute("connection-timeout")
?.let(Duration::parse)
?.let(Duration::toMillis)
?.let(Long::toInt)
?: 10000
val password = child.renderAttribute("password")
servers.add(RedisCacheConfiguration.Server(HostAndPort(host, port), connectionTimeout, maxConnections, password))
}
}
}
return RedisCacheConfiguration(
servers,
maxAge,
keyPrefix,
digestAlgorithm,
compressionMode,
compressionLevel
)
}
override fun serialize(doc: Document, cache: RedisCacheConfiguration) = cache.run {
val result = doc.createElement("cache")
Xml.of(doc, result) {
attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/")
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", RBCS.XML_SCHEMA_NAMESPACE_URI)
for (server in servers) {
node("server") {
attr("host", server.endpoint.host)
attr("port", server.endpoint.port.toString())
server.connectionTimeoutMillis?.let { connectionTimeoutMillis ->
attr("connection-timeout", Duration.of(connectionTimeoutMillis.toLong(), ChronoUnit.MILLIS).toString())
}
attr("max-connections", server.maxConnections.toString())
server.password?.let { password ->
attr("password", password)
}
}
}
attr("max-age", maxAge.toString())
keyPrefix?.let {
attr("key-prefix", it)
}
digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm)
}
compressionMode?.let { compressionMode ->
attr(
"compression-mode", when (compressionMode) {
RedisCacheConfiguration.CompressionMode.DEFLATE -> "deflate"
}
)
}
attr("compression-level", compressionLevel.toString())
}
result
}
}
@@ -0,0 +1,204 @@
package net.woggioni.rbcs.server.redis.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelFactory
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoopGroup
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.redis.ArrayRedisMessage
import io.netty.handler.codec.redis.ErrorRedisMessage
import io.netty.handler.codec.redis.FullBulkStringRedisMessage
import io.netty.handler.codec.redis.RedisArrayAggregator
import io.netty.handler.codec.redis.RedisBulkStringAggregator
import io.netty.handler.codec.redis.RedisDecoder
import io.netty.handler.codec.redis.RedisEncoder
import io.netty.handler.codec.redis.RedisMessage
import io.netty.util.concurrent.Future as NettyFuture
import io.netty.util.concurrent.GenericFutureListener
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.trace
import net.woggioni.rbcs.server.redis.RedisCacheConfiguration
import net.woggioni.rbcs.server.redis.RedisCacheHandler
class RedisClient(
private val servers: List<RedisCacheConfiguration.Server>,
private val chunkSize: Int,
private val group: EventLoopGroup,
private val channelFactory: ChannelFactory<SocketChannel>,
private val connectionPool: ConcurrentHashMap<HostAndPort, FixedChannelPool>,
) : AutoCloseable {
private companion object {
private val log = createLogger<RedisCacheHandler>()
}
private fun newConnectionPool(server: RedisCacheConfiguration.Server): FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
channelFactory(channelFactory)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
server.connectionTimeoutMillis?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(RedisEncoder())
pipeline.addLast(RedisDecoder())
pipeline.addLast(RedisBulkStringAggregator())
pipeline.addLast(RedisArrayAggregator())
server.password?.let { password ->
// Send AUTH command synchronously on new connections
val authCmd = buildCommand("AUTH", password)
ch.writeAndFlush(authCmd).addListener(ChannelFutureListener { future ->
if (!future.isSuccess) {
ch.close()
}
})
// Install a one-shot handler to consume the AUTH response
pipeline.addLast(object : SimpleChannelInboundHandler<RedisMessage>() {
override fun channelRead0(ctx: ChannelHandlerContext, msg: RedisMessage) {
when (msg) {
is ErrorRedisMessage -> {
ctx.close()
}
else -> {
// AUTH succeeded, remove this one-shot handler
ctx.pipeline().remove(this)
}
}
}
})
}
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
private fun buildCommand(vararg args: String): ArrayRedisMessage {
val children = args.map { arg ->
FullBulkStringRedisMessage(
Unpooled.wrappedBuffer(arg.toByteArray(StandardCharsets.UTF_8))
)
}
return ArrayRedisMessage(children)
}
fun sendCommand(
key: ByteArray,
alloc: ByteBufAllocator,
responseHandler: RedisResponseHandler,
): CompletableFuture<Channel> {
val server = if (servers.size > 1) {
val keyBuffer = alloc.buffer(key.size)
keyBuffer.writeBytes(key)
var checksum = 0
while (keyBuffer.readableBytes() > 4) {
val byte = keyBuffer.readInt()
checksum = checksum xor byte
}
while (keyBuffer.readableBytes() > 0) {
val byte = keyBuffer.readByte()
checksum = checksum xor byte.toInt()
}
keyBuffer.release()
servers[Math.floorMod(checksum, servers.size)]
} else {
servers.first()
}
val response = CompletableFuture<Channel>()
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
}
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
var connectionClosedByTheRemoteServer = true
val closeCallback = {
if (connectionClosedByTheRemoteServer) {
val ex = IOException("The Redis server closed the connection")
val completed = response.completeExceptionally(ex)
if (!completed) responseHandler.exceptionCaught(ex)
}
}
val closeListener = ChannelFutureListener {
closeCallback()
}
channel.closeFuture().addListener(closeListener)
val pipeline = channel.pipeline()
val handler = object : SimpleChannelInboundHandler<RedisMessage>(false) {
override fun handlerAdded(ctx: ChannelHandlerContext) {
channel.closeFuture().removeListener(closeListener)
}
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: RedisMessage,
) {
pipeline.remove(this)
pool.release(channel)
log.trace(channel) {
"Channel released"
}
responseHandler.responseReceived(msg)
}
override fun channelInactive(ctx: ChannelHandlerContext) {
closeCallback()
ctx.fireChannelInactive()
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
connectionClosedByTheRemoteServer = false
pipeline.remove(this)
ctx.close()
pool.release(channel)
log.trace(channel) {
"Channel released after exception"
}
responseHandler.exceptionCaught(cause)
}
}
channel.pipeline().addLast(handler)
response.complete(channel)
} else {
response.completeExceptionally(channelFuture.cause())
}
}
})
return response
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}
override fun close() {
shutDown().sync()
}
}
@@ -0,0 +1,10 @@
package net.woggioni.rbcs.server.redis.client
import io.netty.handler.codec.redis.RedisMessage
interface RedisResponseHandler {
fun responseReceived(response: RedisMessage)
fun exceptionCaught(ex: Throwable)
}
@@ -0,0 +1 @@
net.woggioni.rbcs.server.redis.RedisCacheProvider
@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<xs:schema targetNamespace="urn:net.woggioni.rbcs.server.redis"
xmlns:rbcs-redis="urn:net.woggioni.rbcs.server.redis"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:import schemaLocation="jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd" namespace="urn:net.woggioni.rbcs.server"/>
<xs:complexType name="redisServerType">
<xs:attribute name="host" type="xs:token" use="required"/>
<xs:attribute name="port" type="xs:positiveInteger" use="required"/>
<xs:attribute name="connection-timeout" type="xs:duration"/>
<xs:attribute name="max-connections" type="xs:positiveInteger" default="1"/>
<xs:attribute name="password" type="xs:string" use="optional">
<xs:annotation>
<xs:documentation>
Password for Redis AUTH command, used when the Redis server requires authentication
</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
<xs:complexType name="redisCacheType">
<xs:complexContent>
<xs:extension base="rbcs:cacheType">
<xs:sequence maxOccurs="unbounded">
<xs:element name="server" type="rbcs-redis:redisServerType"/>
</xs:sequence>
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000"/>
<xs:attribute name="key-prefix" type="xs:string" use="optional">
<xs:annotation>
<xs:documentation>
Prepend this string to all the keys inserted in Redis,
useful in case the caching backend is shared with other applications
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="digest" type="xs:token"/>
<xs:attribute name="compression-mode" type="rbcs-redis:compressionType"/>
<xs:attribute name="compression-level" type="rbcs:compressionLevelType" default="-1"/>
</xs:extension>
</xs:complexContent>
</xs:complexType>
<xs:simpleType name="compressionType">
<xs:restriction base="xs:token">
<xs:enumeration value="deflate"/>
</xs:restriction>
</xs:simpleType>
</xs:schema>
+1
View File
@@ -24,6 +24,7 @@ dependencies {
testImplementation catalog.bcpkix.jdk18on testImplementation catalog.bcpkix.jdk18on
testRuntimeOnly project(":rbcs-server-memcache") testRuntimeOnly project(":rbcs-server-memcache")
testRuntimeOnly project(":rbcs-server-redis")
} }
test { test {
@@ -155,6 +155,59 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
} }
} }
@Sharable
private class ForwardedClientCertificateAuthenticator(
authorizer: Authorizer,
private val anonymousUserGroups: Set<Configuration.Group>?,
private val subjectDnUserExtractor: SubjectDnExtractor?,
private val subjectDnGroupExtractor: SubjectDnExtractor?,
private val headerName: String,
private val trustedProxyIPs: List<Cidr>,
private val users: Map<String, Configuration.User>,
private val groups: Map<String, Configuration.Group>,
) : AbstractNettyHttpAuthenticator(authorizer) {
companion object {
private val log = createLogger<ForwardedClientCertificateAuthenticator>()
}
override fun authenticate(ctx: ChannelHandlerContext, req: HttpRequest): AuthenticationResult? {
val clientIp = ctx.channel().attr(clientIp).get()
if (clientIp == null || trustedProxyIPs.none { it.contains(clientIp.address) }) {
log.debug(ctx) {
"Rejecting forwarded client certificate authentication from untrusted address: $clientIp"
}
return null
}
val subjectDn = req.headers()[headerName]
?: return anonymousUserGroups?.let { AuthenticationResult(null, it) }
val ldapName = try {
LdapName(subjectDn)
} catch (e: Exception) {
log.debug(ctx) {
"Invalid subject DN in header $headerName: $subjectDn"
}
return anonymousUserGroups?.let { AuthenticationResult(null, it) }
}
val user = subjectDnUserExtractor?.extract(ldapName)?.let { userName ->
users[userName] ?: throw RuntimeException("Failed to extract user '$userName'")
}
val group = subjectDnGroupExtractor?.extract(ldapName)?.let { groupName ->
groups[groupName] ?: throw RuntimeException("Failed to extract group '$groupName'")
}
val allGroups = ((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
return AuthenticationResult(user, allGroups)
}
}
private data class SubjectDnExtractor(val rdnType: String, val pattern: Pattern) {
fun extract(ldapName: LdapName): String? {
return ldapName.rdns.find { it.type == rdnType }
?.let { pattern.matcher(it.value.toString()) }
?.takeIf(Matcher::matches)?.group(1)
}
}
@Sharable @Sharable
private class NettyHttpBasicAuthenticator( private class NettyHttpBasicAuthenticator(
private val users: Map<String, Configuration.User>, authorizer: Authorizer private val users: Map<String, Configuration.User>, authorizer: Authorizer
@@ -261,6 +314,23 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
) )
} }
is Configuration.ForwardedClientCertificateAuthentication -> {
ForwardedClientCertificateAuthenticator(
RoleAuthorizer(),
cfg.users[""]?.groups,
auth.userExtractor?.let { extractor ->
SubjectDnExtractor(extractor.rdnType, Pattern.compile(extractor.pattern))
},
auth.groupExtractor?.let { extractor ->
SubjectDnExtractor(extractor.rdnType, Pattern.compile(extractor.pattern))
},
auth.headerName,
cfg.trustedProxyIPs,
cfg.users,
cfg.groups,
)
}
else -> null else -> null
} }
@@ -12,15 +12,18 @@ import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion import io.netty.handler.codec.http.HttpVersion
import io.netty.util.ReferenceCountUtil import io.netty.util.ReferenceCountUtil
import java.net.InetSocketAddress
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Configuration.Group import net.woggioni.rbcs.api.Configuration.Group
import net.woggioni.rbcs.api.Role import net.woggioni.rbcs.api.Role
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.server.RemoteBuildCacheServer import net.woggioni.rbcs.server.RemoteBuildCacheServer
abstract class AbstractNettyHttpAuthenticator(private val authorizer: Authorizer) : ChannelInboundHandlerAdapter() { abstract class AbstractNettyHttpAuthenticator(private val authorizer: Authorizer) : ChannelInboundHandlerAdapter() {
companion object { companion object {
private val log = createLogger<AbstractNettyHttpAuthenticator>()
private val AUTHENTICATION_FAILED: FullHttpResponse = DefaultFullHttpResponse( private val AUTHENTICATION_FAILED: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED, Unpooled.EMPTY_BUFFER HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED, Unpooled.EMPTY_BUFFER
).apply { ).apply {
@@ -53,6 +56,16 @@ abstract class AbstractNettyHttpAuthenticator(private val authorizer: Authorizer
result.groups.asSequence().flatMap { it.roles.asSequence() } result.groups.asSequence().flatMap { it.roles.asSequence() }
).toSet() ).toSet()
val authorized = authorizer.authorize(roles, msg) val authorized = authorizer.authorize(roles, msg)
if(log.isTraceEnabled) {
val authorizedMessage = if(authorized) { "Authorized" } else { "Forbidden" }
val clientAddress = ctx.channel().attr<InetSocketAddress>(RemoteBuildCacheServer.clientIp).get()
val roleString = "[" + roles.asSequence().map { "\"" + it + "\""}.joinToString(", ") + "]"
result.user?.let { user ->
log.trace("$authorizedMessage ${msg.method()} request from user $user with address $clientAddress, granted roles $roleString")
} ?: {
log.trace("$authorizedMessage anonymous ${msg.method()} request with address $clientAddress, granted roles $roleString")
}
}
if (authorized) { if (authorized) {
super.channelRead(ctx, msg) super.channelRead(ctx, msg)
} else { } else {
@@ -8,6 +8,7 @@ import net.woggioni.rbcs.api.Configuration.Authentication
import net.woggioni.rbcs.api.Configuration.BasicAuthentication import net.woggioni.rbcs.api.Configuration.BasicAuthentication
import net.woggioni.rbcs.api.Configuration.Cache import net.woggioni.rbcs.api.Configuration.Cache
import net.woggioni.rbcs.api.Configuration.ClientCertificateAuthentication import net.woggioni.rbcs.api.Configuration.ClientCertificateAuthentication
import net.woggioni.rbcs.api.Configuration.ForwardedClientCertificateAuthentication
import net.woggioni.rbcs.api.Configuration.Group import net.woggioni.rbcs.api.Configuration.Group
import net.woggioni.rbcs.api.Configuration.KeyStore import net.woggioni.rbcs.api.Configuration.KeyStore
import net.woggioni.rbcs.api.Configuration.Tls import net.woggioni.rbcs.api.Configuration.Tls
@@ -77,6 +78,28 @@ object Parser {
} }
authentication = ClientCertificateAuthentication(tlsExtractorUser, tlsExtractorGroup) authentication = ClientCertificateAuthentication(tlsExtractorUser, tlsExtractorGroup)
} }
"forwarded-client-certificate" -> {
val headerName = gchild.renderAttribute("header-name") ?: "X-Client-Cert-Subject-DN"
var tlsExtractorUser: TlsCertificateExtractor? = null
var tlsExtractorGroup: TlsCertificateExtractor? = null
for (ggchild in gchild.asIterable()) {
when (ggchild.localName) {
"group-extractor" -> {
val attrName = ggchild.renderAttribute("attribute-name")
val pattern = ggchild.renderAttribute("pattern")
tlsExtractorGroup = TlsCertificateExtractor(attrName, pattern)
}
"user-extractor" -> {
val attrName = ggchild.renderAttribute("attribute-name")
val pattern = ggchild.renderAttribute("pattern")
tlsExtractorUser = TlsCertificateExtractor(attrName, pattern)
}
}
}
authentication = ForwardedClientCertificateAuthentication(headerName, tlsExtractorUser, tlsExtractorGroup)
}
} }
} }
} }
@@ -165,6 +165,23 @@ object Serializer {
} }
} }
} }
is Configuration.ForwardedClientCertificateAuthentication -> {
node("forwarded-client-certificate") {
attr("header-name", authentication.headerName)
authentication.groupExtractor?.let { extractor ->
node("group-extractor") {
attr("attribute-name", extractor.rdnType)
attr("pattern", extractor.pattern)
}
}
authentication.userExtractor?.let { extractor ->
node("user-extractor") {
attr("attribute-name", extractor.rdnType)
attr("pattern", extractor.pattern)
}
}
}
}
} }
} }
} }
@@ -311,6 +311,45 @@
</xs:sequence> </xs:sequence>
</xs:complexType> </xs:complexType>
<xs:complexType name="forwardedClientCertificateAuthorizationType">
<xs:annotation>
<xs:documentation>
Authenticate clients based on a custom HTTP header containing the client TLS certificate
subject DN, forwarded by a reverse proxy that performs TLS termination. The proxy must be
listed in the trusted-proxies configuration for the header to be accepted.
</xs:documentation>
</xs:annotation>
<xs:sequence>
<xs:element name="group-extractor" type="rbcs:X500NameExtractorType" minOccurs="0">
<xs:annotation>
<xs:documentation>
A regex based extractor that will be used to determine which group the client belongs to,
based on the X.500 name of the subject DN forwarded by the reverse proxy.
When this is set RBAC works even if the user isn't listed in the &lt;users/&gt; section as
the client will be assigned role solely based on the group he is found to belong to.
Note that this does not allow for a client to be part of multiple groups.
</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element name="user-extractor" type="rbcs:X500NameExtractorType" minOccurs="0">
<xs:annotation>
<xs:documentation>
A regex based extractor that will be used to assign a user to a connected client,
based on the X.500 name of the subject DN forwarded by the reverse proxy.
</xs:documentation>
</xs:annotation>
</xs:element>
</xs:sequence>
<xs:attribute name="header-name" type="xs:token">
<xs:annotation>
<xs:documentation>
Name of the HTTP header containing the client certificate subject DN
forwarded by the reverse proxy. Defaults to "X-Client-Cert-Subject-DN".
</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
<xs:complexType name="X500NameExtractorType"> <xs:complexType name="X500NameExtractorType">
<xs:annotation> <xs:annotation>
<xs:documentation> <xs:documentation>
@@ -380,6 +419,15 @@
</xs:documentation> </xs:documentation>
</xs:annotation> </xs:annotation>
</xs:element> </xs:element>
<xs:element name="forwarded-client-certificate" type="rbcs:forwardedClientCertificateAuthorizationType">
<xs:annotation>
<xs:documentation>
Enable forwarded client certificate authentication. Authenticates clients based on
a custom HTTP header containing the client certificate subject DN, forwarded by a
reverse proxy that performs TLS termination. Requires trusted-proxies to be configured.
</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element name="none"> <xs:element name="none">
<xs:annotation> <xs:annotation>
<xs:documentation> <xs:documentation>
@@ -21,6 +21,8 @@ class ConfigurationTest {
"classpath:net/woggioni/rbcs/server/test/valid/rbcs-memcached.xml", "classpath:net/woggioni/rbcs/server/test/valid/rbcs-memcached.xml",
"classpath:net/woggioni/rbcs/server/test/valid/rbcs-tls.xml", "classpath:net/woggioni/rbcs/server/test/valid/rbcs-tls.xml",
"classpath:net/woggioni/rbcs/server/test/valid/rbcs-memcached-tls.xml", "classpath:net/woggioni/rbcs/server/test/valid/rbcs-memcached-tls.xml",
"classpath:net/woggioni/rbcs/server/test/valid/rbcs-redis.xml",
"classpath:net/woggioni/rbcs/server/test/valid/rbcs-redis-tls.xml",
] ]
) )
@ParameterizedTest @ParameterizedTest
@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
xmlns:rbcs-redis="urn:net.woggioni.rbcs.server.redis"
xs:schemaLocation="urn:net.woggioni.rbcs.server.redis jpms://net.woggioni.rbcs.server.redis/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
>
<bind host="0.0.0.0" port="8443" incoming-connections-backlog-size="4096"/>
<connection
max-request-size="67108864"
idle-timeout="PT30S"
read-idle-timeout="PT60S"
write-idle-timeout="PT60S"
chunk-size="123"/>
<event-executor use-virtual-threads="true"/>
<rate-limiter delay-response="false" message-buffer-size="12000" max-queued-messages="53"/>
<cache xs:type="rbcs-redis:redisCacheType" max-age="P7D" key-prefix="some-prefix-string">
<server host="redis-server" port="6379" password="secret123"/>
</cache>
<authorization>
<users>
<user name="woggioni">
<quota calls="1000" period="PT1S"/>
</user>
<user name="gitea">
<quota calls="10" period="PT1S" initial-available-calls="100" max-available-calls="100"/>
</user>
<anonymous>
<quota calls="2" period="PT5S"/>
</anonymous>
</users>
<groups>
<group name="writers">
<users>
<user ref="woggioni"/>
<user ref="gitea"/>
</users>
<roles>
<reader/>
<writer/>
</roles>
</group>
</groups>
</authorization>
<authentication>
<client-certificate>
<user-extractor attribute-name="CN" pattern="(.*)"/>
</client-certificate>
</authentication>
<tls>
<keystore file="/home/luser/ssl/rbcs.woggioni.net.pfx" key-alias="rbcs.woggioni.net" password="KEYSTORE_PASSWOR" key-password="KEY_PASSWORD"/>
<truststore file="/home/luser/ssl/woggioni.net.pfx" check-certificate-status="false" password="TRUSTSTORE_PASSWORD"/>
</tls>
</rbcs:server>
@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
xmlns:rbcs-redis="urn:net.woggioni.rbcs.server.redis"
xs:schemaLocation="urn:net.woggioni.rbcs.server.redis jpms://net.woggioni.rbcs.server.redis/net/woggioni/rbcs/server/redis/schema/rbcs-redis.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd">
<bind host="127.0.0.1" port="11443" incoming-connections-backlog-size="50"/>
<connection
read-idle-timeout="PT10M"
write-idle-timeout="PT11M"
idle-timeout="PT30M"
max-request-size="101325"
chunk-size="456"/>
<event-executor use-virtual-threads="false"/>
<rate-limiter delay-response="true" message-buffer-size="65432" max-queued-messages="21"/>
<cache xs:type="rbcs-redis:redisCacheType" max-age="P7D" key-prefix="some-prefix-string" digest="SHA-256" compression-mode="deflate" compression-level="7">
<server host="127.0.0.1" port="6379" max-connections="10" connection-timeout="PT20S"/>
</cache>
<authentication>
<none/>
</authentication>
</rbcs:server>
+1
View File
@@ -28,6 +28,7 @@ rootProject.name = 'rbcs'
include 'rbcs-api' include 'rbcs-api'
include 'rbcs-common' include 'rbcs-common'
include 'rbcs-server-memcache' include 'rbcs-server-memcache'
include 'rbcs-server-redis'
include 'rbcs-cli' include 'rbcs-cli'
include 'rbcs-client' include 'rbcs-client'
include 'rbcs-server' include 'rbcs-server'