Compare commits

...

14 Commits

Author SHA1 Message Date
woggioni 77cc044d0e restored docker image environmental variables 2026-05-30 12:56:03 +08:00
opencode 9a7a2566fa Generalize OTEL API and add memcache tracing support
- Rename RedisSpan -> SpanHandle for generic span handling
- Generalize TelemetryController methods: startSpan/endSpan with dbSystem param
- Rename RedisOtelSpan -> OtelSpanHandle in rbcs-server-otel
- Update Redis cache handler to use new generic API
- Add OpenTelemetry tracing for memcache GET and SET commands
- Add channel property to MemcacheRequestController for server address attribution
- Add uses TelemetryController directive in memcache module-info

Memcache spans follow the same pattern as Redis:
db.system=memcache, db.operation=GET|SET, server.address, server.port
2026-05-27 23:37:50 +08:00
opencode f154bbd33c Add OpenTelemetry tracing support for Redis commands
- Add RedisSpan interface in rbcs-api for opaque span handles
- Extend TelemetryController with startRedisSpan/endRedisSpan methods
- Implement Redis tracing in rbcs-server-otel via OtelController and RedisOtelSpan
- Instrument RedisCacheHandler to create spans around GET and SET commands
- Add uses directive in rbcs-server-redis module-info for ServiceLoader discovery

Redis spans are created as CLIENT spans with attributes:
db.system=redis, db.operation=GET|SET, server.address, server.port
2026-05-23 23:46:37 +08:00
woggioni 316f9e61b0 minor fix 2026-05-21 07:07:07 +08:00
woggioni 953d687651 optimized imports 2026-05-21 06:58:17 +08:00
woggioni 9c9f98cd72 fixed Dockerfile 2026-05-20 22:43:12 +08:00
woggioni df7f747168 updated lys-catalog to 2026.05.16 2026-05-20 22:39:20 +08:00
woggioni 4d9a424528 removed telemetry switch from configuration 2026-05-20 22:20:29 +08:00
woggioni ab2a06e810 refactor 2026-04-30 02:15:34 +08:00
woggioni 1d938b7ea3 Add optional OpenTelemetry Netty server instrumentation
- Update lys.version to 2026.04.14

- Add optional compileOnly dependency on opentelemetry-netty-4.1 in rbcs-server

- Add runtime guard to only activate instrumentation when OTel classes are on classpath

- Insert OTel combined handler after HttpServerCodec in the Netty pipeline

