forked from woggioni/rbcs
Generalize OTEL API and add memcache tracing support
- Rename RedisSpan -> SpanHandle for generic span handling - Generalize TelemetryController methods: startSpan/endSpan with dbSystem param - Rename RedisOtelSpan -> OtelSpanHandle in rbcs-server-otel - Update Redis cache handler to use new generic API - Add OpenTelemetry tracing for memcache GET and SET commands - Add channel property to MemcacheRequestController for server address attribution - Add uses TelemetryController directive in memcache module-info Memcache spans follow the same pattern as Redis: db.system=memcache, db.operation=GET|SET, server.address, server.port
This commit is contained in:
+1
-1
@@ -4,7 +4,7 @@ org.gradle.caching=true
|
|||||||
|
|
||||||
rbcs.version = 0.5.0
|
rbcs.version = 0.5.0
|
||||||
|
|
||||||
lys.version = 2026.05.16
|
lys.version = 2026.05.27
|
||||||
|
|
||||||
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
|
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
|
||||||
docker.registry.url=gitea.woggioni.net
|
docker.registry.url=gitea.woggioni.net
|
||||||
|
|||||||
+4
-1
@@ -2,9 +2,12 @@ package net.woggioni.rbcs.api;
|
|||||||
|
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
public interface RedisSpan {
|
public interface SpanHandle {
|
||||||
|
|
||||||
void setAttribute(@NotNull String key, @NotNull String value);
|
void setAttribute(@NotNull String key, @NotNull String value);
|
||||||
|
|
||||||
void setAttribute(@NotNull String key, long value);
|
void setAttribute(@NotNull String key, long value);
|
||||||
|
|
||||||
|
void setAttribute(@NotNull String key, boolean value);
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -4,13 +4,15 @@ import io.netty.channel.ChannelHandler;
|
|||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
public interface TelemetryController {
|
public interface TelemetryController {
|
||||||
void initialize();
|
void initialize();
|
||||||
@NotNull ChannelHandler createHandler();
|
@NotNull ChannelHandler createHandler();
|
||||||
|
|
||||||
@Nullable RedisSpan startRedisSpan(@NotNull String command, @NotNull String key);
|
@Nullable SpanHandle startSpan(@NotNull String command);
|
||||||
|
|
||||||
void endRedisSpan(@Nullable RedisSpan span);
|
void endSpan(@Nullable SpanHandle span);
|
||||||
|
|
||||||
void endRedisSpan(@Nullable RedisSpan span, @NotNull Throwable error);
|
void endSpan(@Nullable SpanHandle span, @NotNull Throwable error);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import net.woggioni.rbcs.api.CacheProvider;
|
import net.woggioni.rbcs.api.CacheProvider;
|
||||||
|
import net.woggioni.rbcs.api.TelemetryController;
|
||||||
|
|
||||||
module net.woggioni.rbcs.server.memcache {
|
module net.woggioni.rbcs.server.memcache {
|
||||||
requires net.woggioni.rbcs.common;
|
requires net.woggioni.rbcs.common;
|
||||||
@@ -16,5 +17,7 @@ module net.woggioni.rbcs.server.memcache {
|
|||||||
|
|
||||||
provides CacheProvider with net.woggioni.rbcs.server.memcache.MemcacheCacheProvider;
|
provides CacheProvider with net.woggioni.rbcs.server.memcache.MemcacheCacheProvider;
|
||||||
|
|
||||||
|
uses TelemetryController;
|
||||||
|
|
||||||
opens net.woggioni.rbcs.server.memcache.schema;
|
opens net.woggioni.rbcs.server.memcache.schema;
|
||||||
}
|
}
|
||||||
+65
-8
@@ -9,9 +9,11 @@ import java.nio.channels.FileChannel
|
|||||||
import java.nio.channels.ReadableByteChannel
|
import java.nio.channels.ReadableByteChannel
|
||||||
import java.nio.file.Files
|
import java.nio.file.Files
|
||||||
import java.nio.file.StandardOpenOption
|
import java.nio.file.StandardOpenOption
|
||||||
|
import java.net.InetSocketAddress
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.concurrent.CompletableFuture
|
import java.util.concurrent.CompletableFuture
|
||||||
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
import java.util.zip.Deflater
|
import java.util.zip.Deflater
|
||||||
import java.util.zip.DeflaterOutputStream
|
import java.util.zip.DeflaterOutputStream
|
||||||
import java.util.zip.InflaterOutputStream
|
import java.util.zip.InflaterOutputStream
|
||||||
@@ -39,8 +41,11 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
|
|||||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
|
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
|
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
|
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
|
||||||
|
import net.woggioni.rbcs.api.SpanHandle
|
||||||
|
import net.woggioni.rbcs.api.TelemetryController
|
||||||
import net.woggioni.rbcs.common.ByteBufInputStream
|
import net.woggioni.rbcs.common.ByteBufInputStream
|
||||||
import net.woggioni.rbcs.common.ByteBufOutputStream
|
import net.woggioni.rbcs.common.ByteBufOutputStream
|
||||||
|
import net.woggioni.rbcs.common.RBCS.loadService
|
||||||
import net.woggioni.rbcs.common.RBCS.processCacheKey
|
import net.woggioni.rbcs.common.RBCS.processCacheKey
|
||||||
import net.woggioni.rbcs.common.RBCS.toIntOrNull
|
import net.woggioni.rbcs.common.RBCS.toIntOrNull
|
||||||
import net.woggioni.rbcs.common.createLogger
|
import net.woggioni.rbcs.common.createLogger
|
||||||
@@ -70,6 +75,10 @@ class MemcacheCacheHandler(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private val telemetryController by lazy {
|
||||||
|
loadService(TelemetryController::class.java).firstOrNull()
|
||||||
|
}
|
||||||
|
|
||||||
private interface InProgressRequest {
|
private interface InProgressRequest {
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -153,7 +162,9 @@ class MemcacheCacheHandler(
|
|||||||
metadata: CacheValueMetadata,
|
metadata: CacheValueMetadata,
|
||||||
val digest: ByteBuf,
|
val digest: ByteBuf,
|
||||||
val requestController: CompletableFuture<MemcacheRequestController>,
|
val requestController: CompletableFuture<MemcacheRequestController>,
|
||||||
private val alloc: ByteBufAllocator
|
private val alloc: ByteBufAllocator,
|
||||||
|
val entryKey: String,
|
||||||
|
val memcacheSpanRef: AtomicReference<SpanHandle?>,
|
||||||
) : InProgressRequest {
|
) : InProgressRequest {
|
||||||
private var totalSize = 0
|
private var totalSize = 0
|
||||||
private var tmpFile: FileChannel? = null
|
private var tmpFile: FileChannel? = null
|
||||||
@@ -251,6 +262,17 @@ class MemcacheCacheHandler(
|
|||||||
val key = ctx.alloc().buffer().also {
|
val key = ctx.alloc().buffer().also {
|
||||||
it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm))
|
it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm))
|
||||||
}
|
}
|
||||||
|
val memcacheSpan = telemetryController?.startSpan("GET")?.apply {
|
||||||
|
setAttribute("db.system", "memcache")
|
||||||
|
setAttribute("db.operation.name", "GET")
|
||||||
|
val remoteAddr = ctx.channel().remoteAddress()
|
||||||
|
if (remoteAddr is InetSocketAddress) {
|
||||||
|
remoteAddr.hostString?.let {
|
||||||
|
setAttribute("server.address", it)
|
||||||
|
}
|
||||||
|
setAttribute("server.port", remoteAddr.port.toLong())
|
||||||
|
}
|
||||||
|
}
|
||||||
val responseHandler = object : MemcacheResponseHandler {
|
val responseHandler = object : MemcacheResponseHandler {
|
||||||
override fun responseReceived(response: BinaryMemcacheResponse) {
|
override fun responseReceived(response: BinaryMemcacheResponse) {
|
||||||
val status = response.status()
|
val status = response.status()
|
||||||
@@ -266,8 +288,15 @@ class MemcacheCacheHandler(
|
|||||||
log.debug(ctx) {
|
log.debug(ctx) {
|
||||||
"Cache miss for key ${msg.key} on memcache"
|
"Cache miss for key ${msg.key} on memcache"
|
||||||
}
|
}
|
||||||
|
telemetryController?.endSpan(memcacheSpan)
|
||||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
else -> {
|
||||||
|
val ex = MemcacheException(status)
|
||||||
|
telemetryController?.endSpan(memcacheSpan, ex)
|
||||||
|
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -282,11 +311,13 @@ class MemcacheCacheHandler(
|
|||||||
if (content is LastMemcacheContent) {
|
if (content is LastMemcacheContent) {
|
||||||
inProgressRequest = null
|
inProgressRequest = null
|
||||||
inProgressGetRequest.commit()
|
inProgressGetRequest.commit()
|
||||||
|
telemetryController?.endSpan(memcacheSpan)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun exceptionCaught(ex: Throwable) {
|
override fun exceptionCaught(ex: Throwable) {
|
||||||
|
telemetryController?.endSpan(memcacheSpan, ex)
|
||||||
(inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
|
(inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
|
||||||
inProgressGetRequest?.let {
|
inProgressGetRequest?.let {
|
||||||
inProgressRequest = null
|
inProgressRequest = null
|
||||||
@@ -312,6 +343,7 @@ class MemcacheCacheHandler(
|
|||||||
val key = ctx.alloc().buffer().also {
|
val key = ctx.alloc().buffer().also {
|
||||||
it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm))
|
it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm))
|
||||||
}
|
}
|
||||||
|
val memcacheSpanRef = AtomicReference<SpanHandle?>(null)
|
||||||
val responseHandler = object : MemcacheResponseHandler {
|
val responseHandler = object : MemcacheResponseHandler {
|
||||||
override fun responseReceived(response: BinaryMemcacheResponse) {
|
override fun responseReceived(response: BinaryMemcacheResponse) {
|
||||||
val status = response.status()
|
val status = response.status()
|
||||||
@@ -320,16 +352,22 @@ class MemcacheCacheHandler(
|
|||||||
log.debug(ctx) {
|
log.debug(ctx) {
|
||||||
"Inserted key ${msg.key} into memcache"
|
"Inserted key ${msg.key} into memcache"
|
||||||
}
|
}
|
||||||
|
telemetryController?.endSpan(memcacheSpanRef.get())
|
||||||
sendMessageAndFlush(ctx, CachePutResponse(msg.key))
|
sendMessageAndFlush(ctx, CachePutResponse(msg.key))
|
||||||
}
|
}
|
||||||
|
|
||||||
else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status))
|
else -> {
|
||||||
|
val ex = MemcacheException(status)
|
||||||
|
telemetryController?.endSpan(memcacheSpanRef.get(), ex)
|
||||||
|
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun contentReceived(content: MemcacheContent) {}
|
override fun contentReceived(content: MemcacheContent) {}
|
||||||
|
|
||||||
override fun exceptionCaught(ex: Throwable) {
|
override fun exceptionCaught(ex: Throwable) {
|
||||||
|
telemetryController?.endSpan(memcacheSpanRef.get(), ex)
|
||||||
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
|
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -339,7 +377,7 @@ class MemcacheCacheHandler(
|
|||||||
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
|
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc())
|
inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc(), msg.key, memcacheSpanRef)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
|
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
|
||||||
@@ -362,22 +400,41 @@ class MemcacheCacheHandler(
|
|||||||
val request = inProgressRequest
|
val request = inProgressRequest
|
||||||
when (request) {
|
when (request) {
|
||||||
is InProgressPutRequest -> {
|
is InProgressPutRequest -> {
|
||||||
|
val putRequest = request
|
||||||
inProgressRequest = null
|
inProgressRequest = null
|
||||||
log.trace(ctx) {
|
log.trace(ctx) {
|
||||||
"Received last chunk of ${msg.content().readableBytes()} bytes for memcache"
|
"Received last chunk of ${msg.content().readableBytes()} bytes for memcache"
|
||||||
}
|
}
|
||||||
request.write(msg.content())
|
putRequest.write(msg.content())
|
||||||
val key = request.digest.retainedDuplicate()
|
val memcacheSpan = telemetryController?.startSpan("SET",
|
||||||
val (payloadSize, payloadSource) = request.commit()
|
)?.apply {
|
||||||
|
setAttribute("db.system", "memcache")
|
||||||
|
setAttribute("db.operation.name", "SET")
|
||||||
|
val remoteAddr = ctx.channel().remoteAddress()
|
||||||
|
if (remoteAddr is InetSocketAddress) {
|
||||||
|
remoteAddr.hostString?.let {
|
||||||
|
setAttribute("server.address", it)
|
||||||
|
}
|
||||||
|
setAttribute("server.port", remoteAddr.port.toLong())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
putRequest.memcacheSpanRef.set(memcacheSpan)
|
||||||
|
val key = putRequest.digest.retainedDuplicate()
|
||||||
|
val (payloadSize, payloadSource) = putRequest.commit()
|
||||||
val extras = ctx.alloc().buffer(8, 8)
|
val extras = ctx.alloc().buffer(8, 8)
|
||||||
extras.writeInt(0)
|
extras.writeInt(0)
|
||||||
extras.writeInt(encodeExpiry(maxAge))
|
extras.writeInt(encodeExpiry(maxAge))
|
||||||
val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize
|
val totalBodyLength = putRequest.digest.readableBytes() + extras.readableBytes() + payloadSize
|
||||||
log.trace(ctx) {
|
log.trace(ctx) {
|
||||||
"Trying to send SET request to memcache"
|
"Trying to send SET request to memcache"
|
||||||
}
|
}
|
||||||
request.requestController.whenComplete { requestController, ex ->
|
putRequest.requestController.whenComplete { requestController, ex ->
|
||||||
if (ex == null) {
|
if (ex == null) {
|
||||||
|
val remoteAddr = requestController.channel.remoteAddress()
|
||||||
|
if (remoteAddr is InetSocketAddress) {
|
||||||
|
remoteAddr.hostString?.let { memcacheSpan?.setAttribute("server.address", it) }
|
||||||
|
memcacheSpan?.setAttribute("server.port", remoteAddr.port.toLong())
|
||||||
|
}
|
||||||
log.trace(ctx) {
|
log.trace(ctx) {
|
||||||
"Sending SET request to memcache"
|
"Sending SET request to memcache"
|
||||||
}
|
}
|
||||||
|
|||||||
+2
@@ -147,6 +147,8 @@ class MemcacheClient(
|
|||||||
|
|
||||||
channel.pipeline().addLast(handler)
|
channel.pipeline().addLast(handler)
|
||||||
response.complete(object : MemcacheRequestController {
|
response.complete(object : MemcacheRequestController {
|
||||||
|
override val channel: Channel = channel
|
||||||
|
|
||||||
private var channelReleased = false
|
private var channelReleased = false
|
||||||
|
|
||||||
override fun sendRequest(request: BinaryMemcacheRequest) {
|
override fun sendRequest(request: BinaryMemcacheRequest) {
|
||||||
|
|||||||
+3
@@ -1,10 +1,13 @@
|
|||||||
package net.woggioni.rbcs.server.memcache.client
|
package net.woggioni.rbcs.server.memcache.client
|
||||||
|
|
||||||
|
import io.netty.channel.Channel
|
||||||
import io.netty.handler.codec.memcache.MemcacheContent
|
import io.netty.handler.codec.memcache.MemcacheContent
|
||||||
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
|
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
|
||||||
|
|
||||||
interface MemcacheRequestController {
|
interface MemcacheRequestController {
|
||||||
|
|
||||||
|
val channel: Channel
|
||||||
|
|
||||||
fun sendRequest(request : BinaryMemcacheRequest)
|
fun sendRequest(request : BinaryMemcacheRequest)
|
||||||
|
|
||||||
fun sendContent(content : MemcacheContent)
|
fun sendContent(content : MemcacheContent)
|
||||||
|
|||||||
@@ -2,13 +2,14 @@ package net.woggioni.rbcs.server.otel
|
|||||||
|
|
||||||
import io.netty.channel.ChannelHandler
|
import io.netty.channel.ChannelHandler
|
||||||
import io.opentelemetry.api.GlobalOpenTelemetry
|
import io.opentelemetry.api.GlobalOpenTelemetry
|
||||||
|
import io.opentelemetry.api.common.AttributeKey
|
||||||
import io.opentelemetry.api.trace.SpanKind
|
import io.opentelemetry.api.trace.SpanKind
|
||||||
import io.opentelemetry.api.trace.StatusCode
|
import io.opentelemetry.api.trace.StatusCode
|
||||||
import io.opentelemetry.instrumentation.logback.appender.v1_0.OpenTelemetryAppender
|
import io.opentelemetry.instrumentation.logback.appender.v1_0.OpenTelemetryAppender
|
||||||
import io.opentelemetry.instrumentation.netty.v4_1.NettyServerTelemetry
|
import io.opentelemetry.instrumentation.netty.v4_1.NettyServerTelemetry
|
||||||
import io.opentelemetry.instrumentation.runtimetelemetry.RuntimeTelemetry
|
import io.opentelemetry.instrumentation.runtimetelemetry.RuntimeTelemetry
|
||||||
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk
|
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk
|
||||||
import net.woggioni.rbcs.api.RedisSpan
|
import net.woggioni.rbcs.api.SpanHandle
|
||||||
import net.woggioni.rbcs.api.TelemetryController
|
import net.woggioni.rbcs.api.TelemetryController
|
||||||
import net.woggioni.rbcs.common.createLogger
|
import net.woggioni.rbcs.common.createLogger
|
||||||
import net.woggioni.rbcs.common.info
|
import net.woggioni.rbcs.common.info
|
||||||
@@ -44,21 +45,19 @@ class OtelController : TelemetryController {
|
|||||||
return NettyServerTelemetry.create(GlobalOpenTelemetry.get()).createCombinedHandler()
|
return NettyServerTelemetry.create(GlobalOpenTelemetry.get()).createCombinedHandler()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun startRedisSpan(command: String, key: String): RedisSpan? {
|
override fun startSpan(name: String): SpanHandle {
|
||||||
val span = tracer.spanBuilder(command)
|
val span = tracer.spanBuilder(name)
|
||||||
.setSpanKind(SpanKind.CLIENT)
|
.setSpanKind(SpanKind.CLIENT)
|
||||||
.setAttribute("db.system", "redis")
|
|
||||||
.setAttribute("db.operation", command)
|
|
||||||
.startSpan()
|
.startSpan()
|
||||||
return RedisOtelSpan(span)
|
return OtelSpanHandle(span)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun endRedisSpan(span: RedisSpan?) {
|
override fun endSpan(span: SpanHandle?) {
|
||||||
(span as? RedisOtelSpan)?.delegate?.end()
|
(span as? OtelSpanHandle)?.delegate?.end()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun endRedisSpan(span: RedisSpan?, error: Throwable) {
|
override fun endSpan(span: SpanHandle?, error: Throwable) {
|
||||||
val s = (span as? RedisOtelSpan)?.delegate ?: return
|
val s = (span as? OtelSpanHandle)?.delegate ?: return
|
||||||
s.recordException(error)
|
s.recordException(error)
|
||||||
s.setStatus(StatusCode.ERROR)
|
s.setStatus(StatusCode.ERROR)
|
||||||
s.end()
|
s.end()
|
||||||
|
|||||||
+7
-3
@@ -1,11 +1,11 @@
|
|||||||
package net.woggioni.rbcs.server.otel
|
package net.woggioni.rbcs.server.otel
|
||||||
|
|
||||||
import io.opentelemetry.api.trace.Span
|
import io.opentelemetry.api.trace.Span
|
||||||
import net.woggioni.rbcs.api.RedisSpan
|
import net.woggioni.rbcs.api.SpanHandle
|
||||||
|
|
||||||
internal class RedisOtelSpan(
|
internal class OtelSpanHandle(
|
||||||
val delegate: Span,
|
val delegate: Span,
|
||||||
) : RedisSpan {
|
) : SpanHandle {
|
||||||
|
|
||||||
override fun setAttribute(key: String, value: String) {
|
override fun setAttribute(key: String, value: String) {
|
||||||
delegate.setAttribute(key, value)
|
delegate.setAttribute(key, value)
|
||||||
@@ -14,4 +14,8 @@ internal class RedisOtelSpan(
|
|||||||
override fun setAttribute(key: String, value: Long) {
|
override fun setAttribute(key: String, value: Long) {
|
||||||
delegate.setAttribute(key, value)
|
delegate.setAttribute(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun setAttribute(key: String, value: Boolean) {
|
||||||
|
delegate.setAttribute(key, value)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
+31
-16
@@ -36,7 +36,6 @@ import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
|
|||||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
|
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
|
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
|
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
|
||||||
import net.woggioni.rbcs.api.RedisSpan
|
|
||||||
import net.woggioni.rbcs.api.TelemetryController
|
import net.woggioni.rbcs.api.TelemetryController
|
||||||
import net.woggioni.rbcs.common.ByteBufInputStream
|
import net.woggioni.rbcs.common.ByteBufInputStream
|
||||||
import net.woggioni.rbcs.common.ByteBufOutputStream
|
import net.woggioni.rbcs.common.ByteBufOutputStream
|
||||||
@@ -250,21 +249,32 @@ class RedisCacheHandler(
|
|||||||
}
|
}
|
||||||
val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm)
|
val keyBytes = processCacheKey(msg.key, keyPrefix, digestAlgorithm)
|
||||||
val keyString = String(keyBytes, StandardCharsets.UTF_8)
|
val keyString = String(keyBytes, StandardCharsets.UTF_8)
|
||||||
val redisSpan = telemetryController?.startRedisSpan("GET", keyString)
|
val redisSpan = telemetryController?.startSpan("GET")?.apply {
|
||||||
|
setAttribute("db.system", "redis")
|
||||||
|
setAttribute("db.operation.name", "GET")
|
||||||
|
val remoteAddr = ctx.channel().remoteAddress()
|
||||||
|
if (remoteAddr is InetSocketAddress) {
|
||||||
|
remoteAddr.hostString?.let {
|
||||||
|
setAttribute("server.address", it)
|
||||||
|
}
|
||||||
|
setAttribute("server.port", remoteAddr.port.toLong())
|
||||||
|
}
|
||||||
|
}
|
||||||
val responseHandler = object : RedisResponseHandler {
|
val responseHandler = object : RedisResponseHandler {
|
||||||
override fun responseReceived(response: RedisMessage) {
|
override fun responseReceived(response: RedisMessage) {
|
||||||
try {
|
|
||||||
when (response) {
|
when (response) {
|
||||||
is FullBulkStringRedisMessage -> {
|
is FullBulkStringRedisMessage -> {
|
||||||
if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) {
|
if (response === FullBulkStringRedisMessage.NULL_INSTANCE || response.content().readableBytes() == 0) {
|
||||||
log.debug(ctx) {
|
log.debug(ctx) {
|
||||||
"Cache miss for key ${msg.key} on Redis"
|
"Cache miss for key ${msg.key} on Redis"
|
||||||
}
|
}
|
||||||
|
telemetryController?.endSpan(redisSpan)
|
||||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
||||||
} else {
|
} else {
|
||||||
log.debug(ctx) {
|
log.debug(ctx) {
|
||||||
"Cache hit for key ${msg.key} on Redis"
|
"Cache hit for key ${msg.key} on Redis"
|
||||||
}
|
}
|
||||||
|
telemetryController?.endSpan(redisSpan)
|
||||||
val getRequest = InProgressGetRequest(msg.key, ctx)
|
val getRequest = InProgressGetRequest(msg.key, ctx)
|
||||||
inProgressRequest = getRequest
|
inProgressRequest = getRequest
|
||||||
getRequest.processResponse(response.content())
|
getRequest.processResponse(response.content())
|
||||||
@@ -274,7 +284,7 @@ class RedisCacheHandler(
|
|||||||
|
|
||||||
is ErrorRedisMessage -> {
|
is ErrorRedisMessage -> {
|
||||||
val ex = RedisException("Redis error for GET ${msg.key}: ${response.content()}")
|
val ex = RedisException("Redis error for GET ${msg.key}: ${response.content()}")
|
||||||
telemetryController?.endRedisSpan(redisSpan, ex)
|
telemetryController?.endSpan(redisSpan, ex)
|
||||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -282,16 +292,14 @@ class RedisCacheHandler(
|
|||||||
log.warn(ctx) {
|
log.warn(ctx) {
|
||||||
"Unexpected response type from Redis for key ${msg.key}: ${response.javaClass.name}"
|
"Unexpected response type from Redis for key ${msg.key}: ${response.javaClass.name}"
|
||||||
}
|
}
|
||||||
|
telemetryController?.endSpan(redisSpan)
|
||||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
telemetryController?.endRedisSpan(redisSpan)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun exceptionCaught(ex: Throwable) {
|
override fun exceptionCaught(ex: Throwable) {
|
||||||
telemetryController?.endRedisSpan(redisSpan, ex)
|
telemetryController?.endSpan(redisSpan, ex)
|
||||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -361,38 +369,45 @@ class RedisCacheHandler(
|
|||||||
|
|
||||||
val expirySeconds = maxAge.toSeconds().toString()
|
val expirySeconds = maxAge.toSeconds().toString()
|
||||||
|
|
||||||
val redisSpan = telemetryController?.startRedisSpan("SET", request.keyString)
|
val redisSpan = telemetryController?.startSpan("SET")?.apply {
|
||||||
|
setAttribute("db.system", "redis")
|
||||||
|
setAttribute("db.operation.name", "SET")
|
||||||
|
val remoteAddr = ctx.channel().remoteAddress()
|
||||||
|
if (remoteAddr is InetSocketAddress) {
|
||||||
|
remoteAddr.hostString?.let {
|
||||||
|
setAttribute("server.address", it)
|
||||||
|
}
|
||||||
|
setAttribute("server.port", remoteAddr.port.toLong())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
val responseHandler = object : RedisResponseHandler {
|
val responseHandler = object : RedisResponseHandler {
|
||||||
override fun responseReceived(response: RedisMessage) {
|
override fun responseReceived(response: RedisMessage) {
|
||||||
try {
|
|
||||||
when (response) {
|
when (response) {
|
||||||
is SimpleStringRedisMessage -> {
|
is SimpleStringRedisMessage -> {
|
||||||
log.debug(ctx) {
|
log.debug(ctx) {
|
||||||
"Inserted key ${request.keyString} into Redis"
|
"Inserted key ${request.keyString} into Redis"
|
||||||
}
|
}
|
||||||
|
telemetryController?.endSpan(redisSpan)
|
||||||
sendMessageAndFlush(ctx, CachePutResponse(request.keyString))
|
sendMessageAndFlush(ctx, CachePutResponse(request.keyString))
|
||||||
}
|
}
|
||||||
|
|
||||||
is ErrorRedisMessage -> {
|
is ErrorRedisMessage -> {
|
||||||
val ex = RedisException("Redis error for SET ${request.keyString}: ${response.content()}")
|
val ex = RedisException("Redis error for SET ${request.keyString}: ${response.content()}")
|
||||||
telemetryController?.endRedisSpan(redisSpan, ex)
|
telemetryController?.endSpan(redisSpan, ex)
|
||||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||||
}
|
}
|
||||||
|
|
||||||
else -> {
|
else -> {
|
||||||
val ex = RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}")
|
val ex = RedisException("Unexpected response for SET ${request.keyString}: ${response.javaClass.name}")
|
||||||
telemetryController?.endRedisSpan(redisSpan, ex)
|
telemetryController?.endSpan(redisSpan, ex)
|
||||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
telemetryController?.endRedisSpan(redisSpan)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun exceptionCaught(ex: Throwable) {
|
override fun exceptionCaught(ex: Throwable) {
|
||||||
telemetryController?.endRedisSpan(redisSpan, ex)
|
telemetryController?.endSpan(redisSpan, ex)
|
||||||
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
this@RedisCacheHandler.exceptionCaught(ctx, ex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user