Generalize OTEL API and add memcache tracing support

- Rename RedisSpan -> SpanHandle for generic span handling
- Generalize TelemetryController methods: startSpan/endSpan with dbSystem param
- Rename RedisOtelSpan -> OtelSpanHandle in rbcs-server-otel
- Update Redis cache handler to use new generic API
- Add OpenTelemetry tracing for memcache GET and SET commands
- Add channel property to MemcacheRequestController for server address attribution
- Add uses TelemetryController directive in memcache module-info

Memcache spans follow the same pattern as Redis:
db.system=memcache, db.operation=GET|SET, server.address, server.port
This commit is contained in:
opencode
2026-05-21 11:16:48 +00:00
parent 86a5fba7f4
commit 16db4107aa
9 changed files with 82 additions and 33 deletions
@@ -2,7 +2,7 @@ package net.woggioni.rbcs.api;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
public interface RedisSpan { public interface SpanHandle {
void setAttribute(@NotNull String key, @NotNull String value); void setAttribute(@NotNull String key, @NotNull String value);
@@ -8,9 +8,9 @@ public interface TelemetryController {
void initialize(); void initialize();
@NotNull ChannelHandler createHandler(); @NotNull ChannelHandler createHandler();
@Nullable RedisSpan startRedisSpan(@NotNull String command, @NotNull String key); @Nullable SpanHandle startSpan(@NotNull String command, @NotNull String key, @NotNull String dbSystem);
void endRedisSpan(@Nullable RedisSpan span); void endSpan(@Nullable SpanHandle span);
void endRedisSpan(@Nullable RedisSpan span, @NotNull Throwable error); void endSpan(@Nullable SpanHandle span, @NotNull Throwable error);
} }
@@ -1,4 +1,5 @@
import net.woggioni.rbcs.api.CacheProvider; import net.woggioni.rbcs.api.CacheProvider;
import net.woggioni.rbcs.api.TelemetryController;
module net.woggioni.rbcs.server.memcache { module net.woggioni.rbcs.server.memcache {
requires net.woggioni.rbcs.common; requires net.woggioni.rbcs.common;
@@ -16,5 +17,7 @@ module net.woggioni.rbcs.server.memcache {
provides CacheProvider with net.woggioni.rbcs.server.memcache.MemcacheCacheProvider; provides CacheProvider with net.woggioni.rbcs.server.memcache.MemcacheCacheProvider;
uses TelemetryController;
opens net.woggioni.rbcs.server.memcache.schema; opens net.woggioni.rbcs.server.memcache.schema;
} }
@@ -22,9 +22,11 @@ import java.nio.channels.FileChannel
import java.nio.channels.ReadableByteChannel import java.nio.channels.ReadableByteChannel
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.StandardOpenOption import java.nio.file.StandardOpenOption
import java.net.InetSocketAddress
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream import java.util.zip.InflaterOutputStream
@@ -39,8 +41,11 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.api.SpanHandle
import net.woggioni.rbcs.api.TelemetryController
import net.woggioni.rbcs.common.ByteBufInputStream import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.ByteBufOutputStream import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.loadService
import net.woggioni.rbcs.common.RBCS.processCacheKey import net.woggioni.rbcs.common.RBCS.processCacheKey
import net.woggioni.rbcs.common.RBCS.toIntOrNull import net.woggioni.rbcs.common.RBCS.toIntOrNull
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
@@ -70,6 +75,10 @@ class MemcacheCacheHandler(
} }
} }
private val telemetryController by lazy {
loadService(TelemetryController::class.java).firstOrNull()
}
private interface InProgressRequest { private interface InProgressRequest {
} }
@@ -153,7 +162,9 @@ class MemcacheCacheHandler(
metadata: CacheValueMetadata, metadata: CacheValueMetadata,
val digest: ByteBuf, val digest: ByteBuf,
val requestController: CompletableFuture<MemcacheRequestController>, val requestController: CompletableFuture<MemcacheRequestController>,
private val alloc: ByteBufAllocator private val alloc: ByteBufAllocator,
val entryKey: String,
val memcacheSpanRef: AtomicReference<SpanHandle?>,
) : InProgressRequest { ) : InProgressRequest {
private var totalSize = 0 private var totalSize = 0
private var tmpFile: FileChannel? = null private var tmpFile: FileChannel? = null
@@ -251,6 +262,7 @@ class MemcacheCacheHandler(
val key = ctx.alloc().buffer().also { val key = ctx.alloc().buffer().also {
it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm)) it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm))
} }
val memcacheSpan = telemetryController?.startSpan("GET", msg.key, "memcache")
val responseHandler = object : MemcacheResponseHandler { val responseHandler = object : MemcacheResponseHandler {
override fun responseReceived(response: BinaryMemcacheResponse) { override fun responseReceived(response: BinaryMemcacheResponse) {
val status = response.status() val status = response.status()
@@ -266,8 +278,15 @@ class MemcacheCacheHandler(
log.debug(ctx) { log.debug(ctx) {
"Cache miss for key ${msg.key} on memcache" "Cache miss for key ${msg.key} on memcache"
} }
telemetryController?.endSpan(memcacheSpan)
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key)) sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
} }
else -> {
val ex = MemcacheException(status)
telemetryController?.endSpan(memcacheSpan, ex)
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
}
} }
} }
@@ -282,11 +301,13 @@ class MemcacheCacheHandler(
if (content is LastMemcacheContent) { if (content is LastMemcacheContent) {
inProgressRequest = null inProgressRequest = null
inProgressGetRequest.commit() inProgressGetRequest.commit()
telemetryController?.endSpan(memcacheSpan)
} }
} }
} }
override fun exceptionCaught(ex: Throwable) { override fun exceptionCaught(ex: Throwable) {
telemetryController?.endSpan(memcacheSpan, ex)
(inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest -> (inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
inProgressGetRequest?.let { inProgressGetRequest?.let {
inProgressRequest = null inProgressRequest = null
@@ -297,6 +318,11 @@ class MemcacheCacheHandler(
} }
} }
client.sendRequest(key.retainedDuplicate(), responseHandler).thenAccept { requestHandle -> client.sendRequest(key.retainedDuplicate(), responseHandler).thenAccept { requestHandle ->
val remoteAddr = requestHandle.channel.remoteAddress()
if (remoteAddr is InetSocketAddress) {
remoteAddr.hostString?.let { memcacheSpan?.setAttribute("server.address", it) }
memcacheSpan?.setAttribute("server.port", remoteAddr.port.toLong())
}
log.trace(ctx) { log.trace(ctx) {
"Sending GET request for key ${msg.key} to memcache" "Sending GET request for key ${msg.key} to memcache"
} }
@@ -312,6 +338,7 @@ class MemcacheCacheHandler(
val key = ctx.alloc().buffer().also { val key = ctx.alloc().buffer().also {
it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm)) it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm))
} }
val memcacheSpanRef = AtomicReference<SpanHandle?>(null)
val responseHandler = object : MemcacheResponseHandler { val responseHandler = object : MemcacheResponseHandler {
override fun responseReceived(response: BinaryMemcacheResponse) { override fun responseReceived(response: BinaryMemcacheResponse) {
val status = response.status() val status = response.status()
@@ -320,16 +347,22 @@ class MemcacheCacheHandler(
log.debug(ctx) { log.debug(ctx) {
"Inserted key ${msg.key} into memcache" "Inserted key ${msg.key} into memcache"
} }
telemetryController?.endSpan(memcacheSpanRef.get())
sendMessageAndFlush(ctx, CachePutResponse(msg.key)) sendMessageAndFlush(ctx, CachePutResponse(msg.key))
} }
else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status)) else -> {
val ex = MemcacheException(status)
telemetryController?.endSpan(memcacheSpanRef.get(), ex)
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
}
} }
} }
override fun contentReceived(content: MemcacheContent) {} override fun contentReceived(content: MemcacheContent) {}
override fun exceptionCaught(ex: Throwable) { override fun exceptionCaught(ex: Throwable) {
telemetryController?.endSpan(memcacheSpanRef.get(), ex)
this@MemcacheCacheHandler.exceptionCaught(ctx, ex) this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
} }
} }
@@ -339,7 +372,7 @@ class MemcacheCacheHandler(
this@MemcacheCacheHandler.exceptionCaught(ctx, ex) this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
} }
} }
inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc()) inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc(), msg.key, memcacheSpanRef)
} }
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
@@ -362,22 +395,30 @@ class MemcacheCacheHandler(
val request = inProgressRequest val request = inProgressRequest
when (request) { when (request) {
is InProgressPutRequest -> { is InProgressPutRequest -> {
val putRequest = request
inProgressRequest = null inProgressRequest = null
log.trace(ctx) { log.trace(ctx) {
"Received last chunk of ${msg.content().readableBytes()} bytes for memcache" "Received last chunk of ${msg.content().readableBytes()} bytes for memcache"
} }
request.write(msg.content()) putRequest.write(msg.content())
val key = request.digest.retainedDuplicate() val memcacheSpan = telemetryController?.startSpan("SET", putRequest.entryKey, "memcache")
val (payloadSize, payloadSource) = request.commit() putRequest.memcacheSpanRef.set(memcacheSpan)
val key = putRequest.digest.retainedDuplicate()
val (payloadSize, payloadSource) = putRequest.commit()
val extras = ctx.alloc().buffer(8, 8) val extras = ctx.alloc().buffer(8, 8)
extras.writeInt(0) extras.writeInt(0)
extras.writeInt(encodeExpiry(maxAge)) extras.writeInt(encodeExpiry(maxAge))
val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize val totalBodyLength = putRequest.digest.readableBytes() + extras.readableBytes() + payloadSize
log.trace(ctx) { log.trace(ctx) {
"Trying to send SET request to memcache" "Trying to send SET request to memcache"
} }
request.requestController.whenComplete { requestController, ex -> putRequest.requestController.whenComplete { requestController, ex ->
if (ex == null) { if (ex == null) {
val remoteAddr = requestController.channel.remoteAddress()
if (remoteAddr is InetSocketAddress) {
remoteAddr.hostString?.let { memcacheSpan?.setAttribute("server.address", it) }
memcacheSpan?.setAttribute("server.port", remoteAddr.port.toLong())
}
log.trace(ctx) { log.trace(ctx) {
"Sending SET request to memcache" "Sending SET request to memcache"
} }
@@ -147,6 +147,8 @@ class MemcacheClient(
channel.pipeline().addLast(handler) channel.pipeline().addLast(handler)
response.complete(object : MemcacheRequestController { response.complete(object : MemcacheRequestController {
override val channel: Channel = channel
private var channelReleased = false private var channelReleased = false
override fun sendRequest(request: BinaryMemcacheRequest) { override fun sendRequest(request: BinaryMemcacheRequest) {
@@ -1,10 +1,13 @@
package net.woggioni.rbcs.server.memcache.client package net.woggioni.rbcs.server.memcache.client
import io.netty.channel.Channel
import io.netty.handler.codec.memcache.MemcacheContent import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
interface MemcacheRequestController { interface MemcacheRequestController {
val channel: Channel
fun sendRequest(request : BinaryMemcacheRequest) fun sendRequest(request : BinaryMemcacheRequest)
fun sendContent(content : MemcacheContent) fun sendContent(content : MemcacheContent)
@@ -8,7 +8,7 @@ import io.opentelemetry.instrumentation.logback.appender.v1_0.OpenTelemetryAppen
import io.opentelemetry.instrumentation.netty.v4_1.NettyServerTelemetry import io.opentelemetry.instrumentation.netty.v4_1.NettyServerTelemetry
import io.opentelemetry.instrumentation.runtimetelemetry.RuntimeTelemetry import io.opentelemetry.instrumentation.runtimetelemetry.RuntimeTelemetry
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk
import net.woggioni.rbcs.api.RedisSpan import net.woggioni.rbcs.api.SpanHandle
import net.woggioni.rbcs.api.TelemetryController import net.woggioni.rbcs.api.TelemetryController
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.info import net.woggioni.rbcs.common.info
@@ -44,21 +44,21 @@ class OtelController : TelemetryController {
return NettyServerTelemetry.create(GlobalOpenTelemetry.get()).createCombinedHandler() return NettyServerTelemetry.create(GlobalOpenTelemetry.get()).createCombinedHandler()
} }
override fun startRedisSpan(command: String, key: String): RedisSpan? { override fun startSpan(command: String, key: String, dbSystem: String): SpanHandle? {
val span = tracer.spanBuilder(command) val span = tracer.spanBuilder(command)
.setSpanKind(SpanKind.CLIENT) .setSpanKind(SpanKind.CLIENT)
.setAttribute("db.system", "redis") .setAttribute("db.system", dbSystem)
.setAttribute("db.operation", command) .setAttribute("db.operation", command)
.startSpan() .startSpan()
return RedisOtelSpan(span) return OtelSpanHandle(span)
} }
override fun endRedisSpan(span: RedisSpan?) { override fun endSpan(span: SpanHandle?) {
(span as? RedisOtelSpan)?.delegate?.end() (span as? OtelSpanHandle)?.delegate?.end()
} }
override fun endRedisSpan(span: RedisSpan?, error: Throwable) { override fun endSpan(span: SpanHandle?, error: Throwable) {
val s = (span as? RedisOtelSpan)?.delegate ?: return val s = (span as? OtelSpanHandle)?.delegate ?: return
s.recordException(error) s.recordException(error)
s.setStatus(StatusCode.ERROR) s.setStatus(StatusCode.ERROR)
s.end() s.end()
@@ -1,11 +1,11 @@
package net.woggioni.rbcs.server.otel package net.woggioni.rbcs.server.otel
import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.Span
import net.woggioni.rbcs.api.RedisSpan import net.woggioni.rbcs.api.SpanHandle
internal class RedisOtelSpan( internal class OtelSpanHandle(
val delegate: Span, val delegate: Span,
) : RedisSpan { ) : SpanHandle {
override fun setAttribute(key: String, value: String) { override fun setAttribute(key: String, value: String) {
delegate.setAttribute(key, value) delegate.setAttribute(key, value)
@@ -38,7 +38,7 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.api.RedisSpan import net.woggioni.rbcs.api.SpanHandle
import net.woggioni.rbcs.api.TelemetryController import net.woggioni.rbcs.api.TelemetryController
import net.woggioni.rbcs.common.ByteBufInputStream import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.ByteBufOutputStream import net.woggioni.rbcs.common.ByteBufOutputStream
@@ -252,7 +252,7 @@ class RedisCacheHandler(
} }
val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm) val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm)
val keyString = String(keyBytes, StandardCharsets.UTF_8) val keyString = String(keyBytes, StandardCharsets.UTF_8)
val redisSpan = telemetryController?.startRedisSpan("GET", keyString) val redisSpan = telemetryController?.startSpan("GET", keyString, "redis")
val responseHandler = object : RedisResponseHandler { val responseHandler = object : RedisResponseHandler {
override fun responseReceived(response: RedisMessage) { override fun responseReceived(response: RedisMessage) {
try { try {
@@ -276,7 +276,7 @@ class RedisCacheHandler(
is ErrorRedisMessage -> { is ErrorRedisMessage -> {
val ex = RedisException("Redis error for GET ${msg.key}: ${response.content()}") val ex = RedisException("Redis error for GET ${msg.key}: ${response.content()}")
telemetryController?.endRedisSpan(redisSpan, ex) telemetryController?.endSpan(redisSpan, ex)
this@RedisCacheHandler.exceptionCaught(ctx, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex)
} }
@@ -288,12 +288,12 @@ class RedisCacheHandler(
} }
} }
} finally { } finally {
telemetryController?.endRedisSpan(redisSpan) telemetryController?.endSpan(redisSpan)
} }
} }
override fun exceptionCaught(ex: Throwable) { override fun exceptionCaught(ex: Throwable) {
telemetryController?.endRedisSpan(redisSpan, ex) telemetryController?.endSpan(redisSpan, ex)
this@RedisCacheHandler.exceptionCaught(ctx, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex)
} }
} }
@@ -363,7 +363,7 @@ class RedisCacheHandler(
val expirySeconds = maxAge.toSeconds().toString() val expirySeconds = maxAge.toSeconds().toString()
val redisSpan = telemetryController?.startRedisSpan("SET", request.keyString) val redisSpan = telemetryController?.startSpan("SET", request.keyString, "redis")
val responseHandler = object : RedisResponseHandler { val responseHandler = object : RedisResponseHandler {
override fun responseReceived(response: RedisMessage) { override fun responseReceived(response: RedisMessage) {
@@ -378,23 +378,23 @@ class RedisCacheHandler(
is ErrorRedisMessage -> { is ErrorRedisMessage -> {
val ex = RedisException("Redis error for SET ${request.keyString}: ${response.content()}") val ex = RedisException("Redis error for SET ${request.keyString}: ${response.content()}")
telemetryController?.endRedisSpan(redisSpan, ex) telemetryController?.endSpan(redisSpan, ex)
this@RedisCacheHandler.exceptionCaught(ctx, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex)
} }
else -> { else -> {
val ex = RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}") val ex = RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}")
telemetryController?.endRedisSpan(redisSpan, ex) telemetryController?.endSpan(redisSpan, ex)
this@RedisCacheHandler.exceptionCaught(ctx, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex)
} }
} }
} finally { } finally {
telemetryController?.endRedisSpan(redisSpan) telemetryController?.endSpan(redisSpan)
} }
} }
override fun exceptionCaught(ex: Throwable) { override fun exceptionCaught(ex: Throwable) {
telemetryController?.endRedisSpan(redisSpan, ex) telemetryController?.endSpan(redisSpan, ex)
this@RedisCacheHandler.exceptionCaught(ctx, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex)
} }
} }