From 5b52677c2869e4c1d29919ed6a6febec38408e9b Mon Sep 17 00:00:00 2001 From: opencode Date: Thu, 21 May 2026 11:16:48 +0000 Subject: [PATCH] Generalize OTEL API and add memcache tracing support - Rename RedisSpan -> SpanHandle for generic span handling - Generalize TelemetryController methods: startSpan/endSpan with dbSystem param - Rename RedisOtelSpan -> OtelSpanHandle in rbcs-server-otel - Update Redis cache handler to use new generic API - Add OpenTelemetry tracing for memcache GET and SET commands - Add channel property to MemcacheRequestController for server address attribution - Add uses TelemetryController directive in memcache module-info Memcache spans follow the same pattern as Redis: db.system=memcache, db.operation=GET|SET, server.address, server.port --- .../api/{RedisSpan.java => SpanHandle.java} | 2 +- .../rbcs/api/TelemetryController.java | 6 +- .../src/main/java/module-info.java | 3 + .../server/memcache/MemcacheCacheHandler.kt | 57 ++++++++++++++++--- .../server/memcache/client/MemcacheClient.kt | 2 + .../client/MemcacheRequestController.kt | 3 + .../rbcs/server/otel/OtelController.kt | 16 +++--- .../{RedisOtelSpan.kt => OtelSpanHandle.kt} | 6 +- .../rbcs/server/redis/RedisCacheHandler.kt | 20 +++---- 9 files changed, 82 insertions(+), 33 deletions(-) rename rbcs-api/src/main/java/net/woggioni/rbcs/api/{RedisSpan.java => SpanHandle.java} (87%) rename rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/{RedisOtelSpan.kt => OtelSpanHandle.kt} (78%) diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/RedisSpan.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/SpanHandle.java similarity index 87% rename from rbcs-api/src/main/java/net/woggioni/rbcs/api/RedisSpan.java rename to rbcs-api/src/main/java/net/woggioni/rbcs/api/SpanHandle.java index 2898f8b..8fe3c18 100644 --- a/rbcs-api/src/main/java/net/woggioni/rbcs/api/RedisSpan.java +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/SpanHandle.java @@ -2,7 +2,7 @@ package net.woggioni.rbcs.api; import org.jetbrains.annotations.NotNull; -public interface RedisSpan { +public interface SpanHandle { void setAttribute(@NotNull String key, @NotNull String value); diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/TelemetryController.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/TelemetryController.java index e6914a2..300399b 100644 --- a/rbcs-api/src/main/java/net/woggioni/rbcs/api/TelemetryController.java +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/TelemetryController.java @@ -8,9 +8,9 @@ public interface TelemetryController { void initialize(); @NotNull ChannelHandler createHandler(); - @Nullable RedisSpan startRedisSpan(@NotNull String command, @NotNull String key); + @Nullable SpanHandle startSpan(@NotNull String command, @NotNull String key, @NotNull String dbSystem); - void endRedisSpan(@Nullable RedisSpan span); + void endSpan(@Nullable SpanHandle span); - void endRedisSpan(@Nullable RedisSpan span, @NotNull Throwable error); + void endSpan(@Nullable SpanHandle span, @NotNull Throwable error); } diff --git a/rbcs-server-memcache/src/main/java/module-info.java b/rbcs-server-memcache/src/main/java/module-info.java index 65abedc..170df4d 100644 --- a/rbcs-server-memcache/src/main/java/module-info.java +++ b/rbcs-server-memcache/src/main/java/module-info.java @@ -1,4 +1,5 @@ import net.woggioni.rbcs.api.CacheProvider; +import net.woggioni.rbcs.api.TelemetryController; module net.woggioni.rbcs.server.memcache { requires net.woggioni.rbcs.common; @@ -16,5 +17,7 @@ module net.woggioni.rbcs.server.memcache { provides CacheProvider with net.woggioni.rbcs.server.memcache.MemcacheCacheProvider; + uses TelemetryController; + opens net.woggioni.rbcs.server.memcache.schema; } \ No newline at end of file diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt index 22273fd..02254e2 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt @@ -9,9 +9,11 @@ import java.nio.channels.FileChannel import java.nio.channels.ReadableByteChannel import java.nio.file.Files import java.nio.file.StandardOpenOption +import java.net.InetSocketAddress import java.time.Duration import java.time.Instant import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicReference import java.util.zip.Deflater import java.util.zip.DeflaterOutputStream import java.util.zip.InflaterOutputStream @@ -39,8 +41,11 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent +import net.woggioni.rbcs.api.SpanHandle +import net.woggioni.rbcs.api.TelemetryController import net.woggioni.rbcs.common.ByteBufInputStream import net.woggioni.rbcs.common.ByteBufOutputStream +import net.woggioni.rbcs.common.RBCS.loadService import net.woggioni.rbcs.common.RBCS.processCacheKey import net.woggioni.rbcs.common.RBCS.toIntOrNull import net.woggioni.rbcs.common.createLogger @@ -70,6 +75,10 @@ class MemcacheCacheHandler( } } + private val telemetryController by lazy { + loadService(TelemetryController::class.java).firstOrNull() + } + private interface InProgressRequest { } @@ -153,7 +162,9 @@ class MemcacheCacheHandler( metadata: CacheValueMetadata, val digest: ByteBuf, val requestController: CompletableFuture, - private val alloc: ByteBufAllocator + private val alloc: ByteBufAllocator, + val entryKey: String, + val memcacheSpanRef: AtomicReference, ) : InProgressRequest { private var totalSize = 0 private var tmpFile: FileChannel? = null @@ -251,6 +262,7 @@ class MemcacheCacheHandler( val key = ctx.alloc().buffer().also { it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm)) } + val memcacheSpan = telemetryController?.startSpan("GET", msg.key, "memcache") val responseHandler = object : MemcacheResponseHandler { override fun responseReceived(response: BinaryMemcacheResponse) { val status = response.status() @@ -266,8 +278,15 @@ class MemcacheCacheHandler( log.debug(ctx) { "Cache miss for key ${msg.key} on memcache" } + telemetryController?.endSpan(memcacheSpan) sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key)) } + + else -> { + val ex = MemcacheException(status) + telemetryController?.endSpan(memcacheSpan, ex) + this@MemcacheCacheHandler.exceptionCaught(ctx, ex) + } } } @@ -282,11 +301,13 @@ class MemcacheCacheHandler( if (content is LastMemcacheContent) { inProgressRequest = null inProgressGetRequest.commit() + telemetryController?.endSpan(memcacheSpan) } } } override fun exceptionCaught(ex: Throwable) { + telemetryController?.endSpan(memcacheSpan, ex) (inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest -> inProgressGetRequest?.let { inProgressRequest = null @@ -297,6 +318,11 @@ class MemcacheCacheHandler( } } client.sendRequest(key.retainedDuplicate(), responseHandler).thenAccept { requestHandle -> + val remoteAddr = requestHandle.channel.remoteAddress() + if (remoteAddr is InetSocketAddress) { + remoteAddr.hostString?.let { memcacheSpan?.setAttribute("server.address", it) } + memcacheSpan?.setAttribute("server.port", remoteAddr.port.toLong()) + } log.trace(ctx) { "Sending GET request for key ${msg.key} to memcache" } @@ -312,6 +338,7 @@ class MemcacheCacheHandler( val key = ctx.alloc().buffer().also { it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm)) } + val memcacheSpanRef = AtomicReference(null) val responseHandler = object : MemcacheResponseHandler { override fun responseReceived(response: BinaryMemcacheResponse) { val status = response.status() @@ -320,16 +347,22 @@ class MemcacheCacheHandler( log.debug(ctx) { "Inserted key ${msg.key} into memcache" } + telemetryController?.endSpan(memcacheSpanRef.get()) sendMessageAndFlush(ctx, CachePutResponse(msg.key)) } - else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status)) + else -> { + val ex = MemcacheException(status) + telemetryController?.endSpan(memcacheSpanRef.get(), ex) + this@MemcacheCacheHandler.exceptionCaught(ctx, ex) + } } } override fun contentReceived(content: MemcacheContent) {} override fun exceptionCaught(ex: Throwable) { + telemetryController?.endSpan(memcacheSpanRef.get(), ex) this@MemcacheCacheHandler.exceptionCaught(ctx, ex) } } @@ -339,7 +372,7 @@ class MemcacheCacheHandler( this@MemcacheCacheHandler.exceptionCaught(ctx, ex) } } - inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc()) + inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc(), msg.key, memcacheSpanRef) } private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { @@ -362,22 +395,30 @@ class MemcacheCacheHandler( val request = inProgressRequest when (request) { is InProgressPutRequest -> { + val putRequest = request inProgressRequest = null log.trace(ctx) { "Received last chunk of ${msg.content().readableBytes()} bytes for memcache" } - request.write(msg.content()) - val key = request.digest.retainedDuplicate() - val (payloadSize, payloadSource) = request.commit() + putRequest.write(msg.content()) + val memcacheSpan = telemetryController?.startSpan("SET", putRequest.entryKey, "memcache") + putRequest.memcacheSpanRef.set(memcacheSpan) + val key = putRequest.digest.retainedDuplicate() + val (payloadSize, payloadSource) = putRequest.commit() val extras = ctx.alloc().buffer(8, 8) extras.writeInt(0) extras.writeInt(encodeExpiry(maxAge)) - val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize + val totalBodyLength = putRequest.digest.readableBytes() + extras.readableBytes() + payloadSize log.trace(ctx) { "Trying to send SET request to memcache" } - request.requestController.whenComplete { requestController, ex -> + putRequest.requestController.whenComplete { requestController, ex -> if (ex == null) { + val remoteAddr = requestController.channel.remoteAddress() + if (remoteAddr is InetSocketAddress) { + remoteAddr.hostString?.let { memcacheSpan?.setAttribute("server.address", it) } + memcacheSpan?.setAttribute("server.port", remoteAddr.port.toLong()) + } log.trace(ctx) { "Sending SET request to memcache" } diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt index 7899b4e..52eae11 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt @@ -147,6 +147,8 @@ class MemcacheClient( channel.pipeline().addLast(handler) response.complete(object : MemcacheRequestController { + override val channel: Channel = channel + private var channelReleased = false override fun sendRequest(request: BinaryMemcacheRequest) { diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheRequestController.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheRequestController.kt index 06cc772..942606d 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheRequestController.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheRequestController.kt @@ -1,10 +1,13 @@ package net.woggioni.rbcs.server.memcache.client +import io.netty.channel.Channel import io.netty.handler.codec.memcache.MemcacheContent import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest interface MemcacheRequestController { + val channel: Channel + fun sendRequest(request : BinaryMemcacheRequest) fun sendContent(content : MemcacheContent) diff --git a/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelController.kt b/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelController.kt index bc83c79..56cdb09 100644 --- a/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelController.kt +++ b/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelController.kt @@ -8,7 +8,7 @@ import io.opentelemetry.instrumentation.logback.appender.v1_0.OpenTelemetryAppen import io.opentelemetry.instrumentation.netty.v4_1.NettyServerTelemetry import io.opentelemetry.instrumentation.runtimetelemetry.RuntimeTelemetry import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk -import net.woggioni.rbcs.api.RedisSpan +import net.woggioni.rbcs.api.SpanHandle import net.woggioni.rbcs.api.TelemetryController import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.info @@ -44,21 +44,21 @@ class OtelController : TelemetryController { return NettyServerTelemetry.create(GlobalOpenTelemetry.get()).createCombinedHandler() } - override fun startRedisSpan(command: String, key: String): RedisSpan? { + override fun startSpan(command: String, key: String, dbSystem: String): SpanHandle? { val span = tracer.spanBuilder(command) .setSpanKind(SpanKind.CLIENT) - .setAttribute("db.system", "redis") + .setAttribute("db.system", dbSystem) .setAttribute("db.operation", command) .startSpan() - return RedisOtelSpan(span) + return OtelSpanHandle(span) } - override fun endRedisSpan(span: RedisSpan?) { - (span as? RedisOtelSpan)?.delegate?.end() + override fun endSpan(span: SpanHandle?) { + (span as? OtelSpanHandle)?.delegate?.end() } - override fun endRedisSpan(span: RedisSpan?, error: Throwable) { - val s = (span as? RedisOtelSpan)?.delegate ?: return + override fun endSpan(span: SpanHandle?, error: Throwable) { + val s = (span as? OtelSpanHandle)?.delegate ?: return s.recordException(error) s.setStatus(StatusCode.ERROR) s.end() diff --git a/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/RedisOtelSpan.kt b/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelSpanHandle.kt similarity index 78% rename from rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/RedisOtelSpan.kt rename to rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelSpanHandle.kt index 9e22617..e360908 100644 --- a/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/RedisOtelSpan.kt +++ b/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelSpanHandle.kt @@ -1,11 +1,11 @@ package net.woggioni.rbcs.server.otel import io.opentelemetry.api.trace.Span -import net.woggioni.rbcs.api.RedisSpan +import net.woggioni.rbcs.api.SpanHandle -internal class RedisOtelSpan( +internal class OtelSpanHandle( val delegate: Span, -) : RedisSpan { +) : SpanHandle { override fun setAttribute(key: String, value: String) { delegate.setAttribute(key, value) diff --git a/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt index 731b0ac..de9e2f2 100644 --- a/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt +++ b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt @@ -36,7 +36,7 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent -import net.woggioni.rbcs.api.RedisSpan +import net.woggioni.rbcs.api.SpanHandle import net.woggioni.rbcs.api.TelemetryController import net.woggioni.rbcs.common.ByteBufInputStream import net.woggioni.rbcs.common.ByteBufOutputStream @@ -250,7 +250,7 @@ class RedisCacheHandler( } val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm) val keyString = String(keyBytes, StandardCharsets.UTF_8) - val redisSpan = telemetryController?.startRedisSpan("GET", keyString) + val redisSpan = telemetryController?.startSpan("GET", keyString, "redis") val responseHandler = object : RedisResponseHandler { override fun responseReceived(response: RedisMessage) { try { @@ -274,7 +274,7 @@ class RedisCacheHandler( is ErrorRedisMessage -> { val ex = RedisException("Redis error for GET ${msg.key}: ${response.content()}") - telemetryController?.endRedisSpan(redisSpan, ex) + telemetryController?.endSpan(redisSpan, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex) } @@ -286,12 +286,12 @@ class RedisCacheHandler( } } } finally { - telemetryController?.endRedisSpan(redisSpan) + telemetryController?.endSpan(redisSpan) } } override fun exceptionCaught(ex: Throwable) { - telemetryController?.endRedisSpan(redisSpan, ex) + telemetryController?.endSpan(redisSpan, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex) } } @@ -361,7 +361,7 @@ class RedisCacheHandler( val expirySeconds = maxAge.toSeconds().toString() - val redisSpan = telemetryController?.startRedisSpan("SET", request.keyString) + val redisSpan = telemetryController?.startSpan("SET", request.keyString, "redis") val responseHandler = object : RedisResponseHandler { override fun responseReceived(response: RedisMessage) { @@ -376,23 +376,23 @@ class RedisCacheHandler( is ErrorRedisMessage -> { val ex = RedisException("Redis error for SET ${request.keyString}: ${response.content()}") - telemetryController?.endRedisSpan(redisSpan, ex) + telemetryController?.endSpan(redisSpan, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex) } else -> { val ex = RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}") - telemetryController?.endRedisSpan(redisSpan, ex) + telemetryController?.endSpan(redisSpan, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex) } } } finally { - telemetryController?.endRedisSpan(redisSpan) + telemetryController?.endSpan(redisSpan) } } override fun exceptionCaught(ex: Throwable) { - telemetryController?.endRedisSpan(redisSpan, ex) + telemetryController?.endSpan(redisSpan, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex) } }