Add OpenTelemetry tracing support for Redis commands
- Add RedisSpan interface in rbcs-api for opaque span handles - Extend TelemetryController with startRedisSpan/endRedisSpan methods - Implement Redis tracing in rbcs-server-otel via OtelController and RedisOtelSpan - Instrument RedisCacheHandler to create spans around GET and SET commands - Add uses directive in rbcs-server-redis module-info for ServiceLoader discovery Redis spans are created as CLIENT spans with attributes: db.system=redis, db.operation=GET|SET, server.address, server.port
This commit is contained in:
@@ -0,0 +1,10 @@
|
|||||||
|
package net.woggioni.rbcs.api;
|
||||||
|
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
|
public interface RedisSpan {
|
||||||
|
|
||||||
|
void setAttribute(@NotNull String key, @NotNull String value);
|
||||||
|
|
||||||
|
void setAttribute(@NotNull String key, long value);
|
||||||
|
}
|
||||||
@@ -2,8 +2,15 @@ package net.woggioni.rbcs.api;
|
|||||||
|
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
|
||||||
public interface TelemetryController {
|
public interface TelemetryController {
|
||||||
void initialize();
|
void initialize();
|
||||||
@NotNull ChannelHandler createHandler();
|
@NotNull ChannelHandler createHandler();
|
||||||
|
|
||||||
|
@Nullable RedisSpan startRedisSpan(@NotNull String command, @NotNull String key);
|
||||||
|
|
||||||
|
void endRedisSpan(@Nullable RedisSpan span);
|
||||||
|
|
||||||
|
void endRedisSpan(@Nullable RedisSpan span, @NotNull Throwable error);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,10 +2,13 @@ package net.woggioni.rbcs.server.otel
|
|||||||
|
|
||||||
import io.netty.channel.ChannelHandler
|
import io.netty.channel.ChannelHandler
|
||||||
import io.opentelemetry.api.GlobalOpenTelemetry
|
import io.opentelemetry.api.GlobalOpenTelemetry
|
||||||
|
import io.opentelemetry.api.trace.SpanKind
|
||||||
|
import io.opentelemetry.api.trace.StatusCode
|
||||||
import io.opentelemetry.instrumentation.logback.appender.v1_0.OpenTelemetryAppender
|
import io.opentelemetry.instrumentation.logback.appender.v1_0.OpenTelemetryAppender
|
||||||
import io.opentelemetry.instrumentation.netty.v4_1.NettyServerTelemetry
|
import io.opentelemetry.instrumentation.netty.v4_1.NettyServerTelemetry
|
||||||
import io.opentelemetry.instrumentation.runtimetelemetry.RuntimeTelemetry
|
import io.opentelemetry.instrumentation.runtimetelemetry.RuntimeTelemetry
|
||||||
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk
|
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk
|
||||||
|
import net.woggioni.rbcs.api.RedisSpan
|
||||||
import net.woggioni.rbcs.api.TelemetryController
|
import net.woggioni.rbcs.api.TelemetryController
|
||||||
import net.woggioni.rbcs.common.createLogger
|
import net.woggioni.rbcs.common.createLogger
|
||||||
import net.woggioni.rbcs.common.info
|
import net.woggioni.rbcs.common.info
|
||||||
@@ -14,6 +17,10 @@ class OtelController : TelemetryController {
|
|||||||
|
|
||||||
private val log = createLogger<OtelController>()
|
private val log = createLogger<OtelController>()
|
||||||
|
|
||||||
|
private val tracer by lazy {
|
||||||
|
GlobalOpenTelemetry.getTracer("net.woggioni.rbcs.server.redis", "0.5.0")
|
||||||
|
}
|
||||||
|
|
||||||
override fun initialize() {
|
override fun initialize() {
|
||||||
log.info { "Initializing OpenTelemetry SDK with auto-configuration" }
|
log.info { "Initializing OpenTelemetry SDK with auto-configuration" }
|
||||||
|
|
||||||
@@ -36,4 +43,24 @@ class OtelController : TelemetryController {
|
|||||||
override fun createHandler(): ChannelHandler {
|
override fun createHandler(): ChannelHandler {
|
||||||
return NettyServerTelemetry.create(GlobalOpenTelemetry.get()).createCombinedHandler()
|
return NettyServerTelemetry.create(GlobalOpenTelemetry.get()).createCombinedHandler()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun startRedisSpan(command: String, key: String): RedisSpan? {
|
||||||
|
val span = tracer.spanBuilder(command)
|
||||||
|
.setSpanKind(SpanKind.CLIENT)
|
||||||
|
.setAttribute("db.system", "redis")
|
||||||
|
.setAttribute("db.operation", command)
|
||||||
|
.startSpan()
|
||||||
|
return RedisOtelSpan(span)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun endRedisSpan(span: RedisSpan?) {
|
||||||
|
(span as? RedisOtelSpan)?.delegate?.end()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun endRedisSpan(span: RedisSpan?, error: Throwable) {
|
||||||
|
val s = (span as? RedisOtelSpan)?.delegate ?: return
|
||||||
|
s.recordException(error)
|
||||||
|
s.setStatus(StatusCode.ERROR)
|
||||||
|
s.end()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,17 @@
|
|||||||
|
package net.woggioni.rbcs.server.otel
|
||||||
|
|
||||||
|
import io.opentelemetry.api.trace.Span
|
||||||
|
import net.woggioni.rbcs.api.RedisSpan
|
||||||
|
|
||||||
|
internal class RedisOtelSpan(
|
||||||
|
val delegate: Span,
|
||||||
|
) : RedisSpan {
|
||||||
|
|
||||||
|
override fun setAttribute(key: String, value: String) {
|
||||||
|
delegate.setAttribute(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun setAttribute(key: String, value: Long) {
|
||||||
|
delegate.setAttribute(key, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
import net.woggioni.rbcs.api.CacheProvider;
|
import net.woggioni.rbcs.api.CacheProvider;
|
||||||
|
import net.woggioni.rbcs.api.TelemetryController;
|
||||||
|
|
||||||
module net.woggioni.rbcs.server.redis {
|
module net.woggioni.rbcs.server.redis {
|
||||||
requires net.woggioni.rbcs.common;
|
requires net.woggioni.rbcs.common;
|
||||||
@@ -16,5 +17,7 @@ module net.woggioni.rbcs.server.redis {
|
|||||||
|
|
||||||
provides CacheProvider with net.woggioni.rbcs.server.redis.RedisCacheProvider;
|
provides CacheProvider with net.woggioni.rbcs.server.redis.RedisCacheProvider;
|
||||||
|
|
||||||
|
uses TelemetryController;
|
||||||
|
|
||||||
opens net.woggioni.rbcs.server.redis.schema;
|
opens net.woggioni.rbcs.server.redis.schema;
|
||||||
}
|
}
|
||||||
|
|||||||
+40
-9
@@ -3,6 +3,7 @@ package net.woggioni.rbcs.server.redis
|
|||||||
import java.io.ByteArrayOutputStream
|
import java.io.ByteArrayOutputStream
|
||||||
import java.io.ObjectInputStream
|
import java.io.ObjectInputStream
|
||||||
import java.io.ObjectOutputStream
|
import java.io.ObjectOutputStream
|
||||||
|
import java.net.InetSocketAddress
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import java.nio.channels.Channels
|
import java.nio.channels.Channels
|
||||||
import java.nio.channels.FileChannel
|
import java.nio.channels.FileChannel
|
||||||
@@ -35,8 +36,11 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
|
|||||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
|
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
|
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
|
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
|
||||||
|
import net.woggioni.rbcs.api.RedisSpan
|
||||||
|
import net.woggioni.rbcs.api.TelemetryController
|
||||||
import net.woggioni.rbcs.common.ByteBufInputStream
|
import net.woggioni.rbcs.common.ByteBufInputStream
|
||||||
import net.woggioni.rbcs.common.ByteBufOutputStream
|
import net.woggioni.rbcs.common.ByteBufOutputStream
|
||||||
|
import net.woggioni.rbcs.common.RBCS.loadService
|
||||||
import net.woggioni.rbcs.common.RBCS.processCacheKey
|
import net.woggioni.rbcs.common.RBCS.processCacheKey
|
||||||
import net.woggioni.rbcs.common.RBCS.toIntOrNull
|
import net.woggioni.rbcs.common.RBCS.toIntOrNull
|
||||||
import net.woggioni.rbcs.common.createLogger
|
import net.woggioni.rbcs.common.createLogger
|
||||||
@@ -60,6 +64,10 @@ class RedisCacheHandler(
|
|||||||
private val log = createLogger<RedisCacheHandler>()
|
private val log = createLogger<RedisCacheHandler>()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private val telemetryController by lazy {
|
||||||
|
loadService(TelemetryController::class.java).firstOrNull()
|
||||||
|
}
|
||||||
|
|
||||||
private interface InProgressRequest
|
private interface InProgressRequest
|
||||||
|
|
||||||
private inner class InProgressGetRequest(
|
private inner class InProgressGetRequest(
|
||||||
@@ -242,8 +250,10 @@ class RedisCacheHandler(
|
|||||||
}
|
}
|
||||||
val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm)
|
val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm)
|
||||||
val keyString = String(keyBytes, StandardCharsets.UTF_8)
|
val keyString = String(keyBytes, StandardCharsets.UTF_8)
|
||||||
|
val redisSpan = telemetryController?.startRedisSpan("GET", keyString)
|
||||||
val responseHandler = object : RedisResponseHandler {
|
val responseHandler = object : RedisResponseHandler {
|
||||||
override fun responseReceived(response: RedisMessage) {
|
override fun responseReceived(response: RedisMessage) {
|
||||||
|
try {
|
||||||
when (response) {
|
when (response) {
|
||||||
is FullBulkStringRedisMessage -> {
|
is FullBulkStringRedisMessage -> {
|
||||||
if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) {
|
if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) {
|
||||||
@@ -263,9 +273,9 @@ class RedisCacheHandler(
|
|||||||
}
|
}
|
||||||
|
|
||||||
is ErrorRedisMessage -> {
|
is ErrorRedisMessage -> {
|
||||||
this@RedisCacheHandler.exceptionCaught(
|
val ex = RedisException("Redis error for GET ${msg.key}: ${response.content()}")
|
||||||
ctx, RedisException("Redis error for GET ${msg.key}: ${response.content()}")
|
telemetryController?.endRedisSpan(redisSpan, ex)
|
||||||
)
|
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||||
}
|
}
|
||||||
|
|
||||||
else -> {
|
else -> {
|
||||||
@@ -275,13 +285,22 @@ class RedisCacheHandler(
|
|||||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
telemetryController?.endRedisSpan(redisSpan)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun exceptionCaught(ex: Throwable) {
|
override fun exceptionCaught(ex: Throwable) {
|
||||||
|
telemetryController?.endRedisSpan(redisSpan, ex)
|
||||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel ->
|
client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel ->
|
||||||
|
val remoteAddr = channel.remoteAddress()
|
||||||
|
if (remoteAddr is InetSocketAddress) {
|
||||||
|
remoteAddr.hostString?.let { redisSpan?.setAttribute("server.address", it) }
|
||||||
|
redisSpan?.setAttribute("server.port", remoteAddr.port.toLong())
|
||||||
|
}
|
||||||
log.trace(ctx) {
|
log.trace(ctx) {
|
||||||
"Sending GET request for key ${msg.key} to Redis"
|
"Sending GET request for key ${msg.key} to Redis"
|
||||||
}
|
}
|
||||||
@@ -342,8 +361,11 @@ class RedisCacheHandler(
|
|||||||
|
|
||||||
val expirySeconds = maxAge.toSeconds().toString()
|
val expirySeconds = maxAge.toSeconds().toString()
|
||||||
|
|
||||||
|
val redisSpan = telemetryController?.startRedisSpan("SET", request.keyString)
|
||||||
|
|
||||||
val responseHandler = object : RedisResponseHandler {
|
val responseHandler = object : RedisResponseHandler {
|
||||||
override fun responseReceived(response: RedisMessage) {
|
override fun responseReceived(response: RedisMessage) {
|
||||||
|
try {
|
||||||
when (response) {
|
when (response) {
|
||||||
is SimpleStringRedisMessage -> {
|
is SimpleStringRedisMessage -> {
|
||||||
log.debug(ctx) {
|
log.debug(ctx) {
|
||||||
@@ -353,26 +375,35 @@ class RedisCacheHandler(
|
|||||||
}
|
}
|
||||||
|
|
||||||
is ErrorRedisMessage -> {
|
is ErrorRedisMessage -> {
|
||||||
this@RedisCacheHandler.exceptionCaught(
|
val ex = RedisException("Redis error for SET ${request.keyString}: ${response.content()}")
|
||||||
ctx, RedisException("Redis error for SET ${request.keyString}: ${response.content()}")
|
telemetryController?.endRedisSpan(redisSpan, ex)
|
||||||
)
|
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||||
}
|
}
|
||||||
|
|
||||||
else -> {
|
else -> {
|
||||||
this@RedisCacheHandler.exceptionCaught(
|
val ex = RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}")
|
||||||
ctx, RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}")
|
telemetryController?.endRedisSpan(redisSpan, ex)
|
||||||
)
|
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
telemetryController?.endRedisSpan(redisSpan)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun exceptionCaught(ex: Throwable) {
|
override fun exceptionCaught(ex: Throwable) {
|
||||||
|
telemetryController?.endRedisSpan(redisSpan, ex)
|
||||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use a ByteBuf key for server selection
|
// Use a ByteBuf key for server selection
|
||||||
client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel ->
|
client.sendCommand(keyBytes, ctx.alloc(), responseHandler).thenAccept { channel ->
|
||||||
|
val remoteAddr = channel.remoteAddress()
|
||||||
|
if (remoteAddr is InetSocketAddress) {
|
||||||
|
remoteAddr.hostString?.let { redisSpan?.setAttribute("server.address", it) }
|
||||||
|
redisSpan?.setAttribute("server.port", remoteAddr.port.toLong())
|
||||||
|
}
|
||||||
log.trace(ctx) {
|
log.trace(ctx) {
|
||||||
"Sending SET request to Redis"
|
"Sending SET request to Redis"
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user