From 9a7a2566fa0ae825de37b9037a2bbebc8b394966 Mon Sep 17 00:00:00 2001 From: opencode Date: Thu, 21 May 2026 11:16:48 +0000 Subject: [PATCH] Generalize OTEL API and add memcache tracing support - Rename RedisSpan -> SpanHandle for generic span handling - Generalize TelemetryController methods: startSpan/endSpan with dbSystem param - Rename RedisOtelSpan -> OtelSpanHandle in rbcs-server-otel - Update Redis cache handler to use new generic API - Add OpenTelemetry tracing for memcache GET and SET commands - Add channel property to MemcacheRequestController for server address attribution - Add uses TelemetryController directive in memcache module-info Memcache spans follow the same pattern as Redis: db.system=memcache, db.operation=GET|SET, server.address, server.port --- gradle.properties | 2 +- .../api/{RedisSpan.java => SpanHandle.java} | 5 +- .../rbcs/api/TelemetryController.java | 8 +- .../src/main/java/module-info.java | 3 + .../server/memcache/MemcacheCacheHandler.kt | 73 ++++++++-- .../server/memcache/client/MemcacheClient.kt | 2 + .../client/MemcacheRequestController.kt | 3 + .../rbcs/server/otel/OtelController.kt | 19 ++- .../{RedisOtelSpan.kt => OtelSpanHandle.kt} | 10 +- .../rbcs/server/redis/RedisCacheHandler.kt | 127 ++++++++++-------- 10 files changed, 170 insertions(+), 82 deletions(-) rename rbcs-api/src/main/java/net/woggioni/rbcs/api/{RedisSpan.java => SpanHandle.java} (68%) rename rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/{RedisOtelSpan.kt => OtelSpanHandle.kt} (61%) diff --git a/gradle.properties b/gradle.properties index 603f069..e4ea54a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -4,7 +4,7 @@ org.gradle.caching=true rbcs.version = 0.5.0 -lys.version = 2026.05.16 +lys.version = 2026.05.27 gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven docker.registry.url=gitea.woggioni.net diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/RedisSpan.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/SpanHandle.java similarity index 68% rename from rbcs-api/src/main/java/net/woggioni/rbcs/api/RedisSpan.java rename to rbcs-api/src/main/java/net/woggioni/rbcs/api/SpanHandle.java index 2898f8b..4fc5a21 100644 --- a/rbcs-api/src/main/java/net/woggioni/rbcs/api/RedisSpan.java +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/SpanHandle.java @@ -2,9 +2,12 @@ package net.woggioni.rbcs.api; import org.jetbrains.annotations.NotNull; -public interface RedisSpan { +public interface SpanHandle { void setAttribute(@NotNull String key, @NotNull String value); void setAttribute(@NotNull String key, long value); + + void setAttribute(@NotNull String key, boolean value); + } diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/TelemetryController.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/TelemetryController.java index e6914a2..bbe6932 100644 --- a/rbcs-api/src/main/java/net/woggioni/rbcs/api/TelemetryController.java +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/TelemetryController.java @@ -4,13 +4,15 @@ import io.netty.channel.ChannelHandler; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import java.util.Map; + public interface TelemetryController { void initialize(); @NotNull ChannelHandler createHandler(); - @Nullable RedisSpan startRedisSpan(@NotNull String command, @NotNull String key); + @Nullable SpanHandle startSpan(@NotNull String command); - void endRedisSpan(@Nullable RedisSpan span); + void endSpan(@Nullable SpanHandle span); - void endRedisSpan(@Nullable RedisSpan span, @NotNull Throwable error); + void endSpan(@Nullable SpanHandle span, @NotNull Throwable error); } diff --git a/rbcs-server-memcache/src/main/java/module-info.java b/rbcs-server-memcache/src/main/java/module-info.java index 65abedc..170df4d 100644 --- a/rbcs-server-memcache/src/main/java/module-info.java +++ b/rbcs-server-memcache/src/main/java/module-info.java @@ -1,4 +1,5 @@ import net.woggioni.rbcs.api.CacheProvider; +import net.woggioni.rbcs.api.TelemetryController; module net.woggioni.rbcs.server.memcache { requires net.woggioni.rbcs.common; @@ -16,5 +17,7 @@ module net.woggioni.rbcs.server.memcache { provides CacheProvider with net.woggioni.rbcs.server.memcache.MemcacheCacheProvider; + uses TelemetryController; + opens net.woggioni.rbcs.server.memcache.schema; } \ No newline at end of file diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt index 22273fd..3ce3aa6 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt @@ -9,9 +9,11 @@ import java.nio.channels.FileChannel import java.nio.channels.ReadableByteChannel import java.nio.file.Files import java.nio.file.StandardOpenOption +import java.net.InetSocketAddress import java.time.Duration import java.time.Instant import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicReference import java.util.zip.Deflater import java.util.zip.DeflaterOutputStream import java.util.zip.InflaterOutputStream @@ -39,8 +41,11 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent +import net.woggioni.rbcs.api.SpanHandle +import net.woggioni.rbcs.api.TelemetryController import net.woggioni.rbcs.common.ByteBufInputStream import net.woggioni.rbcs.common.ByteBufOutputStream +import net.woggioni.rbcs.common.RBCS.loadService import net.woggioni.rbcs.common.RBCS.processCacheKey import net.woggioni.rbcs.common.RBCS.toIntOrNull import net.woggioni.rbcs.common.createLogger @@ -70,6 +75,10 @@ class MemcacheCacheHandler( } } + private val telemetryController by lazy { + loadService(TelemetryController::class.java).firstOrNull() + } + private interface InProgressRequest { } @@ -153,7 +162,9 @@ class MemcacheCacheHandler( metadata: CacheValueMetadata, val digest: ByteBuf, val requestController: CompletableFuture, - private val alloc: ByteBufAllocator + private val alloc: ByteBufAllocator, + val entryKey: String, + val memcacheSpanRef: AtomicReference, ) : InProgressRequest { private var totalSize = 0 private var tmpFile: FileChannel? = null @@ -251,6 +262,17 @@ class MemcacheCacheHandler( val key = ctx.alloc().buffer().also { it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm)) } + val memcacheSpan = telemetryController?.startSpan("GET")?.apply { + setAttribute("db.system", "memcache") + setAttribute("db.operation.name", "GET") + val remoteAddr = ctx.channel().remoteAddress() + if (remoteAddr is InetSocketAddress) { + remoteAddr.hostString?.let { + setAttribute("server.address", it) + } + setAttribute("server.port", remoteAddr.port.toLong()) + } + } val responseHandler = object : MemcacheResponseHandler { override fun responseReceived(response: BinaryMemcacheResponse) { val status = response.status() @@ -266,8 +288,15 @@ class MemcacheCacheHandler( log.debug(ctx) { "Cache miss for key ${msg.key} on memcache" } + telemetryController?.endSpan(memcacheSpan) sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key)) } + + else -> { + val ex = MemcacheException(status) + telemetryController?.endSpan(memcacheSpan, ex) + this@MemcacheCacheHandler.exceptionCaught(ctx, ex) + } } } @@ -282,11 +311,13 @@ class MemcacheCacheHandler( if (content is LastMemcacheContent) { inProgressRequest = null inProgressGetRequest.commit() + telemetryController?.endSpan(memcacheSpan) } } } override fun exceptionCaught(ex: Throwable) { + telemetryController?.endSpan(memcacheSpan, ex) (inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest -> inProgressGetRequest?.let { inProgressRequest = null @@ -312,6 +343,7 @@ class MemcacheCacheHandler( val key = ctx.alloc().buffer().also { it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm)) } + val memcacheSpanRef = AtomicReference(null) val responseHandler = object : MemcacheResponseHandler { override fun responseReceived(response: BinaryMemcacheResponse) { val status = response.status() @@ -320,16 +352,22 @@ class MemcacheCacheHandler( log.debug(ctx) { "Inserted key ${msg.key} into memcache" } + telemetryController?.endSpan(memcacheSpanRef.get()) sendMessageAndFlush(ctx, CachePutResponse(msg.key)) } - else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status)) + else -> { + val ex = MemcacheException(status) + telemetryController?.endSpan(memcacheSpanRef.get(), ex) + this@MemcacheCacheHandler.exceptionCaught(ctx, ex) + } } } override fun contentReceived(content: MemcacheContent) {} override fun exceptionCaught(ex: Throwable) { + telemetryController?.endSpan(memcacheSpanRef.get(), ex) this@MemcacheCacheHandler.exceptionCaught(ctx, ex) } } @@ -339,7 +377,7 @@ class MemcacheCacheHandler( this@MemcacheCacheHandler.exceptionCaught(ctx, ex) } } - inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc()) + inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc(), msg.key, memcacheSpanRef) } private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { @@ -362,22 +400,41 @@ class MemcacheCacheHandler( val request = inProgressRequest when (request) { is InProgressPutRequest -> { + val putRequest = request inProgressRequest = null log.trace(ctx) { "Received last chunk of ${msg.content().readableBytes()} bytes for memcache" } - request.write(msg.content()) - val key = request.digest.retainedDuplicate() - val (payloadSize, payloadSource) = request.commit() + putRequest.write(msg.content()) + val memcacheSpan = telemetryController?.startSpan("SET", + )?.apply { + setAttribute("db.system", "memcache") + setAttribute("db.operation.name", "SET") + val remoteAddr = ctx.channel().remoteAddress() + if (remoteAddr is InetSocketAddress) { + remoteAddr.hostString?.let { + setAttribute("server.address", it) + } + setAttribute("server.port", remoteAddr.port.toLong()) + } + } + putRequest.memcacheSpanRef.set(memcacheSpan) + val key = putRequest.digest.retainedDuplicate() + val (payloadSize, payloadSource) = putRequest.commit() val extras = ctx.alloc().buffer(8, 8) extras.writeInt(0) extras.writeInt(encodeExpiry(maxAge)) - val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize + val totalBodyLength = putRequest.digest.readableBytes() + extras.readableBytes() + payloadSize log.trace(ctx) { "Trying to send SET request to memcache" } - request.requestController.whenComplete { requestController, ex -> + putRequest.requestController.whenComplete { requestController, ex -> if (ex == null) { + val remoteAddr = requestController.channel.remoteAddress() + if (remoteAddr is InetSocketAddress) { + remoteAddr.hostString?.let { memcacheSpan?.setAttribute("server.address", it) } + memcacheSpan?.setAttribute("server.port", remoteAddr.port.toLong()) + } log.trace(ctx) { "Sending SET request to memcache" } diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt index 7899b4e..52eae11 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt @@ -147,6 +147,8 @@ class MemcacheClient( channel.pipeline().addLast(handler) response.complete(object : MemcacheRequestController { + override val channel: Channel = channel + private var channelReleased = false override fun sendRequest(request: BinaryMemcacheRequest) { diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheRequestController.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheRequestController.kt index 06cc772..942606d 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheRequestController.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheRequestController.kt @@ -1,10 +1,13 @@ package net.woggioni.rbcs.server.memcache.client +import io.netty.channel.Channel import io.netty.handler.codec.memcache.MemcacheContent import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest interface MemcacheRequestController { + val channel: Channel + fun sendRequest(request : BinaryMemcacheRequest) fun sendContent(content : MemcacheContent) diff --git a/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelController.kt b/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelController.kt index bc83c79..12cbd95 100644 --- a/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelController.kt +++ b/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelController.kt @@ -2,13 +2,14 @@ package net.woggioni.rbcs.server.otel import io.netty.channel.ChannelHandler import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.trace.SpanKind import io.opentelemetry.api.trace.StatusCode import io.opentelemetry.instrumentation.logback.appender.v1_0.OpenTelemetryAppender import io.opentelemetry.instrumentation.netty.v4_1.NettyServerTelemetry import io.opentelemetry.instrumentation.runtimetelemetry.RuntimeTelemetry import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk -import net.woggioni.rbcs.api.RedisSpan +import net.woggioni.rbcs.api.SpanHandle import net.woggioni.rbcs.api.TelemetryController import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.info @@ -44,21 +45,19 @@ class OtelController : TelemetryController { return NettyServerTelemetry.create(GlobalOpenTelemetry.get()).createCombinedHandler() } - override fun startRedisSpan(command: String, key: String): RedisSpan? { - val span = tracer.spanBuilder(command) + override fun startSpan(name: String): SpanHandle { + val span = tracer.spanBuilder(name) .setSpanKind(SpanKind.CLIENT) - .setAttribute("db.system", "redis") - .setAttribute("db.operation", command) .startSpan() - return RedisOtelSpan(span) + return OtelSpanHandle(span) } - override fun endRedisSpan(span: RedisSpan?) { - (span as? RedisOtelSpan)?.delegate?.end() + override fun endSpan(span: SpanHandle?) { + (span as? OtelSpanHandle)?.delegate?.end() } - override fun endRedisSpan(span: RedisSpan?, error: Throwable) { - val s = (span as? RedisOtelSpan)?.delegate ?: return + override fun endSpan(span: SpanHandle?, error: Throwable) { + val s = (span as? OtelSpanHandle)?.delegate ?: return s.recordException(error) s.setStatus(StatusCode.ERROR) s.end() diff --git a/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/RedisOtelSpan.kt b/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelSpanHandle.kt similarity index 61% rename from rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/RedisOtelSpan.kt rename to rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelSpanHandle.kt index 9e22617..9432214 100644 --- a/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/RedisOtelSpan.kt +++ b/rbcs-server-otel/src/main/kotlin/net/woggioni/rbcs/server/otel/OtelSpanHandle.kt @@ -1,11 +1,11 @@ package net.woggioni.rbcs.server.otel import io.opentelemetry.api.trace.Span -import net.woggioni.rbcs.api.RedisSpan +import net.woggioni.rbcs.api.SpanHandle -internal class RedisOtelSpan( +internal class OtelSpanHandle( val delegate: Span, -) : RedisSpan { +) : SpanHandle { override fun setAttribute(key: String, value: String) { delegate.setAttribute(key, value) @@ -14,4 +14,8 @@ internal class RedisOtelSpan( override fun setAttribute(key: String, value: Long) { delegate.setAttribute(key, value) } + + override fun setAttribute(key: String, value: Boolean) { + delegate.setAttribute(key, value) + } } diff --git a/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt index 731b0ac..c4f9e0a 100644 --- a/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt +++ b/rbcs-server-redis/src/main/kotlin/net/woggioni/rbcs/server/redis/RedisCacheHandler.kt @@ -36,7 +36,6 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent -import net.woggioni.rbcs.api.RedisSpan import net.woggioni.rbcs.api.TelemetryController import net.woggioni.rbcs.common.ByteBufInputStream import net.woggioni.rbcs.common.ByteBufOutputStream @@ -250,48 +249,57 @@ class RedisCacheHandler( } val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm) val keyString = String(keyBytes, StandardCharsets.UTF_8) - val redisSpan = telemetryController?.startRedisSpan("GET", keyString) + val redisSpan = telemetryController?.startSpan("GET")?.apply { + setAttribute("db.system", "redis") + setAttribute("db.operation.name", "GET") + val remoteAddr = ctx.channel().remoteAddress() + if (remoteAddr is InetSocketAddress) { + remoteAddr.hostString?.let { + setAttribute("server.address", it) + } + setAttribute("server.port", remoteAddr.port.toLong()) + } + } val responseHandler = object : RedisResponseHandler { override fun responseReceived(response: RedisMessage) { - try { - when (response) { - is FullBulkStringRedisMessage -> { - if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) { - log.debug(ctx) { - "Cache miss for key ${msg.key} on Redis" - } - sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key)) - } else { - log.debug(ctx) { - "Cache hit for key ${msg.key} on Redis" - } - val getRequest = InProgressGetRequest(msg.key, ctx) - inProgressRequest = getRequest - getRequest.processResponse(response.content()) - inProgressRequest = null - } - } - - is ErrorRedisMessage -> { - val ex = RedisException("Redis error for GET ${msg.key}: ${response.content()}") - telemetryController?.endRedisSpan(redisSpan, ex) - this@RedisCacheHandler.exceptionCaught(ctx, ex) - } - - else -> { - log.warn(ctx) { - "Unexpected response type from Redis for key ${msg.key}: ${response.javaClass.name}" + when (response) { + is FullBulkStringRedisMessage -> { + if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) { + log.debug(ctx) { + "Cache miss for key ${msg.key} on Redis" } + telemetryController?.endSpan(redisSpan) sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key)) + } else { + log.debug(ctx) { + "Cache hit for key ${msg.key} on Redis" + } + telemetryController?.endSpan(redisSpan) + val getRequest = InProgressGetRequest(msg.key, ctx) + inProgressRequest = getRequest + getRequest.processResponse(response.content()) + inProgressRequest = null } } - } finally { - telemetryController?.endRedisSpan(redisSpan) + + is ErrorRedisMessage -> { + val ex = RedisException("Redis error for GET ${msg.key}: ${response.content()}") + telemetryController?.endSpan(redisSpan, ex) + this@RedisCacheHandler.exceptionCaught(ctx, ex) + } + + else -> { + log.warn(ctx) { + "Unexpected response type from Redis for key ${msg.key}: ${response.javaClass.name}" + } + telemetryController?.endSpan(redisSpan) + sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key)) + } } } override fun exceptionCaught(ex: Throwable) { - telemetryController?.endRedisSpan(redisSpan, ex) + telemetryController?.endSpan(redisSpan, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex) } } @@ -361,38 +369,45 @@ class RedisCacheHandler( val expirySeconds = maxAge.toSeconds().toString() - val redisSpan = telemetryController?.startRedisSpan("SET", request.keyString) + val redisSpan = telemetryController?.startSpan("SET")?.apply { + setAttribute("db.system", "redis") + setAttribute("db.operation.name", "SET") + val remoteAddr = ctx.channel().remoteAddress() + if (remoteAddr is InetSocketAddress) { + remoteAddr.hostString?.let { + setAttribute("server.address", it) + } + setAttribute("server.port", remoteAddr.port.toLong()) + } + } val responseHandler = object : RedisResponseHandler { override fun responseReceived(response: RedisMessage) { - try { - when (response) { - is SimpleStringRedisMessage -> { - log.debug(ctx) { - "Inserted key ${request.keyString} into Redis" - } - sendMessageAndFlush(ctx, CachePutResponse(request.keyString)) - } - - is ErrorRedisMessage -> { - val ex = RedisException("Redis error for SET ${request.keyString}: ${response.content()}") - telemetryController?.endRedisSpan(redisSpan, ex) - this@RedisCacheHandler.exceptionCaught(ctx, ex) - } - - else -> { - val ex = RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}") - telemetryController?.endRedisSpan(redisSpan, ex) - this@RedisCacheHandler.exceptionCaught(ctx, ex) + when (response) { + is SimpleStringRedisMessage -> { + log.debug(ctx) { + "Inserted key ${request.keyString} into Redis" } + telemetryController?.endSpan(redisSpan) + sendMessageAndFlush(ctx, CachePutResponse(request.keyString)) + } + + is ErrorRedisMessage -> { + val ex = RedisException("Redis error for SET ${request.keyString}: ${response.content()}") + telemetryController?.endSpan(redisSpan, ex) + this@RedisCacheHandler.exceptionCaught(ctx, ex) + } + + else -> { + val ex = RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}") + telemetryController?.endSpan(redisSpan, ex) + this@RedisCacheHandler.exceptionCaught(ctx, ex) } - } finally { - telemetryController?.endRedisSpan(redisSpan) } } override fun exceptionCaught(ex: Throwable) { - telemetryController?.endRedisSpan(redisSpan, ex) + telemetryController?.endSpan(redisSpan, ex) this@RedisCacheHandler.exceptionCaught(ctx, ex) } }