- Add requires-static JPMS directives for optional module support
2026-04-29 02:59:51 +08:00
woggioni 5d190d81ab version bump to 0.5.0 2026-04-13 22:28:33 +08:00
woggioni e6f35f4340 Added support for client certificate forwarding 2026-04-13 22:19:12 +08:00
woggioni 6d214eb066 uniformed Docker images 2026-04-13 22:19:12 +08:00
woggioni 0a50ae0643 improved error handling 2026-04-13 22:19:12 +08:00
64 changed files with 785 additions and 193 deletions
+17 -5
View File
@@ -32,7 +32,7 @@ jobs:
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/rbcs:vanilla-dev
gitea.woggioni.net/woggioni/rbcs:dev-vanilla
target: release-vanilla
-
name: Build rbcs memcache Docker image
@@ -44,7 +44,7 @@ jobs:
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/rbcs:memcache-dev
gitea.woggioni.net/woggioni/rbcs:dev-memcache
target: release-memcache
-
name: Build rbcs redis Docker image
@@ -56,8 +56,20 @@ jobs:
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/rbcs:redis-dev
gitea.woggioni.net/woggioni/rbcs:dev-redis
target: release-redis
-
name: Build rbcs full Docker image
uses: docker/build-push-action@v5.3.0
with:
builder: "multiplatform-builder"
context: "docker/build/docker"
platforms: linux/amd64,linux/arm64
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/rbcs:dev-full
target: release-full
-
name: Build rbcs native Docker image
uses: docker/build-push-action@v5.3.0
@@ -68,7 +80,7 @@ jobs:
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/rbcs:native-dev
gitea.woggioni.net/woggioni/rbcs:dev-native
target: release-native
-
name: Build rbcs jlink Docker image
@@ -80,6 +92,6 @@ jobs:
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/rbcs:jlink-dev
gitea.woggioni.net/woggioni/rbcs:dev-jlink
target: release-jlink
+13
View File
@@ -61,6 +61,19 @@ jobs:
gitea.woggioni.net/woggioni/rbcs:redis
gitea.woggioni.net/woggioni/rbcs:${{ steps.retrieve-version.outputs.VERSION }}-redis
target: release-redis
-
name: Build rbcs full Docker image
uses: docker/build-push-action@v5.3.0
with:
builder: "multiplatform-builder"
context: "docker/build/docker"
platforms: linux/amd64,linux/arm64
push: true
pull: true
tags: |
gitea.woggioni.net/woggioni/rbcs:full
gitea.woggioni.net/woggioni/rbcs:${{ steps.retrieve-version.outputs.VERSION }}-full
target: release-full
-
name: Build rbcs native Docker image
uses: docker/build-push-action@v5.3.0
+5
View File
@@ -5,3 +5,8 @@
build
rbcs-cli/native-image/*.json
# Ignore JDTLS files
.classpath
.project
.settings
+1
View File
@@ -139,6 +139,7 @@ Configures TLS encryption.
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
xs:schemaLocation="urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
path="/my/custom/path"
>
<bind host="0.0.0.0" port="8080" incoming-connections-backlog-size="1024" proxy-protocol="true">
<trusted-proxies>
+38 -22
View File
@@ -1,52 +1,68 @@
FROM eclipse-temurin:25-jre-alpine AS base-release
RUN adduser -D luser
USER luser
WORKDIR /home/luser
RUN adduser -D rbcs
USER rbcs
ENV RBCS_CONFIGURATION_DIR="/etc/rbcs"
WORKDIR /var/lib/rbcs
FROM base-release AS release-vanilla
ADD rbcs-cli-envelope-*.jar rbcs.jar
ENTRYPOINT ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=70", "-XX:GCTimeRatio=24", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar"]
ADD logback.xml /etc/rbcs/logback.xml
ENTRYPOINT ["java", "-jar", "/var/lib/rbcs/rbcs.jar"]
FROM base-release AS release-memcache
ADD --chown=luser:luser rbcs-cli-envelope-*.jar rbcs.jar
ADD --chown=rbcs:rbcs rbcs-cli-envelope-*.jar rbcs.jar
RUN mkdir plugins
WORKDIR /home/luser/plugins
WORKDIR /var/lib/rbcs/plugins
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-server-memcache*.tar
WORKDIR /home/luser
ADD logback.xml .
ENTRYPOINT ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=70", "-XX:GCTimeRatio=24", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar"]
WORKDIR /var/lib/rbcs
ADD logback.xml /etc/rbcs/logback.xml
ENTRYPOINT ["java", "-jar", "/var/lib/rbcs/rbcs.jar"]
FROM base-release AS release-redis
ADD --chown=luser:luser rbcs-cli-envelope-*.jar rbcs.jar
ADD --chown=rbcs:rbcs rbcs-cli-envelope-*.jar rbcs.jar
RUN mkdir plugins
WORKDIR /home/luser/plugins
WORKDIR /var/lib/rbcs/plugins
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-server-redis*.tar
WORKDIR /home/luser
ADD logback.xml .
ENTRYPOINT ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=70", "-XX:GCTimeRatio=24", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar"]
WORKDIR /var/lib/rbcs
ADD logback.xml /etc/rbcs/logback.xml
ENTRYPOINT ["java", "-jar", "/var/lib/rbcs/rbcs.jar"]
FROM base-release AS release-full
ADD --chown=rbcs:rbcs rbcs-cli-envelope-*.jar rbcs.jar
RUN mkdir plugins
WORKDIR /var/lib/rbcs/plugins
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-server-memcache*.tar
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-server-redis*.tar
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-server-otel*.tar
WORKDIR /var/lib/rbcs
ADD logback.xml /etc/rbcs/logback.xml
ENV OTEL_SDK_DISABLED="true"
ENTRYPOINT ["java", "-jar", "/var/lib/rbcs/rbcs.jar"]
FROM busybox:musl AS base-native
RUN mkdir -p /var/lib/rbcs /etc/rbcs
RUN mkdir -p /var/lib/rbcs /var/tmp/rbcs /etc/rbcs
RUN adduser -D -u 1000 rbcs -h /var/lib/rbcs
RUN chown rbcs:rbcs /var/tmp/rbcs
FROM scratch AS release-native
COPY --from=base-native /etc/passwd /etc/passwd
COPY --from=base-native /etc/rbcs /etc/rbcs
COPY --from=base-native /var/lib/rbcs /var/lib/rbcs
COPY --from=base-native /var/tmp/rbcs /var/tmp/rbcs
ADD rbcs-cli.upx /usr/bin/rbcs-cli
ENV RBCS_CONFIGURATION_DIR="/etc/rbcs"
USER rbcs
WORKDIR /var/lib/rbcs
ENTRYPOINT ["/usr/bin/rbcs-cli", "-XX:MaximumHeapSizePercent=70"]
ENV RBCS_CONFIGURATION_DIR="/etc/rbcs"
ENTRYPOINT ["/usr/bin/rbcs-cli", "-XX:MaximumHeapSizePercent=70", "-Dio.netty.tmpdir=/var/tmp/rbcs", "-Dlogback.configurationFile=/etc/rbcs/logback.xml"]
FROM debian:12-slim AS release-jlink
RUN mkdir -p /usr/share/java/rbcs
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-cli*.tar -C /usr/share/java/rbcs
RUN chmod 755 /usr/share/java/rbcs/bin/*
ADD --chmod=755 rbcs-cli.sh /usr/local/bin/rbcs-cli
RUN adduser -u 1000 luser
USER luser
WORKDIR /home/luser
ADD logback.xml .
ENV JAVA_OPTS=-XX:-UseJVMCICompiler\ -Dlogback.configurationFile=logback.xml\ -XX:MaxRAMPercentage=70\ -XX:GCTimeRatio=24\ -XX:+UseZGC\ -XX:+ZGenerational
RUN adduser -u 1000 rbcs
USER rbcs
WORKDIR /var/lib/rbcs
ADD logback.xml /etc/rbcs/logback.xml
ENV RBCS_CONFIGURATION_DIR="/etc/rbcs"
ENTRYPOINT ["/usr/local/bin/rbcs-cli"]
+1
View File
@@ -21,6 +21,7 @@ dependencies {
docker project(path: ':rbcs-cli', configuration: 'release')
docker project(path: ':rbcs-server-memcache', configuration: 'release')
docker project(path: ':rbcs-server-redis', configuration: 'release')
docker project(path: ':rbcs-server-otel', configuration: 'release')
}
Provider<Task> cleanTaskProvider = tasks.named(BasePlugin.CLEAN_TASK_NAME) {}
+2 -2
View File
@@ -2,9 +2,9 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true
org.gradle.caching=true
rbcs.version = 0.4.0
rbcs.version = 0.5.0
lys.version = 2026.03.26
lys.version = 2026.05.27
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
docker.registry.url=gitea.woggioni.net
+1
View File
@@ -11,6 +11,7 @@ dependencies {
api catalog.netty.buffer
api catalog.netty.handler
api catalog.netty.codec.http
api catalog.jetbrains.annotations
}
publishing {
+1 -1
View File
@@ -8,7 +8,7 @@ module net.woggioni.rbcs.api {
requires io.netty.buffer;
requires org.slf4j;
requires java.xml;
requires org.jetbrains.annotations;
exports net.woggioni.rbcs.api;
exports net.woggioni.rbcs.api.exception;
@@ -18,10 +18,10 @@ import java.util.stream.Collectors;
public class Configuration {
String host;
int port;
String serverPath;
boolean proxyProtocolEnabled;
List<Cidr> trustedProxyIPs;
int incomingConnectionsBacklogSize;
String serverPath;
@NonNull
EventExecutor eventExecutor;
@NonNull
@@ -136,6 +136,13 @@ public class Configuration {
TlsCertificateExtractor groupExtractor;
}
@Value
public static class ForwardedClientCertificateAuthentication implements Authentication {
String headerName;
TlsCertificateExtractor userExtractor;
TlsCertificateExtractor groupExtractor;
}
public interface Cache {
CacheHandlerFactory materialize();
String getNamespaceURI();
@@ -161,10 +168,10 @@ public class Configuration {
return new Configuration(
host,
port,
serverPath != null && !serverPath.isEmpty() && !serverPath.equals("/") ? serverPath : null,
proxyProtocolEnabled,
trustedProxyIPs,
incomingConnectionsBacklogSize,
serverPath != null && !serverPath.isEmpty() && !serverPath.equals("/") ? serverPath : null,
eventExecutor,
rateLimiter,
connection,
@@ -0,0 +1,13 @@
package net.woggioni.rbcs.api;
import org.jetbrains.annotations.NotNull;
public interface SpanHandle {
void setAttribute(@NotNull String key, @NotNull String value);
void setAttribute(@NotNull String key, long value);
void setAttribute(@NotNull String key, boolean value);
}
@@ -0,0 +1,18 @@
package net.woggioni.rbcs.api;
import io.netty.channel.ChannelHandler;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Map;
public interface TelemetryController {
void initialize();
@NotNull ChannelHandler createHandler();
@Nullable SpanHandle startSpan(@NotNull String command);
void endSpan(@Nullable SpanHandle span);
void endSpan(@Nullable SpanHandle span, @NotNull Throwable error);
}
+7 -4
View File
@@ -30,7 +30,6 @@ configurations {
transitive = false
canBeConsumed = true
canBeResolved = true
visible = true
}
configureNativeImageImplementation {
@@ -51,6 +50,7 @@ dependencies {
configureNativeImageImplementation project
configureNativeImageImplementation project(':rbcs-server-memcache')
configureNativeImageImplementation project(':rbcs-server-redis')
// configureNativeImageImplementation project(':rbcs-server-otel')
implementation catalog.jwo
implementation catalog.slf4j.api
@@ -59,9 +59,7 @@ dependencies {
implementation project(':rbcs-client')
implementation project(':rbcs-server')
// runtimeOnly catalog.slf4j.jdk14
runtimeOnly catalog.logback.classic
// runtimeOnly catalog.slf4j.simple
nativeImage project(':rbcs-server-memcache')
nativeImage project(':rbcs-server-redis')
@@ -142,7 +140,12 @@ Provider<JlinkTask> jlinkTaskProvider = tasks.named(JlinkPlugin.JLINK_TASK_NAME,
'net.woggioni.rbcs.server.memcache',
'net.woggioni.rbcs.server.redis',
'ch.qos.logback.classic',
'jdk.crypto.ec'
'jdk.crypto.ec',
// 'io.opentelemetry.api',
// 'io.opentelemetry.instrumentation.netty_4_1',
// 'io.opentelemetry.sdk.autoconfigure',
// 'io.opentelemetry.instrumentation.logback_appender_1_0',
// 'io.opentelemetry.extension.trace.propagation'
]
compressionLevel = 2
stripDebug = false
@@ -7,6 +7,8 @@ import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.ExecutionException
import java.util.zip.Deflater
import net.woggioni.rbcs.client.Configuration as ClientConfiguration
import net.woggioni.rbcs.client.impl.Parser as ClientConfigurationParser
import net.woggioni.jwo.NullOutputStream
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Configuration.User
@@ -16,8 +18,6 @@ import net.woggioni.rbcs.cli.impl.commands.BenchmarkCommand
import net.woggioni.rbcs.cli.impl.commands.GetCommand
import net.woggioni.rbcs.cli.impl.commands.HealthCheckCommand
import net.woggioni.rbcs.cli.impl.commands.PutCommand
import net.woggioni.rbcs.client.Configuration as ClientConfiguration
import net.woggioni.rbcs.client.impl.Parser as ClientConfigurationParser
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import net.woggioni.rbcs.common.RBCS
@@ -120,10 +120,10 @@ object GraalNativeImageConfiguration {
val serverConfiguration = Configuration(
"127.0.0.1",
serverPort,
null,
false,
emptyList(),
100,
null,
Configuration.EventExecutor(true),
Configuration.RateLimiter(
false, 0x100000, 10
+3 -1
View File
@@ -1,11 +1,11 @@
module net.woggioni.rbcs.cli {
requires org.slf4j;
requires net.woggioni.jwo;
requires net.woggioni.rbcs.server;
requires info.picocli;
requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.client;
requires kotlin.stdlib;
requires net.woggioni.jwo;
requires net.woggioni.rbcs.api;
exports net.woggioni.rbcs.cli.impl.converters to info.picocli;
@@ -14,4 +14,6 @@ module net.woggioni.rbcs.cli {
opens net.woggioni.rbcs.cli to info.picocli, net.woggioni.rbcs.common;
exports net.woggioni.rbcs.cli;
uses net.woggioni.rbcs.api.TelemetryController;
}
@@ -1,6 +1,8 @@
package net.woggioni.rbcs.cli
import net.woggioni.jwo.Application
import net.woggioni.jwo.LoggerController
import net.woggioni.rbcs.api.TelemetryController
import net.woggioni.rbcs.cli.impl.AbstractVersionProvider
import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.cli.impl.commands.BenchmarkCommand
@@ -10,6 +12,7 @@ import net.woggioni.rbcs.cli.impl.commands.HealthCheckCommand
import net.woggioni.rbcs.cli.impl.commands.PasswordHashCommand
import net.woggioni.rbcs.cli.impl.commands.PutCommand
import net.woggioni.rbcs.cli.impl.commands.ServerCommand
import net.woggioni.rbcs.common.RBCS.loadService
import net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory
import net.woggioni.rbcs.common.createLogger
import picocli.CommandLine
@@ -61,6 +64,10 @@ class RemoteBuildCacheServerCli : RbcsCommand() {
@JvmStatic
fun main(vararg args: String) {
loadService(TelemetryController::class.java)
.firstOrNull()
?.initialize()
LoggerController.initializeLoggers()
System.exit(createCommandLine().execute(*args))
}
}
@@ -1,5 +1,18 @@
package net.woggioni.rbcs.client
import java.io.IOException
import java.net.InetSocketAddress
import java.net.URI
import java.security.cert.X509Certificate
import java.util.Base64
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import javax.net.ssl.TrustManagerFactory
import javax.net.ssl.X509TrustManager
import kotlin.random.Random
import io.netty.util.concurrent.Future as NettyFuture
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
@@ -34,20 +47,7 @@ import io.netty.handler.timeout.IdleState
import io.netty.handler.timeout.IdleStateEvent
import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.concurrent.Future
import io.netty.util.concurrent.Future as NettyFuture
import io.netty.util.concurrent.GenericFutureListener
import java.io.IOException
import java.net.InetSocketAddress
import java.net.URI
import java.security.cert.X509Certificate
import java.util.Base64
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import javax.net.ssl.TrustManagerFactory
import javax.net.ssl.X509TrustManager
import kotlin.random.Random
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.RBCS.loadKeystore
import net.woggioni.rbcs.common.createLogger
@@ -1,10 +1,10 @@
package net.woggioni.rbcs.client
import io.netty.util.concurrent.EventExecutorGroup
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import kotlin.math.pow
import kotlin.random.Random
import io.netty.util.concurrent.EventExecutorGroup
sealed class OperationOutcome<T> {
class Success<T>(val result: T) : OperationOutcome<T>()
@@ -1,10 +1,10 @@
package net.woggioni.rbcs.client
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import java.util.concurrent.CompletableFuture
import java.util.stream.Stream
import kotlin.random.Random
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.rbcs.common.contextLogger
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.extension.ExtensionContext
@@ -1,7 +1,7 @@
package net.woggioni.rbcs.common
import io.netty.buffer.ByteBuf
import java.io.InputStream
import io.netty.buffer.ByteBuf
class ByteBufInputStream(private val buf : ByteBuf) : InputStream() {
override fun read(): Int {
@@ -1,7 +1,7 @@
package net.woggioni.rbcs.common
import io.netty.buffer.ByteBuf
import java.io.OutputStream
import io.netty.buffer.ByteBuf
class ByteBufOutputStream(private val buf : ByteBuf) : OutputStream() {
override fun write(b: Int) {
@@ -1,18 +1,20 @@
package net.woggioni.rbcs.common
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import java.nio.file.Files
import java.nio.file.Path
import java.util.logging.LogManager
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import net.woggioni.jwo.LoggerController
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import org.slf4j.event.Level
import org.slf4j.spi.LoggingEventBuilder
inline fun <reified T> T.contextLogger() = LoggerFactory.getLogger(T::class.java)
inline fun <reified T> createLogger() = LoggerFactory.getLogger(T::class.java)
fun <T> lazyLogger(cls: Class<T>) = LoggerController.lazyLogger(cls)
inline fun <reified T> T.contextLogger() = lazyLogger(T::class.java)
inline fun <reified T> createLogger() = lazyLogger(T::class.java)
inline fun Logger.traceParam(messageBuilder: () -> Pair<String, Array<Any>>) {
if (isTraceEnabled) {
@@ -17,6 +17,7 @@ import java.security.cert.PKIXParameters
import java.security.cert.PKIXRevocationChecker
import java.security.cert.X509Certificate
import java.util.EnumSet
import java.util.ServiceLoader
import javax.net.ssl.TrustManagerFactory
import javax.net.ssl.X509TrustManager
import net.woggioni.jwo.JWO
@@ -164,4 +165,10 @@ object RBCS {
.single() as X509TrustManager
}
}
inline fun <T, reified U> U.loadService(serviceClass : Class<T>): Sequence<T> {
return (U::class.java.module.layer?.let { layer ->
ServiceLoader.load(layer, serviceClass)
} ?: ServiceLoader.load(serviceClass)).asSequence()
}
}
@@ -16,13 +16,13 @@ import javax.xml.transform.stream.StreamResult
import javax.xml.transform.stream.StreamSource
import javax.xml.validation.Schema
import javax.xml.validation.SchemaFactory
import org.xml.sax.ErrorHandler as ErrHandler
import net.woggioni.jwo.JWO
import org.slf4j.event.Level
import org.w3c.dom.Document
import org.w3c.dom.Element
import org.w3c.dom.Node
import org.w3c.dom.NodeList
import org.xml.sax.ErrorHandler as ErrHandler
import org.xml.sax.SAXNotRecognizedException
import org.xml.sax.SAXNotSupportedException
import org.xml.sax.SAXParseException
@@ -1,4 +1,5 @@
import net.woggioni.rbcs.api.CacheProvider;
import net.woggioni.rbcs.api.TelemetryController;
module net.woggioni.rbcs.server.memcache {
requires net.woggioni.rbcs.common;
@@ -16,5 +17,7 @@ module net.woggioni.rbcs.server.memcache {
provides CacheProvider with net.woggioni.rbcs.server.memcache.MemcacheCacheProvider;
uses TelemetryController;
opens net.woggioni.rbcs.server.memcache.schema;
}
@@ -1,15 +1,15 @@
package net.woggioni.rbcs.server.memcache
import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration
@@ -1,18 +1,5 @@
package net.woggioni.rbcs.server.memcache
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
import io.netty.channel.Channel as NettyChannel
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
@@ -22,12 +9,27 @@ import java.nio.channels.FileChannel
import java.nio.channels.ReadableByteChannel
import java.nio.file.Files
import java.nio.file.StandardOpenOption
import java.net.InetSocketAddress
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream
import io.netty.channel.Channel as NettyChannel
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.exception.ContentTooLargeException
@@ -39,8 +41,11 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.api.SpanHandle
import net.woggioni.rbcs.api.TelemetryController
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.loadService
import net.woggioni.rbcs.common.RBCS.processCacheKey
import net.woggioni.rbcs.common.RBCS.toIntOrNull
import net.woggioni.rbcs.common.createLogger
@@ -70,6 +75,10 @@ class MemcacheCacheHandler(
}
}
private val telemetryController by lazy {
loadService(TelemetryController::class.java).firstOrNull()
}
private interface InProgressRequest {
}
@@ -153,7 +162,9 @@ class MemcacheCacheHandler(
metadata: CacheValueMetadata,
val digest: ByteBuf,
val requestController: CompletableFuture<MemcacheRequestController>,
private val alloc: ByteBufAllocator
private val alloc: ByteBufAllocator,
val entryKey: String,
val memcacheSpanRef: AtomicReference<SpanHandle?>,
) : InProgressRequest {
private var totalSize = 0
private var tmpFile: FileChannel? = null
@@ -251,6 +262,17 @@ class MemcacheCacheHandler(
val key = ctx.alloc().buffer().also {
it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm))
}
val memcacheSpan = telemetryController?.startSpan("GET")?.apply {
setAttribute("db.system", "memcache")
setAttribute("db.operation.name", "GET")
val remoteAddr = ctx.channel().remoteAddress()
if (remoteAddr is InetSocketAddress) {
remoteAddr.hostString?.let {
setAttribute("server.address", it)
}
setAttribute("server.port", remoteAddr.port.toLong())
}
}
val responseHandler = object : MemcacheResponseHandler {
override fun responseReceived(response: BinaryMemcacheResponse) {
val status = response.status()
@@ -266,8 +288,15 @@ class MemcacheCacheHandler(
log.debug(ctx) {
"Cache miss for key ${msg.key} on memcache"
}
telemetryController?.endSpan(memcacheSpan)
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
}
else -> {
val ex = MemcacheException(status)
telemetryController?.endSpan(memcacheSpan, ex)
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
}
}
}
@@ -282,11 +311,13 @@ class MemcacheCacheHandler(
if (content is LastMemcacheContent) {
inProgressRequest = null
inProgressGetRequest.commit()
telemetryController?.endSpan(memcacheSpan)
}
}
}
override fun exceptionCaught(ex: Throwable) {
telemetryController?.endSpan(memcacheSpan, ex)
(inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
inProgressGetRequest?.let {
inProgressRequest = null
@@ -312,6 +343,7 @@ class MemcacheCacheHandler(
val key = ctx.alloc().buffer().also {
it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm))
}
val memcacheSpanRef = AtomicReference<SpanHandle?>(null)
val responseHandler = object : MemcacheResponseHandler {
override fun responseReceived(response: BinaryMemcacheResponse) {
val status = response.status()
@@ -320,16 +352,22 @@ class MemcacheCacheHandler(
log.debug(ctx) {
"Inserted key ${msg.key} into memcache"
}
telemetryController?.endSpan(memcacheSpanRef.get())
sendMessageAndFlush(ctx, CachePutResponse(msg.key))
}
else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status))
else -> {
val ex = MemcacheException(status)
telemetryController?.endSpan(memcacheSpanRef.get(), ex)
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
}
}
}
override fun contentReceived(content: MemcacheContent) {}
override fun exceptionCaught(ex: Throwable) {
telemetryController?.endSpan(memcacheSpanRef.get(), ex)
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
}
}
@@ -339,7 +377,7 @@ class MemcacheCacheHandler(
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
}
}
inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc())
inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc(), msg.key, memcacheSpanRef)
}
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
@@ -362,22 +400,41 @@ class MemcacheCacheHandler(
val request = inProgressRequest
when (request) {
is InProgressPutRequest -> {
val putRequest = request
inProgressRequest = null
log.trace(ctx) {
"Received last chunk of ${msg.content().readableBytes()} bytes for memcache"
}
request.write(msg.content())
val key = request.digest.retainedDuplicate()
val (payloadSize, payloadSource) = request.commit()
putRequest.write(msg.content())
val memcacheSpan = telemetryController?.startSpan("SET",
)?.apply {
setAttribute("db.system", "memcache")
setAttribute("db.operation.name", "SET")
val remoteAddr = ctx.channel().remoteAddress()
if (remoteAddr is InetSocketAddress) {
remoteAddr.hostString?.let {
setAttribute("server.address", it)
}
setAttribute("server.port", remoteAddr.port.toLong())
}
}
putRequest.memcacheSpanRef.set(memcacheSpan)
val key = putRequest.digest.retainedDuplicate()
val (payloadSize, payloadSource) = putRequest.commit()
val extras = ctx.alloc().buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(maxAge))
val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize
val totalBodyLength = putRequest.digest.readableBytes() + extras.readableBytes() + payloadSize
log.trace(ctx) {
"Trying to send SET request to memcache"
}
request.requestController.whenComplete { requestController, ex ->
putRequest.requestController.whenComplete { requestController, ex ->
if (ex == null) {
val remoteAddr = requestController.channel.remoteAddress()
if (remoteAddr is InetSocketAddress) {
remoteAddr.hostString?.let { memcacheSpan?.setAttribute("server.address", it) }
memcacheSpan?.setAttribute("server.port", remoteAddr.port.toLong())
}
log.trace(ctx) {
"Sending SET request to memcache"
}
@@ -1,6 +1,11 @@
package net.woggioni.rbcs.server.memcache.client
import java.io.IOException
import java.net.InetSocketAddress
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import io.netty.util.concurrent.Future as NettyFuture
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
@@ -20,12 +25,7 @@ import io.netty.handler.codec.memcache.MemcacheObject
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.util.concurrent.Future as NettyFuture
import io.netty.util.concurrent.GenericFutureListener
import java.io.IOException
import java.net.InetSocketAddress
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.trace
@@ -147,6 +147,8 @@ class MemcacheClient(
channel.pipeline().addLast(handler)
response.complete(object : MemcacheRequestController {
override val channel: Channel = channel
private var channelReleased = false
override fun sendRequest(request: BinaryMemcacheRequest) {
@@ -1,10 +1,13 @@
package net.woggioni.rbcs.server.memcache.client
import io.netty.channel.Channel
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
interface MemcacheRequestController {
val channel: Channel
fun sendRequest(request : BinaryMemcacheRequest)
fun sendContent(content : MemcacheContent)
@@ -1,11 +1,11 @@
package net.woggioni.rbcs.server.memcache.client
import io.netty.buffer.ByteBufUtil
import io.netty.buffer.Unpooled
import java.io.ByteArrayInputStream
import java.nio.ByteBuffer
import java.nio.channels.Channels
import kotlin.random.Random
import io.netty.buffer.ByteBufUtil
import io.netty.buffer.Unpooled
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
+81
View File
@@ -0,0 +1,81 @@
plugins {
id 'java-library'
id 'maven-publish'
alias catalog.plugins.kotlin.jvm
}
configurations {
bundle {
canBeResolved = false
canBeConsumed = false
transitive = true
}
filteredBundle {
canBeResolved = true
canBeConsumed = false
transitive = true
extendsFrom bundle
resolutionStrategy {
dependencies {
exclude group: 'org.slf4j', module: 'slf4j-api'
exclude group: 'org.jetbrains.kotlin', module: 'kotlin-stdlib'
exclude group: 'org.jetbrains', module: 'annotations'
}
}
}
release {
transitive = false
canBeConsumed = true
canBeResolved = true
}
compileOnly {
extendsFrom bundle
}
}
dependencies {
compileOnly project(':rbcs-common')
compileOnly project(':rbcs-api')
compileOnly catalog.netty.transport
compileOnly catalog.slf4j.api
compileOnly catalog.kotlin.stdlib.jdk8
compileOnly catalog.logback.core
compileOnly catalog.logback.classic
bundle catalog.opentelemetry.netty.'4'.'1'
bundle catalog.opentelemetry.sdk.extension.autoconfigure
bundle catalog.opentelemetry.logback.appender.'1'.'0'
bundle catalog.opentelemetry.logback.mdc.'1'.'0'
bundle catalog.opentelemetry.extension.trace.propagators
bundle catalog.opentelemetry.exporter.otlp
bundle catalog.opentelemetry.runtime.telemetry
}
Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
from(tasks.named(JavaPlugin.JAR_TASK_NAME))
from(configurations.filteredBundle)
group = BasePlugin.BUILD_GROUP
}
tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) {
dependsOn(bundleTask)
}
artifacts {
release(bundleTask)
}
publishing {
publications {
maven(MavenPublication) {
artifact bundleTask
}
}
}
@@ -0,0 +1,20 @@
module net.woggioni.rbcs.server.otel {
requires net.woggioni.rbcs.common;
requires kotlin.stdlib;
requires io.netty.transport;
requires io.netty.common;
requires io.netty.buffer;
requires org.slf4j;
requires ch.qos.logback.core;
requires ch.qos.logback.classic;
requires io.opentelemetry.api;
requires io.opentelemetry.sdk.autoconfigure;
requires io.opentelemetry.instrumentation.runtime_telemetry;
requires io.opentelemetry.instrumentation.netty_4_1;
requires io.opentelemetry.instrumentation.logback_appender_1_0;
requires io.opentelemetry.extension.trace.propagation;
requires net.woggioni.rbcs.api;
provides net.woggioni.rbcs.api.TelemetryController with net.woggioni.rbcs.server.otel.OtelController;
}
@@ -0,0 +1,65 @@
package net.woggioni.rbcs.server.otel
import io.netty.channel.ChannelHandler
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.instrumentation.logback.appender.v1_0.OpenTelemetryAppender
import io.opentelemetry.instrumentation.netty.v4_1.NettyServerTelemetry
import io.opentelemetry.instrumentation.runtimetelemetry.RuntimeTelemetry
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk
import net.woggioni.rbcs.api.SpanHandle
import net.woggioni.rbcs.api.TelemetryController
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.info
class OtelController : TelemetryController {
private val log = createLogger<OtelController>()
private val tracer by lazy {
GlobalOpenTelemetry.getTracer("net.woggioni.rbcs.server.redis", "0.5.0")
}
override fun initialize() {
log.info { "Initializing OpenTelemetry SDK with auto-configuration" }
val sdk = AutoConfiguredOpenTelemetrySdk.builder()
.setResultAsGlobal()
.build()
.openTelemetrySdk
RuntimeTelemetry.create(sdk)
runCatching {
OpenTelemetryAppender.install(sdk)
log.info { "OpenTelemetry logback appender installed" }
}.onFailure { ex ->
val msg = ex.localizedMessage ?: ex.javaClass.name
log.info { "Failed to install OpenTelemetry logback appender: $msg" }
}
log.info { "OpenTelemetry SDK initialized successfully" }
}
override fun createHandler(): ChannelHandler {
return NettyServerTelemetry.create(GlobalOpenTelemetry.get()).createCombinedHandler()
}
override fun startSpan(name: String): SpanHandle {
val span = tracer.spanBuilder(name)
.setSpanKind(SpanKind.CLIENT)
.startSpan()
return OtelSpanHandle(span)
}
override fun endSpan(span: SpanHandle?) {
(span as? OtelSpanHandle)?.delegate?.end()
}
override fun endSpan(span: SpanHandle?, error: Throwable) {
val s = (span as? OtelSpanHandle)?.delegate ?: return
s.recordException(error)
s.setStatus(StatusCode.ERROR)
s.end()
}
}
@@ -0,0 +1,21 @@
package net.woggioni.rbcs.server.otel
import io.opentelemetry.api.trace.Span
import net.woggioni.rbcs.api.SpanHandle
internal class OtelSpanHandle(
val delegate: Span,
) : SpanHandle {
override fun setAttribute(key: String, value: String) {
delegate.setAttribute(key, value)
}
override fun setAttribute(key: String, value: Long) {
delegate.setAttribute(key, value)
}
override fun setAttribute(key: String, value: Boolean) {
delegate.setAttribute(key, value)
}
}
@@ -1,4 +1,5 @@
import net.woggioni.rbcs.api.CacheProvider;
import net.woggioni.rbcs.api.TelemetryController;
module net.woggioni.rbcs.server.redis {
requires net.woggioni.rbcs.common;
@@ -16,5 +17,7 @@ module net.woggioni.rbcs.server.redis {
provides CacheProvider with net.woggioni.rbcs.server.redis.RedisCacheProvider;
uses TelemetryController;
opens net.woggioni.rbcs.server.redis.schema;
}
@@ -1,17 +1,15 @@
package net.woggioni.rbcs.server.redis
import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration
@@ -1,19 +1,9 @@
package net.woggioni.rbcs.server.redis
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
import io.netty.channel.Channel as NettyChannel
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.redis.ArrayRedisMessage
import io.netty.handler.codec.redis.ErrorRedisMessage
import io.netty.handler.codec.redis.FullBulkStringRedisMessage
import io.netty.handler.codec.redis.RedisMessage
import io.netty.handler.codec.redis.SimpleStringRedisMessage
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.channels.FileChannel
@@ -25,7 +15,16 @@ import java.time.Duration
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream
import io.netty.channel.Channel as NettyChannel
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.redis.ArrayRedisMessage
import io.netty.handler.codec.redis.ErrorRedisMessage
import io.netty.handler.codec.redis.FullBulkStringRedisMessage
import io.netty.handler.codec.redis.RedisMessage
import io.netty.handler.codec.redis.SimpleStringRedisMessage
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.exception.ContentTooLargeException
@@ -37,8 +36,10 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.api.TelemetryController
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.loadService
import net.woggioni.rbcs.common.RBCS.processCacheKey
import net.woggioni.rbcs.common.RBCS.toIntOrNull
import net.woggioni.rbcs.common.createLogger
@@ -62,6 +63,10 @@ class RedisCacheHandler(
private val log = createLogger<RedisCacheHandler>()
}
private val telemetryController by lazy {
loadService(TelemetryController::class.java).firstOrNull()
}
private interface InProgressRequest
private inner class InProgressGetRequest(
@@ -244,6 +249,17 @@ class RedisCacheHandler(
}
val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm)
val keyString = String(keyBytes, StandardCharsets.UTF_8)
val redisSpan = telemetryController?.startSpan("GET")?.apply {
setAttribute("db.system", "redis")
setAttribute("db.operation.name", "GET")
val remoteAddr = ctx.channel().remoteAddress()
if (remoteAddr is InetSocketAddress) {
remoteAddr.hostString?.let {
setAttribute("server.address", it)
}
setAttribute("server.port", remoteAddr.port.toLong())
}
}
val responseHandler = object : RedisResponseHandler {
override fun responseReceived(response: RedisMessage) {
when (response) {
@@ -252,11 +268,13 @@ class RedisCacheHandler(
log.debug(ctx) {
"Cache miss for key ${msg.key} on Redis"
}
telemetryController?.endSpan(redisSpan)
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
} else {
log.debug(ctx) {
"Cache hit for key ${msg.key} on Redis"
}
telemetryController?.endSpan(redisSpan)
val getRequest = InProgressGetRequest(msg.key, ctx)
inProgressRequest = getRequest
getRequest.processResponse(response.content())
@@ -265,25 +283,32 @@ class RedisCacheHandler(
}
is ErrorRedisMessage -> {
this@RedisCacheHandler.exceptionCaught(
ctx, RedisException("Redis error for GET ${msg.key}: ${response.content()}")
)
val ex = RedisException("Redis error for GET ${msg.key}: ${response.content()}")
telemetryController?.endSpan(redisSpan, ex)
this@RedisCacheHandler.exceptionCaught(ctx, ex)
}
else -> {
log.warn(ctx) {
"Unexpected response type from Redis for key ${msg.key}: ${response.javaClass.name}"
}
telemetryController?.endSpan(redisSpan)
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
}
}
}
override fun exceptionCaught(ex: Throwable) {
telemetryController?.endSpan(redisSpan, ex)
this@RedisCacheHandler.exceptionCaught(ctx, ex)
}
}
client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel ->
val remoteAddr = channel.remoteAddress()
if (remoteAddr is InetSocketAddress) {
remoteAddr.hostString?.let { redisSpan?.setAttribute("server.address", it) }
redisSpan?.setAttribute("server.port", remoteAddr.port.toLong())
}
log.trace(ctx) {
"Sending GET request for key ${msg.key} to Redis"
}
@@ -344,6 +369,18 @@ class RedisCacheHandler(
val expirySeconds = maxAge.toSeconds().toString()
val redisSpan = telemetryController?.startSpan("SET")?.apply {
setAttribute("db.system", "redis")
setAttribute("db.operation.name", "SET")
val remoteAddr = ctx.channel().remoteAddress()
if (remoteAddr is InetSocketAddress) {
remoteAddr.hostString?.let {
setAttribute("server.address", it)
}
setAttribute("server.port", remoteAddr.port.toLong())
}
}
val responseHandler = object : RedisResponseHandler {
override fun responseReceived(response: RedisMessage) {
when (response) {
@@ -351,30 +388,37 @@ class RedisCacheHandler(
log.debug(ctx) {
"Inserted key ${request.keyString} into Redis"
}
telemetryController?.endSpan(redisSpan)
sendMessageAndFlush(ctx, CachePutResponse(request.keyString))
}
is ErrorRedisMessage -> {
this@RedisCacheHandler.exceptionCaught(
ctx, RedisException("Redis error for SET ${request.keyString}: ${response.content()}")
)
val ex = RedisException("Redis error for SET ${request.keyString}: ${response.content()}")
telemetryController?.endSpan(redisSpan, ex)
this@RedisCacheHandler.exceptionCaught(ctx, ex)
}
else -> {
this@RedisCacheHandler.exceptionCaught(
ctx, RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}")
)
val ex = RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}")
telemetryController?.endSpan(redisSpan, ex)
this@RedisCacheHandler.exceptionCaught(ctx, ex)
}
}
}
override fun exceptionCaught(ex: Throwable) {
telemetryController?.endSpan(redisSpan, ex)
this@RedisCacheHandler.exceptionCaught(ctx, ex)
}
}
// Use a ByteBuf key for server selection
client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel ->
val remoteAddr = channel.remoteAddress()
if (remoteAddr is InetSocketAddress) {
remoteAddr.hostString?.let { redisSpan?.setAttribute("server.address", it) }
redisSpan?.setAttribute("server.port", remoteAddr.port.toLong())
}
log.trace(ctx) {
"Sending SET request to Redis"
}
@@ -2,7 +2,6 @@ package net.woggioni.rbcs.server.redis
import java.time.Duration
import java.time.temporal.ChronoUnit
import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.common.HostAndPort
@@ -10,7 +9,6 @@ import net.woggioni.rbcs.common.RBCS
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.Xml.Companion.asIterable
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import org.w3c.dom.Element
@@ -1,5 +1,11 @@
package net.woggioni.rbcs.server.redis.client
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import io.netty.util.concurrent.Future as NettyFuture
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.Unpooled
@@ -22,15 +28,7 @@ import io.netty.handler.codec.redis.RedisBulkStringAggregator
import io.netty.handler.codec.redis.RedisDecoder
import io.netty.handler.codec.redis.RedisEncoder
import io.netty.handler.codec.redis.RedisMessage
import io.netty.util.concurrent.Future as NettyFuture
import io.netty.util.concurrent.GenericFutureListener
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.trace
+5
View File
@@ -13,6 +13,11 @@ dependencies {
implementation catalog.netty.buffer
implementation catalog.netty.transport
implementation catalog.netty.codec.haproxy
compileOnly catalog.opentelemetry.netty['4']['1']
compileOnly catalog.opentelemetry.sdk.extension.autoconfigure
compileOnly catalog.opentelemetry.logback.appender['1']['0']
compileOnly catalog.opentelemetry.extension.trace.propagators
compileOnly catalog.logback.classic
api project(':rbcs-common')
api project(':rbcs-api')
@@ -26,5 +26,6 @@ module net.woggioni.rbcs.server {
uses CacheProvider;
uses net.woggioni.rbcs.api.TelemetryController;
provides CacheProvider with FileSystemCacheProvider, InMemoryCacheProvider;
}
@@ -1,5 +1,23 @@
package net.woggioni.rbcs.server
import java.io.OutputStream
import java.net.InetSocketAddress
import java.nio.file.Files
import java.nio.file.Path
import java.security.PrivateKey
import java.security.cert.X509Certificate
import java.time.Duration
import java.time.Instant
import java.util.Arrays
import java.util.Base64
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.regex.Matcher
import java.util.regex.Pattern
import javax.naming.ldap.LdapName
import javax.net.ssl.SSLPeerUnverifiedException
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
@@ -37,32 +55,16 @@ import io.netty.handler.timeout.IdleStateEvent
import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.AttributeKey
import io.netty.util.concurrent.EventExecutorGroup
import java.io.OutputStream
import java.net.InetSocketAddress
import java.nio.file.Files
import java.nio.file.Path
import java.security.PrivateKey
import java.security.cert.X509Certificate
import java.time.Duration
import java.time.Instant
import java.util.Arrays
import java.util.Base64
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.regex.Matcher
import java.util.regex.Pattern
import javax.naming.ldap.LdapName
import javax.net.ssl.SSLPeerUnverifiedException
import net.woggioni.rbcs.api.AsyncCloseable
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.TelemetryController
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.common.Cidr
import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import net.woggioni.rbcs.common.RBCS.getTrustManager
import net.woggioni.rbcs.common.RBCS.loadKeystore
import net.woggioni.rbcs.common.RBCS.loadService
import net.woggioni.rbcs.common.RBCS.toUrl
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.createLogger
@@ -149,12 +151,68 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
AuthenticationResult(user, allGroups)
} ?: anonymousUserGroups?.let { AuthenticationResult(null, it) }
} catch (es: SSLPeerUnverifiedException) {
} catch (ex: SSLPeerUnverifiedException) {
log.debug(ctx) {
ex.message ?: "Error witch client certificate authentication"
}
anonymousUserGroups?.let { AuthenticationResult(null, it) }
}
}
}
@Sharable
private class ForwardedClientCertificateAuthenticator(
authorizer: Authorizer,
private val anonymousUserGroups: Set<Configuration.Group>?,
private val subjectDnUserExtractor: SubjectDnExtractor?,
private val subjectDnGroupExtractor: SubjectDnExtractor?,
private val headerName: String,
private val trustedProxyIPs: List<Cidr>,
private val users: Map<String, Configuration.User>,
private val groups: Map<String, Configuration.Group>,
) : AbstractNettyHttpAuthenticator(authorizer) {
companion object {
private val log = createLogger<ForwardedClientCertificateAuthenticator>()
}
override fun authenticate(ctx: ChannelHandlerContext, req: HttpRequest): AuthenticationResult? {
val clientIp = ctx.channel().attr(clientIp).get()
if (clientIp == null || trustedProxyIPs.none { it.contains(clientIp.address) }) {
log.debug(ctx) {
"Rejecting forwarded client certificate authentication from untrusted address: $clientIp"
}
return null
}
val subjectDn = req.headers()[headerName]
?: return anonymousUserGroups?.let { AuthenticationResult(null, it) }
val ldapName = try {
LdapName(subjectDn)
} catch (e: Exception) {
log.debug(ctx) {
"Invalid subject DN in header $headerName: $subjectDn"
}
return anonymousUserGroups?.let { AuthenticationResult(null, it) }
}
val user = subjectDnUserExtractor?.extract(ldapName)?.let { userName ->
users[userName] ?: throw RuntimeException("Failed to extract user '$userName'")
}
val group = subjectDnGroupExtractor?.extract(ldapName)?.let { groupName ->
groups[groupName] ?: throw RuntimeException("Failed to extract group '$groupName'")
}
val allGroups = ((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
return AuthenticationResult(user, allGroups)
}
}
private data class SubjectDnExtractor(val rdnType: String, val pattern: Pattern) {
fun extract(ldapName: LdapName): String? {
return ldapName.rdns.find { it.type == rdnType }
?.let { pattern.matcher(it.value.toString()) }
?.takeIf(Matcher::matches)?.group(1)
}
}
@Sharable
private class NettyHttpBasicAuthenticator(
private val users: Map<String, Configuration.User>, authorizer: Authorizer
@@ -261,6 +319,23 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
)
}
is Configuration.ForwardedClientCertificateAuthentication -> {
ForwardedClientCertificateAuthenticator(
RoleAuthorizer(),
cfg.users[""]?.groups,
auth.userExtractor?.let { extractor ->
SubjectDnExtractor(extractor.rdnType, Pattern.compile(extractor.pattern))
},
auth.groupExtractor?.let { extractor ->
SubjectDnExtractor(extractor.rdnType, Pattern.compile(extractor.pattern))
},
auth.headerName,
cfg.trustedProxyIPs,
cfg.users,
cfg.groups,
)
}
else -> null
}
@@ -358,6 +433,10 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
maxChunkSize = cfg.connection.chunkSize
}
pipeline.addLast(HttpServerCodec(httpDecoderConfig))
loadService(TelemetryController::class.java)
.firstOrNull()
?.createHandler()
?.let { pipeline.addLast(it) }
pipeline.addLast(ReadTriggerDuplexHandler.NAME, ReadTriggerDuplexHandler())
pipeline.addLast(MaxRequestSizeHandler.NAME, MaxRequestSizeHandler(cfg.connection.maxRequestSize))
pipeline.addLast(HttpChunkContentCompressor(1024))
@@ -1,5 +1,6 @@
package net.woggioni.rbcs.server.auth
import java.net.InetSocketAddress
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandlerContext
@@ -15,12 +16,15 @@ import io.netty.util.ReferenceCountUtil
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Configuration.Group
import net.woggioni.rbcs.api.Role
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.server.RemoteBuildCacheServer
abstract class AbstractNettyHttpAuthenticator(private val authorizer: Authorizer) : ChannelInboundHandlerAdapter() {
companion object {
private val log = createLogger<AbstractNettyHttpAuthenticator>()
private val AUTHENTICATION_FAILED: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED, Unpooled.EMPTY_BUFFER
).apply {
@@ -53,6 +57,18 @@ abstract class AbstractNettyHttpAuthenticator(private val authorizer: Authorizer
result.groups.asSequence().flatMap { it.roles.asSequence() }
).toSet()
val authorized = authorizer.authorize(roles, msg)
log.debug {
val authorizedMessage = if (authorized) {
"Authorized"
} else {
"Forbidden"
}
val clientAddress = ctx.channel().attr(RemoteBuildCacheServer.clientIp).get()
val roleString = "[" + roles.asSequence().map { "\"" + it + "\"" }.joinToString(", ") + "]"
result.user?.name?.takeUnless(String::isEmpty)?.let { username ->
"$authorizedMessage ${msg.method()} request from user $username with address $clientAddress, granted roles $roleString"
} ?: "$authorizedMessage anonymous ${msg.method()} request with address $clientAddress, granted roles $roleString"
}
if (authorized) {
super.channelRead(ctx, msg)
} else {
@@ -1,11 +1,11 @@
package net.woggioni.rbcs.server.cache
import java.nio.file.Path
import java.time.Duration
import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup
import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel
import java.nio.file.Path
import java.time.Duration
import net.woggioni.jwo.Application
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration
@@ -1,14 +1,14 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioFile
import java.nio.channels.Channels
import java.util.Base64
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterInputStream
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioFile
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
@@ -1,10 +1,10 @@
package net.woggioni.rbcs.server.cache
import java.time.Duration
import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup
import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel
import java.time.Duration
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS
@@ -1,10 +1,10 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
@@ -8,6 +8,7 @@ import net.woggioni.rbcs.api.Configuration.Authentication
import net.woggioni.rbcs.api.Configuration.BasicAuthentication
import net.woggioni.rbcs.api.Configuration.Cache
import net.woggioni.rbcs.api.Configuration.ClientCertificateAuthentication
import net.woggioni.rbcs.api.Configuration.ForwardedClientCertificateAuthentication
import net.woggioni.rbcs.api.Configuration.Group
import net.woggioni.rbcs.api.Configuration.KeyStore
import net.woggioni.rbcs.api.Configuration.Tls
@@ -77,6 +78,28 @@ object Parser {
}
authentication = ClientCertificateAuthentication(tlsExtractorUser, tlsExtractorGroup)
}
"forwarded-client-certificate" -> {
val headerName = gchild.renderAttribute("header-name") ?: "X-Client-Cert-Subject-DN"
var tlsExtractorUser: TlsCertificateExtractor? = null
var tlsExtractorGroup: TlsCertificateExtractor? = null
for (ggchild in gchild.asIterable()) {
when (ggchild.localName) {
"group-extractor" -> {
val attrName = ggchild.renderAttribute("attribute-name")
val pattern = ggchild.renderAttribute("pattern")
tlsExtractorGroup = TlsCertificateExtractor(attrName, pattern)
}
"user-extractor" -> {
val attrName = ggchild.renderAttribute("attribute-name")
val pattern = ggchild.renderAttribute("pattern")
tlsExtractorUser = TlsCertificateExtractor(attrName, pattern)
}
}
}
authentication = ForwardedClientCertificateAuthentication(headerName, tlsExtractorUser, tlsExtractorGroup)
}
}
}
}
@@ -165,6 +165,23 @@ object Serializer {
}
}
}
is Configuration.ForwardedClientCertificateAuthentication -> {
node("forwarded-client-certificate") {
attr("header-name", authentication.headerName)
authentication.groupExtractor?.let { extractor ->
node("group-extractor") {
attr("attribute-name", extractor.rdnType)
attr("pattern", extractor.pattern)
}
}
authentication.userExtractor?.let { extractor ->
node("user-extractor") {
attr("attribute-name", extractor.rdnType)
attr("pattern", extractor.pattern)
}
}
}
}
}
}
}
@@ -1,5 +1,9 @@
package net.woggioni.rbcs.server.exception
import java.net.ConnectException
import java.net.SocketException
import javax.net.ssl.SSLException
import javax.net.ssl.SSLPeerUnverifiedException
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelFutureListener
@@ -13,10 +17,6 @@ import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.timeout.ReadTimeoutException
import io.netty.handler.timeout.WriteTimeoutException
import java.net.ConnectException
import java.net.SocketException
import javax.net.ssl.SSLException
import javax.net.ssl.SSLPeerUnverifiedException
import net.woggioni.rbcs.api.exception.CacheException
import net.woggioni.rbcs.api.exception.ContentTooLargeException
import net.woggioni.rbcs.common.contextLogger
@@ -74,6 +74,7 @@ object ExceptionHandler : ChannelDuplexHandler() {
}
is SSLPeerUnverifiedException -> {
log.debug(cause.message, cause)
ctx.writeAndFlush(NOT_AUTHORIZED.retainedDuplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
@@ -1,10 +1,10 @@
package net.woggioni.rbcs.server.handler
import java.net.InetAddress
import java.net.InetSocketAddress
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.haproxy.HAProxyMessage
import java.net.InetAddress
import java.net.InetSocketAddress
import net.woggioni.rbcs.common.Cidr
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.trace
@@ -1,5 +1,6 @@
package net.woggioni.rbcs.server.handler
import java.nio.file.Path
import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
@@ -18,7 +19,6 @@ import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.LastHttpContent
import java.nio.file.Path
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
@@ -1,5 +1,6 @@
package net.woggioni.rbcs.server.handler
import java.nio.file.Path
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
@@ -10,7 +11,6 @@ import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.LastHttpContent
import java.nio.file.Path
@Sharable
object TraceHandler : ChannelInboundHandlerAdapter() {
@@ -1,5 +1,10 @@
package net.woggioni.rbcs.server.throttling
import java.net.InetSocketAddress
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.ArrayDeque
import java.util.concurrent.TimeUnit
import io.netty.buffer.ByteBufHolder
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
@@ -10,11 +15,6 @@ import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.LastHttpContent
import java.net.InetSocketAddress
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.ArrayDeque
import java.util.concurrent.TimeUnit
import net.woggioni.jwo.Bucket
import net.woggioni.jwo.LongMath
import net.woggioni.rbcs.api.Configuration
@@ -311,6 +311,45 @@
</xs:sequence>
</xs:complexType>
<xs:complexType name="forwardedClientCertificateAuthorizationType">
<xs:annotation>
<xs:documentation>
Authenticate clients based on a custom HTTP header containing the client TLS certificate
subject DN, forwarded by a reverse proxy that performs TLS termination. The proxy must be
listed in the trusted-proxies configuration for the header to be accepted.
</xs:documentation>
</xs:annotation>
<xs:sequence>
<xs:element name="group-extractor" type="rbcs:X500NameExtractorType" minOccurs="0">
<xs:annotation>
<xs:documentation>
A regex based extractor that will be used to determine which group the client belongs to,
based on the X.500 name of the subject DN forwarded by the reverse proxy.
When this is set RBAC works even if the user isn't listed in the &lt;users/&gt; section as
the client will be assigned role solely based on the group he is found to belong to.
Note that this does not allow for a client to be part of multiple groups.
</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element name="user-extractor" type="rbcs:X500NameExtractorType" minOccurs="0">
<xs:annotation>
<xs:documentation>
A regex based extractor that will be used to assign a user to a connected client,
based on the X.500 name of the subject DN forwarded by the reverse proxy.
</xs:documentation>
</xs:annotation>
</xs:element>
</xs:sequence>
<xs:attribute name="header-name" type="xs:token">
<xs:annotation>
<xs:documentation>
Name of the HTTP header containing the client certificate subject DN
forwarded by the reverse proxy. Defaults to "X-Client-Cert-Subject-DN".
</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
<xs:complexType name="X500NameExtractorType">
<xs:annotation>
<xs:documentation>
@@ -380,6 +419,15 @@
</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element name="forwarded-client-certificate" type="rbcs:forwardedClientCertificateAuthorizationType">
<xs:annotation>
<xs:documentation>
Enable forwarded client certificate authentication. Authenticates clients based on
a custom HTTP header containing the client certificate subject DN, forwarded by a
reverse proxy that performs TLS termination. Requires trusted-proxies to be configured.
</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element name="none">
<xs:annotation>
<xs:documentation>
@@ -140,11 +140,10 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
cfg = Configuration(
"127.0.0.1",
getFreePort(),
serverPath,
false,
emptyList(),
100,
serverPath,
Configuration.EventExecutor(false),
Configuration.RateLimiter(true, 0x100000, 50),
Configuration.Connection(
@@ -1,11 +1,11 @@
package net.woggioni.rbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.time.temporal.ChronoUnit
import io.netty.handler.codec.http.HttpResponseStatus
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Role
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
@@ -1,9 +1,9 @@
package net.woggioni.rbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import io.netty.handler.codec.http.HttpResponseStatus
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import org.junit.jupiter.api.Assertions
@@ -1,9 +1,9 @@
package net.woggioni.rbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import io.netty.handler.codec.http.HttpResponseStatus
import net.woggioni.rbcs.api.Configuration
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order
@@ -1,6 +1,5 @@
package net.woggioni.rbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
@@ -11,6 +10,7 @@ import java.time.temporal.ChronoUnit
import java.util.Base64
import java.util.zip.Deflater
import kotlin.random.Random
import io.netty.handler.codec.http.HttpResponseStatus
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS.getFreePort
import net.woggioni.rbcs.common.Xml
@@ -34,10 +34,10 @@ class NoAuthServerTest : AbstractServerTest() {
cfg = Configuration(
"127.0.0.1",
getFreePort(),
serverPath,
false,
emptyList(),
100,
serverPath,
Configuration.EventExecutor(false),
Configuration.RateLimiter(true, 0x100000, 50),
Configuration.Connection(
@@ -1,9 +1,9 @@
package net.woggioni.rbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import io.netty.handler.codec.http.HttpResponseStatus
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Role
import org.bouncycastle.asn1.x500.X500Name
@@ -1,7 +1,8 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
xs:schemaLocation="urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd">
xs:schemaLocation="urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
path="/my/custom/path">
<bind host="127.0.0.1" port="11443" incoming-connections-backlog-size="22" proxy-protocol="true">
<trusted-proxies>
<allow cidr="192.168.0.11/32"/>
@@ -1,12 +1,5 @@
package net.woggioni.rbcs.servlet
import jakarta.annotation.PreDestroy
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.servlet.annotation.WebServlet
import jakarta.servlet.http.HttpServlet
import jakarta.servlet.http.HttpServletRequest
import jakarta.servlet.http.HttpServletResponse
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.nio.file.Path
@@ -17,6 +10,13 @@ import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.logging.Logger
import jakarta.annotation.PreDestroy
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.servlet.annotation.WebServlet
import jakarta.servlet.http.HttpServlet
import jakarta.servlet.http.HttpServletRequest
import jakarta.servlet.http.HttpServletResponse
import net.woggioni.jwo.HttpClient.HttpStatus
import net.woggioni.jwo.JWO
+1
View File
@@ -33,4 +33,5 @@ include 'rbcs-cli'
include 'rbcs-client'
include 'rbcs-server'
include 'rbcs-servlet'
include 'rbcs-server-otel'
include 'docker'