forked from woggioni/rbcs
Add OpenTelemetry tracing support for Redis commands
- Add RedisSpan interface in rbcs-api for opaque span handles - Extend TelemetryController with startRedisSpan/endRedisSpan methods - Implement Redis tracing in rbcs-server-otel via OtelController and RedisOtelSpan - Instrument RedisCacheHandler to create spans around GET and SET commands - Add uses directive in rbcs-server-redis module-info for ServiceLoader discovery Redis spans are created as CLIENT spans with attributes: db.system=redis, db.operation=GET|SET, server.address, server.port
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import net.woggioni.rbcs.api.CacheProvider;
|
||||
import net.woggioni.rbcs.api.TelemetryController;
|
||||
|
||||
module net.woggioni.rbcs.server.redis {
|
||||
requires net.woggioni.rbcs.common;
|
||||
@@ -16,5 +17,7 @@ module net.woggioni.rbcs.server.redis {
|
||||
|
||||
provides CacheProvider with net.woggioni.rbcs.server.redis.RedisCacheProvider;
|
||||
|
||||
uses TelemetryController;
|
||||
|
||||
opens net.woggioni.rbcs.server.redis.schema;
|
||||
}
|
||||
|
||||
+72
-41
@@ -14,6 +14,7 @@ import io.netty.handler.codec.redis.SimpleStringRedisMessage
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.ObjectInputStream
|
||||
import java.io.ObjectOutputStream
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.channels.Channels
|
||||
import java.nio.channels.FileChannel
|
||||
@@ -37,8 +38,11 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
|
||||
import net.woggioni.rbcs.api.RedisSpan
|
||||
import net.woggioni.rbcs.api.TelemetryController
|
||||
import net.woggioni.rbcs.common.ByteBufInputStream
|
||||
import net.woggioni.rbcs.common.ByteBufOutputStream
|
||||
import net.woggioni.rbcs.common.RBCS.loadService
|
||||
import net.woggioni.rbcs.common.RBCS.processCacheKey
|
||||
import net.woggioni.rbcs.common.RBCS.toIntOrNull
|
||||
import net.woggioni.rbcs.common.createLogger
|
||||
@@ -62,6 +66,10 @@ class RedisCacheHandler(
|
||||
private val log = createLogger<RedisCacheHandler>()
|
||||
}
|
||||
|
||||
private val telemetryController by lazy {
|
||||
loadService(TelemetryController::class.java).firstOrNull()
|
||||
}
|
||||
|
||||
private interface InProgressRequest
|
||||
|
||||
private inner class InProgressGetRequest(
|
||||
@@ -244,46 +252,57 @@ class RedisCacheHandler(
|
||||
}
|
||||
val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm)
|
||||
val keyString = String(keyBytes, StandardCharsets.UTF_8)
|
||||
val redisSpan = telemetryController?.startRedisSpan("GET", keyString)
|
||||
val responseHandler = object : RedisResponseHandler {
|
||||
override fun responseReceived(response: RedisMessage) {
|
||||
when (response) {
|
||||
is FullBulkStringRedisMessage -> {
|
||||
if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) {
|
||||
log.debug(ctx) {
|
||||
"Cache miss for key ${msg.key} on Redis"
|
||||
try {
|
||||
when (response) {
|
||||
is FullBulkStringRedisMessage -> {
|
||||
if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) {
|
||||
log.debug(ctx) {
|
||||
"Cache miss for key ${msg.key} on Redis"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
||||
} else {
|
||||
log.debug(ctx) {
|
||||
"Cache hit for key ${msg.key} on Redis"
|
||||
}
|
||||
val getRequest = InProgressGetRequest(msg.key, ctx)
|
||||
inProgressRequest = getRequest
|
||||
getRequest.processResponse(response.content())
|
||||
inProgressRequest = null
|
||||
}
|
||||
}
|
||||
|
||||
is ErrorRedisMessage -> {
|
||||
val ex = RedisException("Redis error for GET ${msg.key}: ${response.content()}")
|
||||
telemetryController?.endRedisSpan(redisSpan, ex)
|
||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||
}
|
||||
|
||||
else -> {
|
||||
log.warn(ctx) {
|
||||
"Unexpected response type from Redis for key ${msg.key}: ${response.javaClass.name}"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
||||
} else {
|
||||
log.debug(ctx) {
|
||||
"Cache hit for key ${msg.key} on Redis"
|
||||
}
|
||||
val getRequest = InProgressGetRequest(msg.key, ctx)
|
||||
inProgressRequest = getRequest
|
||||
getRequest.processResponse(response.content())
|
||||
inProgressRequest = null
|
||||
}
|
||||
}
|
||||
|
||||
is ErrorRedisMessage -> {
|
||||
this@RedisCacheHandler.exceptionCaught(
|
||||
ctx, RedisException("Redis error for GET ${msg.key}: ${response.content()}")
|
||||
)
|
||||
}
|
||||
|
||||
else -> {
|
||||
log.warn(ctx) {
|
||||
"Unexpected response type from Redis for key ${msg.key}: ${response.javaClass.name}"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
||||
}
|
||||
} finally {
|
||||
telemetryController?.endRedisSpan(redisSpan)
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ex: Throwable) {
|
||||
telemetryController?.endRedisSpan(redisSpan, ex)
|
||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||
}
|
||||
}
|
||||
client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel ->
|
||||
val remoteAddr = channel.remoteAddress()
|
||||
if (remoteAddr is InetSocketAddress) {
|
||||
remoteAddr.hostString?.let { redisSpan?.setAttribute("server.address", it) }
|
||||
redisSpan?.setAttribute("server.port", remoteAddr.port.toLong())
|
||||
}
|
||||
log.trace(ctx) {
|
||||
"Sending GET request for key ${msg.key} to Redis"
|
||||
}
|
||||
@@ -344,37 +363,49 @@ class RedisCacheHandler(
|
||||
|
||||
val expirySeconds = maxAge.toSeconds().toString()
|
||||
|
||||
val redisSpan = telemetryController?.startRedisSpan("SET", request.keyString)
|
||||
|
||||
val responseHandler = object : RedisResponseHandler {
|
||||
override fun responseReceived(response: RedisMessage) {
|
||||
when (response) {
|
||||
is SimpleStringRedisMessage -> {
|
||||
log.debug(ctx) {
|
||||
"Inserted key ${request.keyString} into Redis"
|
||||
try {
|
||||
when (response) {
|
||||
is SimpleStringRedisMessage -> {
|
||||
log.debug(ctx) {
|
||||
"Inserted key ${request.keyString} into Redis"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CachePutResponse(request.keyString))
|
||||
}
|
||||
sendMessageAndFlush(ctx, CachePutResponse(request.keyString))
|
||||
}
|
||||
|
||||
is ErrorRedisMessage -> {
|
||||
this@RedisCacheHandler.exceptionCaught(
|
||||
ctx, RedisException("Redis error for SET ${request.keyString}: ${response.content()}")
|
||||
)
|
||||
}
|
||||
is ErrorRedisMessage -> {
|
||||
val ex = RedisException("Redis error for SET ${request.keyString}: ${response.content()}")
|
||||
telemetryController?.endRedisSpan(redisSpan, ex)
|
||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||
}
|
||||
|
||||
else -> {
|
||||
this@RedisCacheHandler.exceptionCaught(
|
||||
ctx, RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}")
|
||||
)
|
||||
else -> {
|
||||
val ex = RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}")
|
||||
telemetryController?.endRedisSpan(redisSpan, ex)
|
||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
telemetryController?.endRedisSpan(redisSpan)
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ex: Throwable) {
|
||||
telemetryController?.endRedisSpan(redisSpan, ex)
|
||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||
}
|
||||
}
|
||||
|
||||
// Use a ByteBuf key for server selection
|
||||
client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel ->
|
||||
val remoteAddr = channel.remoteAddress()
|
||||
if (remoteAddr is InetSocketAddress) {
|
||||
remoteAddr.hostString?.let { redisSpan?.setAttribute("server.address", it) }
|
||||
redisSpan?.setAttribute("server.port", remoteAddr.port.toLong())
|
||||
}
|
||||
log.trace(ctx) {
|
||||
"Sending SET request to Redis"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user