From 86a5fba7f4631b28b74900c20eab833e9d516050 Mon Sep 17 00:00:00 2001 From: opencode Date: Thu, 21 May 2026 00:48:37 +0000 Subject: [PATCH] Add OpenTelemetry tracing support for Redis commands - Add RedisSpan interface in rbcs-api for opaque span handles - Extend TelemetryController with startRedisSpan/endRedisSpan methods - Implement Redis tracing in rbcs-server-otel via OtelController and RedisOtelSpan - Instrument RedisCacheHandler to create spans around GET and SET commands - Add uses directive in rbcs-server-redis module-info for ServiceLoader discovery Redis spans are created as CLIENT spans with attributes: db.system=redis, db.operation=GET|SET, server.address, server.port --- .../java/net/woggioni/rbcs/api/RedisSpan.java | 10 ++ .../rbcs/api/TelemetryController.java | 7 ++ .../rbcs/server/otel/OtelController.kt | 27 +++++ .../rbcs/server/otel/RedisOtelSpan.kt | 17 +++ .../src/main/java/module-info.java | 3 + .../rbcs/server/redis/RedisCacheHandler.kt | 113 +++++++++++------- 6 files changed, 136 insertions(+), 41 deletions(-) create mode 100644 rbcs-api/src/main/java/net/woggioni/rbcs/api/RedisSpan.java create mode 100644 rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/RedisOtelSpan.kt diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/RedisSpan.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/RedisSpan.java new file mode 100644 index 0000000..2898f8b --- /dev/null +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/RedisSpan.java @@ -0,0 +1,10 @@ +package net.woggioni.rbcs.api; + +import org.jetbrains.annotations.NotNull; + +public interface RedisSpan { + + void setAttribute(@NotNull String key, @NotNull String value); + + void setAttribute(@NotNull String key, long value); +} diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/TelemetryController.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/TelemetryController.java index 686ebab..e6914a2 100644 --- a/rbcs-api/src/main/java/net/woggioni/rbcs/api/TelemetryController.java +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/TelemetryController.java @@ -2,8 +2,15 @@ package net.woggioni.rbcs.api; import io.netty.channel.ChannelHandler; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; public interface TelemetryController { void initialize(); @NotNull ChannelHandler createHandler(); + + @Nullable RedisSpan startRedisSpan(@NotNull String command, @NotNull String key); + + void endRedisSpan(@Nullable RedisSpan span); + + void endRedisSpan(@Nullable RedisSpan span, @NotNull Throwable error); } diff --git a/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelController.kt b/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelController.kt index 44d97bd..bc83c79 100644 --- a/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelController.kt +++ b/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelController.kt @@ -2,10 +2,13 @@ package net.woggioni.rbcs.server.otel import io.netty.channel.ChannelHandler import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.trace.SpanKind +import io.opentelemetry.api.trace.StatusCode import io.opentelemetry.instrumentation.logback.appender.v1_0.OpenTelemetryAppender import io.opentelemetry.instrumentation.netty.v4_1.NettyServerTelemetry import io.opentelemetry.instrumentation.runtimetelemetry.RuntimeTelemetry import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk +import net.woggioni.rbcs.api.RedisSpan import net.woggioni.rbcs.api.TelemetryController import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.info @@ -14,6 +17,10 @@ class OtelController : TelemetryController { private val log = createLogger() + private val tracer by lazy { + GlobalOpenTelemetry.getTracer("net.woggioni.rbcs.server.redis", "0.5.0") + } + override fun initialize() { log.info { "Initializing OpenTelemetry SDK with auto-configuration" } @@ -36,4 +43,24 @@ class OtelController : TelemetryController { override fun createHandler(): ChannelHandler { return NettyServerTelemetry.create(GlobalOpenTelemetry.get()).createCombinedHandler() } + + override fun startRedisSpan(command: String, key: String): RedisSpan? { + val span = tracer.spanBuilder(command) + .setSpanKind(SpanKind.CLIENT) + .setAttribute("db.system", "redis") + .setAttribute("db.operation", command) + .startSpan() + return RedisOtelSpan(span) + } + + override fun endRedisSpan(span: RedisSpan?) { + (span as? RedisOtelSpan)?.delegate?.end() + } + + override fun endRedisSpan(span: RedisSpan?, error: Throwable) { + val s = (span as? RedisOtelSpan)?.delegate ?: return + s.recordException(error) + s.setStatus(StatusCode.ERROR) + s.end() + } } diff --git a/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/RedisOtelSpan.kt b/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/RedisOtelSpan.kt new file mode 100644 index 0000000..9e22617 --- /dev/null +++ b/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/RedisOtelSpan.kt @@ -0,0 +1,17 @@ +package net.woggioni.rbcs.server.otel + +import io.opentelemetry.api.trace.Span +import net.woggioni.rbcs.api.RedisSpan + +internal class RedisOtelSpan( + val delegate: Span, +) : RedisSpan { + + override fun setAttribute(key: String, value: String) { + delegate.setAttribute(key, value) + } + + override fun setAttribute(key: String, value: Long) { + delegate.setAttribute(key, value) + } +} diff --git a/rbcs-server-redis/src/main/java/module-info.java b/rbcs-server-redis/src/main/java/module-info.java index 51e008a..1402352 100644 --- a/rbcs-server-redis/src/main/java/module-info.java +++ b/rbcs-server-redis/src/main/java/module-info.java @@ -1,4 +1,5 @@ import net.woggioni.rbcs.api.CacheProvider; +import net.woggioni.rbcs.api.TelemetryController; module net.woggioni.rbcs.server.redis { requires net.woggioni.rbcs.common; @@ -16,5 +17,7 @@ module net.woggioni.rbcs.server.redis { provides CacheProvider with net.woggioni.rbcs.server.redis.RedisCacheProvider; + uses TelemetryController; + opens net.woggioni.rbcs.server.redis.schema; } diff --git a/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt index 584dd7f..1f791e4 100644 --- a/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt +++ b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt @@ -14,6 +14,7 @@ import io.netty.handler.codec.redis.SimpleStringRedisMessage import java.io.ByteArrayOutputStream import java.io.ObjectInputStream import java.io.ObjectOutputStream +import java.net.InetSocketAddress import java.nio.ByteBuffer import java.nio.channels.Channels import java.nio.channels.FileChannel @@ -37,8 +38,11 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent +import net.woggioni.rbcs.api.RedisSpan +import net.woggioni.rbcs.api.TelemetryController import net.woggioni.rbcs.common.ByteBufInputStream import net.woggioni.rbcs.common.ByteBufOutputStream +import net.woggioni.rbcs.common.RBCS.loadService import net.woggioni.rbcs.common.RBCS.processCacheKey import net.woggioni.rbcs.common.RBCS.toIntOrNull import net.woggioni.rbcs.common.createLogger @@ -62,6 +66,10 @@ class RedisCacheHandler( private val log = createLogger() } + private val telemetryController by lazy { + loadService(TelemetryController::class.java).firstOrNull() + } + private interface InProgressRequest private inner class InProgressGetRequest( @@ -244,46 +252,57 @@ class RedisCacheHandler( } val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm) val keyString = String(keyBytes, StandardCharsets.UTF_8) + val redisSpan = telemetryController?.startRedisSpan("GET", keyString) val responseHandler = object : RedisResponseHandler { override fun responseReceived(response: RedisMessage) { - when (response) { - is FullBulkStringRedisMessage -> { - if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) { - log.debug(ctx) { - "Cache miss for key ${msg.key} on Redis" + try { + when (response) { + is FullBulkStringRedisMessage -> { + if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) { + log.debug(ctx) { + "Cache miss for key ${msg.key} on Redis" + } + sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key)) + } else { + log.debug(ctx) { + "Cache hit for key ${msg.key} on Redis" + } + val getRequest = InProgressGetRequest(msg.key, ctx) + inProgressRequest = getRequest + getRequest.processResponse(response.content()) + inProgressRequest = null + } + } + + is ErrorRedisMessage -> { + val ex = RedisException("Redis error for GET ${msg.key}: ${response.content()}") + telemetryController?.endRedisSpan(redisSpan, ex) + this@RedisCacheHandler.exceptionCaught(ctx, ex) + } + + else -> { + log.warn(ctx) { + "Unexpected response type from Redis for key ${msg.key}: ${response.javaClass.name}" } sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key)) - } else { - log.debug(ctx) { - "Cache hit for key ${msg.key} on Redis" - } - val getRequest = InProgressGetRequest(msg.key, ctx) - inProgressRequest = getRequest - getRequest.processResponse(response.content()) - inProgressRequest = null } } - - is ErrorRedisMessage -> { - this@RedisCacheHandler.exceptionCaught( - ctx, RedisException("Redis error for GET ${msg.key}: ${response.content()}") - ) - } - - else -> { - log.warn(ctx) { - "Unexpected response type from Redis for key ${msg.key}: ${response.javaClass.name}" - } - sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key)) - } + } finally { + telemetryController?.endRedisSpan(redisSpan) } } override fun exceptionCaught(ex: Throwable) { + telemetryController?.endRedisSpan(redisSpan, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex) } } client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel -> + val remoteAddr = channel.remoteAddress() + if (remoteAddr is InetSocketAddress) { + remoteAddr.hostString?.let { redisSpan?.setAttribute("server.address", it) } + redisSpan?.setAttribute("server.port", remoteAddr.port.toLong()) + } log.trace(ctx) { "Sending GET request for key ${msg.key} to Redis" } @@ -344,37 +363,49 @@ class RedisCacheHandler( val expirySeconds = maxAge.toSeconds().toString() + val redisSpan = telemetryController?.startRedisSpan("SET", request.keyString) + val responseHandler = object : RedisResponseHandler { override fun responseReceived(response: RedisMessage) { - when (response) { - is SimpleStringRedisMessage -> { - log.debug(ctx) { - "Inserted key ${request.keyString} into Redis" + try { + when (response) { + is SimpleStringRedisMessage -> { + log.debug(ctx) { + "Inserted key ${request.keyString} into Redis" + } + sendMessageAndFlush(ctx, CachePutResponse(request.keyString)) } - sendMessageAndFlush(ctx, CachePutResponse(request.keyString)) - } - is ErrorRedisMessage -> { - this@RedisCacheHandler.exceptionCaught( - ctx, RedisException("Redis error for SET ${request.keyString}: ${response.content()}") - ) - } + is ErrorRedisMessage -> { + val ex = RedisException("Redis error for SET ${request.keyString}: ${response.content()}") + telemetryController?.endRedisSpan(redisSpan, ex) + this@RedisCacheHandler.exceptionCaught(ctx, ex) + } - else -> { - this@RedisCacheHandler.exceptionCaught( - ctx, RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}") - ) + else -> { + val ex = RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}") + telemetryController?.endRedisSpan(redisSpan, ex) + this@RedisCacheHandler.exceptionCaught(ctx, ex) + } } + } finally { + telemetryController?.endRedisSpan(redisSpan) } } override fun exceptionCaught(ex: Throwable) { + telemetryController?.endRedisSpan(redisSpan, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex) } } // Use a ByteBuf key for server selection client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel -> + val remoteAddr = channel.remoteAddress() + if (remoteAddr is InetSocketAddress) { + remoteAddr.hostString?.let { redisSpan?.setAttribute("server.address", it) } + redisSpan?.setAttribute("server.port", remoteAddr.port.toLong()) + } log.trace(ctx) { "Sending SET request to Redis" }