From e5fe8437a6444841f57321f92e6eb57dd7fc89d7 Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Tue, 4 Feb 2025 13:59:44 +0800 Subject: [PATCH] temporary commit --- gbcs-api/build.gradle | 1 + gbcs-api/src/main/java/module-info.java | 2 + .../java/net/woggioni/gbcs/api/Cache.java | 6 +- .../net/woggioni/gbcs/api/CallHandle.java | 10 + .../gbcs/api/ResponseEventListener.java | 7 + .../woggioni/gbcs/api/event/RequestEvent.java | 20 ++ .../gbcs/api/event/ResponseEvent.java | 28 ++ gbcs-server-memcached/build.gradle | 16 ++ .../src/main/java/module-info.java | 7 + .../server/memcached/CustomChunkedInput.kt | 33 +++ .../gbcs/server/memcached/Exception.kt | 4 + .../gbcs/server/memcached/MemcachedCache.kt | 124 +++++---- .../memcached/MemcachedCacheConfiguration.kt | 44 +++- .../memcached/MemcachedCacheProvider.kt | 6 +- .../server/memcached/client/CallHandle.kt | 9 + .../memcached/client/MemcacheResponse.kt | 24 ++ .../memcached/client/MemcachedClient.kt | 241 ++++++++++++++++++ .../server/memcached/client/ResponseEvent.kt | 10 + .../memcached/client/ResponseListener.kt | 5 + .../memcached/test/MemcachedClientTest.kt | 80 ++++++ .../src/test/resources/logback.xml | 21 ++ .../gbcs/server/handler/ServerHandler.kt | 218 ++++++++++------ 22 files changed, 771 insertions(+), 145 deletions(-) create mode 100644 gbcs-api/src/main/java/net/woggioni/gbcs/api/CallHandle.java create mode 100644 gbcs-api/src/main/java/net/woggioni/gbcs/api/ResponseEventListener.java create mode 100644 gbcs-api/src/main/java/net/woggioni/gbcs/api/event/RequestEvent.java create mode 100644 gbcs-api/src/main/java/net/woggioni/gbcs/api/event/ResponseEvent.java create mode 100644 gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/CustomChunkedInput.kt create mode 100644 gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/Exception.kt create mode 100644 gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/CallHandle.kt create mode 100644 gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/MemcacheResponse.kt create mode 100644 gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/MemcachedClient.kt create mode 100644 gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/ResponseEvent.kt create mode 100644 gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/ResponseListener.kt create mode 100644 gbcs-server-memcached/src/test/kotlin/net/woggioni/gbcs/server/memcached/test/MemcachedClientTest.kt create mode 100644 gbcs-server-memcached/src/test/resources/logback.xml diff --git a/gbcs-api/build.gradle b/gbcs-api/build.gradle index 516de05..ac6a484 100644 --- a/gbcs-api/build.gradle +++ b/gbcs-api/build.gradle @@ -5,6 +5,7 @@ plugins { } dependencies { + api catalog.netty.buffer } publishing { diff --git a/gbcs-api/src/main/java/module-info.java b/gbcs-api/src/main/java/module-info.java index e825602..49fc206 100644 --- a/gbcs-api/src/main/java/module-info.java +++ b/gbcs-api/src/main/java/module-info.java @@ -1,6 +1,8 @@ module net.woggioni.gbcs.api { requires static lombok; requires java.xml; + requires io.netty.buffer; exports net.woggioni.gbcs.api; exports net.woggioni.gbcs.api.exception; + exports net.woggioni.gbcs.api.event; } \ No newline at end of file diff --git a/gbcs-api/src/main/java/net/woggioni/gbcs/api/Cache.java b/gbcs-api/src/main/java/net/woggioni/gbcs/api/Cache.java index c6018b0..aa27cd1 100644 --- a/gbcs-api/src/main/java/net/woggioni/gbcs/api/Cache.java +++ b/gbcs-api/src/main/java/net/woggioni/gbcs/api/Cache.java @@ -3,10 +3,10 @@ package net.woggioni.gbcs.api; import net.woggioni.gbcs.api.exception.ContentTooLargeException; import java.nio.channels.ReadableByteChannel; +import java.util.concurrent.CompletableFuture; public interface Cache extends AutoCloseable { - ReadableByteChannel get(String key); - - void put(String key, byte[] content) throws ContentTooLargeException; + CompletableFuture> get(String key, ResponseEventListener responseEventListener); + CompletableFuture> put(String key) throws ContentTooLargeException; } diff --git a/gbcs-api/src/main/java/net/woggioni/gbcs/api/CallHandle.java b/gbcs-api/src/main/java/net/woggioni/gbcs/api/CallHandle.java new file mode 100644 index 0000000..74c0492 --- /dev/null +++ b/gbcs-api/src/main/java/net/woggioni/gbcs/api/CallHandle.java @@ -0,0 +1,10 @@ +package net.woggioni.gbcs.api; + +import net.woggioni.gbcs.api.event.RequestEvent; + +import java.util.concurrent.CompletableFuture; + +public interface CallHandle { + void postEvent(RequestEvent evt); + CompletableFuture call(); +} diff --git a/gbcs-api/src/main/java/net/woggioni/gbcs/api/ResponseEventListener.java b/gbcs-api/src/main/java/net/woggioni/gbcs/api/ResponseEventListener.java new file mode 100644 index 0000000..286703b --- /dev/null +++ b/gbcs-api/src/main/java/net/woggioni/gbcs/api/ResponseEventListener.java @@ -0,0 +1,7 @@ +package net.woggioni.gbcs.api; + +import net.woggioni.gbcs.api.event.ResponseEvent; + +public interface ResponseEventListener { + void listen(ResponseEvent evt); +} diff --git a/gbcs-api/src/main/java/net/woggioni/gbcs/api/event/RequestEvent.java b/gbcs-api/src/main/java/net/woggioni/gbcs/api/event/RequestEvent.java new file mode 100644 index 0000000..45d74ab --- /dev/null +++ b/gbcs-api/src/main/java/net/woggioni/gbcs/api/event/RequestEvent.java @@ -0,0 +1,20 @@ +package net.woggioni.gbcs.api.event; + +import io.netty.buffer.ByteBuf; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import net.woggioni.gbcs.api.CallHandle; + +sealed public abstract class RequestEvent { + @Getter + @RequiredArgsConstructor + public static final class ChunkSent extends RequestEvent { + private final ByteBuf chunk; + } + + @Getter + @RequiredArgsConstructor + public static final class LastChunkSent extends RequestEvent { + private final ByteBuf chunk; + } +} diff --git a/gbcs-api/src/main/java/net/woggioni/gbcs/api/event/ResponseEvent.java b/gbcs-api/src/main/java/net/woggioni/gbcs/api/event/ResponseEvent.java new file mode 100644 index 0000000..bb729b2 --- /dev/null +++ b/gbcs-api/src/main/java/net/woggioni/gbcs/api/event/ResponseEvent.java @@ -0,0 +1,28 @@ +package net.woggioni.gbcs.api.event; + +import io.netty.buffer.ByteBuf; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +sealed public abstract class ResponseEvent { + @Getter + @RequiredArgsConstructor + public final static class ChunkReceived extends ResponseEvent { + private final ByteBuf chunk; + } + + public final static class NoContent extends ResponseEvent { + } + + @Getter + @RequiredArgsConstructor + public final static class LastChunkReceived extends ResponseEvent { + private final ByteBuf chunk; + } + + @Getter + @RequiredArgsConstructor + public final static class ExceptionCaught extends ResponseEvent { + private final Throwable cause; + } +} diff --git a/gbcs-server-memcached/build.gradle b/gbcs-server-memcached/build.gradle index 4f60e00..4b21596 100644 --- a/gbcs-server-memcached/build.gradle +++ b/gbcs-server-memcached/build.gradle @@ -26,13 +26,24 @@ configurations { canBeResolved = true visible = true } + + testImplementation { + extendsFrom compileOnly + } } dependencies { compileOnly project(':gbcs-common') compileOnly project(':gbcs-api') compileOnly catalog.jwo + compileOnly catalog.slf4j.api implementation catalog.xmemcached + implementation catalog.netty.codec.memcache + implementation catalog.netty.common + implementation group: 'io.netty', name: 'netty-handler', version: catalog.versions.netty.get() + + testRuntimeOnly catalog.logback.classic + } Provider bundleTask = tasks.register("bundle", Tar) { @@ -41,6 +52,11 @@ Provider bundleTask = tasks.register("bundle", Tar) { group = BasePlugin.BUILD_GROUP } +tasks.named(JavaPlugin.TEST_TASK_NAME, Test) { + systemProperty("io.netty.leakDetectionLevel", "PARANOID") +} + + tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) { dependsOn(bundleTask) } diff --git a/gbcs-server-memcached/src/main/java/module-info.java b/gbcs-server-memcached/src/main/java/module-info.java index 12ed994..383b1ba 100644 --- a/gbcs-server-memcached/src/main/java/module-info.java +++ b/gbcs-server-memcached/src/main/java/module-info.java @@ -7,6 +7,13 @@ module net.woggioni.gbcs.server.memcached { requires net.woggioni.jwo; requires java.xml; requires kotlin.stdlib; + requires io.netty.common; + requires io.netty.handler; + requires io.netty.codec.memcache; + requires io.netty.transport; + requires org.slf4j; + requires io.netty.buffer; + requires io.netty.codec; provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider; diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/CustomChunkedInput.kt b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/CustomChunkedInput.kt new file mode 100644 index 0000000..8696806 --- /dev/null +++ b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/CustomChunkedInput.kt @@ -0,0 +1,33 @@ +package net.woggioni.gbcs.server.memcached + +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.stream.ChunkedInput +import java.nio.channels.ReadableByteChannel + +class CustomChunkedInput(private val readableByteChannel: ReadableByteChannel) : ChunkedInput { + override fun isEndOfInput(): Boolean { + TODO("Not yet implemented") + } + + override fun close() { + TODO("Not yet implemented") + } + + override fun readChunk(ctx: ChannelHandlerContext): ByteBuf { + TODO("Not yet implemented") + } + + override fun readChunk(allocator: ByteBufAllocator): ByteBuf { + TODO("Not yet implemented") + } + + override fun length(): Long { + TODO("Not yet implemented") + } + + override fun progress(): Long { + TODO("Not yet implemented") + } +} \ No newline at end of file diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/Exception.kt b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/Exception.kt new file mode 100644 index 0000000..070f88e --- /dev/null +++ b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/Exception.kt @@ -0,0 +1,4 @@ +package net.woggioni.gbcs.server.memcached + +class MemcachedException(status : Short, msg : String? = null, cause : Throwable? = null) + : RuntimeException(msg ?: "Memcached status $status", cause) \ No newline at end of file diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCache.kt b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCache.kt index cec7a1a..0b4af4b 100644 --- a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCache.kt +++ b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCache.kt @@ -1,59 +1,85 @@ package net.woggioni.gbcs.server.memcached -import net.rubyeye.xmemcached.XMemcachedClientBuilder -import net.rubyeye.xmemcached.command.BinaryCommandFactory -import net.rubyeye.xmemcached.transcoders.CompressionMode -import net.rubyeye.xmemcached.transcoders.SerializingTranscoder +import io.netty.buffer.Unpooled +import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus import net.woggioni.gbcs.api.Cache -import net.woggioni.gbcs.api.exception.ContentTooLargeException -import net.woggioni.gbcs.common.HostAndPort -import net.woggioni.jwo.JWO -import java.io.ByteArrayInputStream -import java.net.InetSocketAddress -import java.nio.channels.Channels -import java.nio.channels.ReadableByteChannel -import java.nio.charset.StandardCharsets -import java.security.MessageDigest -import java.time.Duration +import net.woggioni.gbcs.api.CallHandle +import net.woggioni.gbcs.api.ResponseEventListener +import net.woggioni.gbcs.api.event.RequestEvent +import net.woggioni.gbcs.api.event.ResponseEvent +import net.woggioni.gbcs.server.memcached.client.MemcachedClient +import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ExceptionCaught +import net.woggioni.gbcs.server.memcached.client.ResponseEvent.LastResponseContentChunkReceived +import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseContentChunkReceived +import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseReceived +import net.woggioni.gbcs.server.memcached.client.ResponseListener +import java.util.concurrent.CompletableFuture class MemcachedCache( - servers: List, - private val maxAge: Duration, - maxSize : Int, - digestAlgorithm: String?, - compressionMode: CompressionMode, + private val cfg : MemcachedCacheConfiguration ) : Cache { - private val memcachedClient = XMemcachedClientBuilder( - servers.stream().map { addr: HostAndPort -> InetSocketAddress(addr.host, addr.port) }.toList() - ).apply { - commandFactory = BinaryCommandFactory() - digestAlgorithm?.let { dAlg -> - setKeyProvider { key -> - val md = MessageDigest.getInstance(dAlg) - md.update(key.toByteArray(StandardCharsets.UTF_8)) - JWO.bytesToHex(md.digest()) - } - } - transcoder = SerializingTranscoder(maxSize).apply { - setCompressionMode(compressionMode) - } - }.build() - - override fun get(key: String): ReadableByteChannel? { - return memcachedClient.get(key) - ?.let(::ByteArrayInputStream) - ?.let(Channels::newChannel) - } - - override fun put(key: String, content: ByteArray) { - try { - memcachedClient[key, maxAge.toSeconds().toInt()] = content - } catch (e: IllegalArgumentException) { - throw ContentTooLargeException(e.message, e) - } - } + private val client = MemcachedClient(cfg) override fun close() { - memcachedClient.shutdown() + client.close() + } + + override fun get(key: String, responseEventListener: ResponseEventListener): CompletableFuture> { + val listener = ResponseListener { evt -> + when(evt) { + is ResponseContentChunkReceived -> { + responseEventListener.listen(ResponseEvent.ChunkReceived(Unpooled.wrappedBuffer(evt.chunk))) + } + is LastResponseContentChunkReceived -> { + responseEventListener.listen(ResponseEvent.LastChunkReceived(Unpooled.wrappedBuffer(evt.chunk))) + } + is ExceptionCaught -> { + responseEventListener.listen(ResponseEvent.ExceptionCaught(evt.cause)) + } + is ResponseReceived -> { + when(val status = evt.response.status) { + BinaryMemcacheResponseStatus.SUCCESS -> { + } + BinaryMemcacheResponseStatus.KEY_ENOENT -> { + responseEventListener.listen(ResponseEvent.NoContent()) + } + else -> { + responseEventListener.listen(ResponseEvent.ExceptionCaught(MemcachedException(status))) + } + } + } + } + } + return client.get(key, listener).thenApply { clientCallHandle -> + object : CallHandle { + override fun postEvent(evt: RequestEvent) { + when(evt) { + is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer()) + is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer()) + } + } + + override fun call(): CompletableFuture { + return clientCallHandle.waitForResponse().thenApply { null } + } + } + } + } + + override fun put(key: String): CompletableFuture> { + return client.put(key, cfg.maxAge).thenApply { clientCallHandle -> + object : CallHandle { + override fun postEvent(evt: RequestEvent) { + when(evt) { + is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer()) + is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer()) + } + } + + override fun call(): CompletableFuture { + return clientCallHandle.waitForResponse().thenApply { null } + } + } + } } } diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheConfiguration.kt b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheConfiguration.kt index b600dff..7ff84da 100644 --- a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheConfiguration.kt +++ b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheConfiguration.kt @@ -1,25 +1,45 @@ package net.woggioni.gbcs.server.memcached -import net.rubyeye.xmemcached.transcoders.CompressionMode import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.common.HostAndPort import java.time.Duration data class MemcachedCacheConfiguration( - var servers: List, - var maxAge: Duration = Duration.ofDays(1), - var maxSize: Int = 0x100000, - var digestAlgorithm: String? = null, - var compressionMode: CompressionMode = CompressionMode.ZIP, + val servers: List, + val maxAge: Duration = Duration.ofDays(1), + val maxSize: Int = 0x100000, + val digestAlgorithm: String? = null, + val compressionMode: CompressionMode? = CompressionMode.DEFLATE, ) : Configuration.Cache { - override fun materialize() = MemcachedCache( - servers, - maxAge, - maxSize, - digestAlgorithm, - compressionMode + + enum class CompressionMode { + /** + * Gzip mode + */ + GZIP, + + /** + * Deflate mode + */ + DEFLATE + } + + class RetryPolicy( + val maxAttempts: Int, + val initialDelayMillis: Long, + val exp: Double ) + data class Server( + val endpoint : HostAndPort, + val connectionTimeoutMillis : Int?, + val retryPolicy : RetryPolicy?, + val maxConnections : Int + ) + + + override fun materialize() = MemcachedCache(this) + override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached" override fun getTypeName() = "memcachedCacheType" diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheProvider.kt b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheProvider.kt index 46acdb4..48acf27 100644 --- a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheProvider.kt +++ b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/MemcachedCacheProvider.kt @@ -51,7 +51,7 @@ class MemcachedCacheProvider : CacheProvider { } return MemcachedCacheConfiguration( - servers, + servers.map { MemcachedCacheConfiguration.Server(it, null, null, 1) }, maxAge, maxSize, digestAlgorithm, @@ -67,8 +67,8 @@ class MemcachedCacheProvider : CacheProvider { attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI) for (server in servers) { node("server") { - attr("host", server.host) - attr("port", server.port.toString()) + attr("host", server.endpoint.host) + attr("port", server.endpoint.port.toString()) } } attr("max-age", maxAge.toString()) diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/CallHandle.kt b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/CallHandle.kt new file mode 100644 index 0000000..b5cf71d --- /dev/null +++ b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/CallHandle.kt @@ -0,0 +1,9 @@ +package net.woggioni.gbcs.server.memcached.client + +import java.nio.ByteBuffer +import java.util.concurrent.CompletableFuture + +interface CallHandle { + fun sendChunk(requestBodyChunk : ByteBuffer) + fun waitForResponse() : CompletableFuture +} \ No newline at end of file diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/MemcacheResponse.kt b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/MemcacheResponse.kt new file mode 100644 index 0000000..3bb28d1 --- /dev/null +++ b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/MemcacheResponse.kt @@ -0,0 +1,24 @@ +package net.woggioni.gbcs.server.memcached.client + +import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse +import java.nio.ByteBuffer + +data class MemcacheResponse( + val status: Short, + val opcode: Byte, + val cas: Long?, + val opaque: Int?, + val key: ByteBuffer?, + val extra: ByteBuffer? +) { + companion object { + fun of(response : BinaryMemcacheResponse) = MemcacheResponse( + response.status(), + response.opcode(), + response.cas(), + response.opaque(), + response.key()?.nioBuffer(), + response.extras()?.nioBuffer() + ) + } +} \ No newline at end of file diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/MemcachedClient.kt b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/MemcachedClient.kt new file mode 100644 index 0000000..d5bd6a1 --- /dev/null +++ b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/MemcachedClient.kt @@ -0,0 +1,241 @@ +package net.woggioni.gbcs.server.memcached.client + +import io.netty.bootstrap.Bootstrap +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import io.netty.channel.Channel +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelOption +import io.netty.channel.ChannelPipeline +import io.netty.channel.SimpleChannelInboundHandler +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.pool.AbstractChannelPoolHandler +import io.netty.channel.pool.ChannelPool +import io.netty.channel.pool.FixedChannelPool +import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.handler.codec.DecoderException +import io.netty.handler.codec.memcache.DefaultLastMemcacheContent +import io.netty.handler.codec.memcache.DefaultMemcacheContent +import io.netty.handler.codec.memcache.LastMemcacheContent +import io.netty.handler.codec.memcache.MemcacheContent +import io.netty.handler.codec.memcache.MemcacheObject +import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec +import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes +import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest +import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse +import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus +import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest +import io.netty.util.concurrent.GenericFutureListener +import net.woggioni.gbcs.common.GBCS.digest +import net.woggioni.gbcs.common.HostAndPort +import net.woggioni.gbcs.common.contextLogger +import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration +import net.woggioni.gbcs.server.memcached.MemcachedException +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.security.MessageDigest +import java.time.Duration +import java.time.Instant +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap +import io.netty.util.concurrent.Future as NettyFuture + + +class MemcachedClient(private val cfg: MemcachedCacheConfiguration) : AutoCloseable { + + private val log = contextLogger() + private val group: NioEventLoopGroup + private val connectionPool: MutableMap = ConcurrentHashMap() + + init { + group = NioEventLoopGroup() + } + + private fun newConnectionPool(server : MemcachedCacheConfiguration.Server) : FixedChannelPool { + val bootstrap = Bootstrap().apply { + group(group) + channel(NioSocketChannel::class.java) + option(ChannelOption.SO_KEEPALIVE, true) + remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port)) + server.connectionTimeoutMillis?.let { + option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it) + } + } + val channelPoolHandler = object : AbstractChannelPoolHandler() { + + override fun channelCreated(ch: Channel) { + val pipeline: ChannelPipeline = ch.pipeline() + pipeline.addLast(BinaryMemcacheClientCodec()) + } + } + return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections) + } + + + private fun sendRequest(request: BinaryMemcacheRequest, + responseListener: ResponseListener? + ): CompletableFuture { + + val server = cfg.servers.let { servers -> + if(servers.size > 1) { + val key = request.key().duplicate() + var checksum = 0 + while(key.readableBytes() > 4) { + val byte = key.readInt() + checksum = checksum xor byte + } + while(key.readableBytes() > 0) { + val byte = key.readByte() + checksum = checksum xor byte.toInt() + } + servers[checksum % servers.size] + } else { + servers.first() + } + } + + val callHandleFuture = CompletableFuture() + val result = CompletableFuture() + // Custom handler for processing responses + val pool = connectionPool.computeIfAbsent(server.endpoint) { + newConnectionPool(server) + } + pool.acquire().addListener(object : GenericFutureListener> { + override fun operationComplete(channelFuture: NettyFuture) { + if (channelFuture.isSuccess) { + val channel = channelFuture.now + val pipeline = channel.pipeline() + channel.pipeline().addLast("handler", object : SimpleChannelInboundHandler() { + val response : MemcacheResponse? = null + override fun channelRead0( + ctx: ChannelHandlerContext, + msg: MemcacheObject + ) { + if(msg is BinaryMemcacheResponse) { + val resp = MemcacheResponse.of(msg) + responseListener?.listen(ResponseEvent.ResponseReceived(resp)) + if(msg.totalBodyLength() == msg.keyLength() + msg.extrasLength()) { + result.complete(resp.status) + } + } + if(responseListener != null) { + when (msg) { + is LastMemcacheContent -> { + responseListener.listen(ResponseEvent.LastResponseContentChunkReceived(msg.content().nioBuffer())) + result.complete(response?.status) + pipeline.removeLast() + pool.release(channel) + } + is MemcacheContent -> { + responseListener.listen(ResponseEvent.ResponseContentChunkReceived(msg.content().nioBuffer())) + } + } + } + } + + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + val ex = when (cause) { + is DecoderException -> cause.cause!! + else -> cause + } + responseListener?.listen(ResponseEvent.ExceptionCaught(ex)) + result.completeExceptionally(ex) + ctx.close() + pipeline.removeLast() + pool.release(channel) + } + }) + + val chunks = mutableListOf () + fun sendRequest() { + val valueLen = chunks.fold(0) { acc : Int, c2 : ByteBuffer -> + acc + c2.remaining() + } + request.setTotalBodyLength(request.keyLength() + request.extrasLength() + valueLen) + channel.write(request) + for((i, chunk) in chunks.withIndex()) { + if(i + 1 < chunks.size) { + channel.write(DefaultMemcacheContent(Unpooled.wrappedBuffer(chunk))) + } else { + channel.write(DefaultLastMemcacheContent(Unpooled.wrappedBuffer(chunk))) + } + } + channel.flush() + } + + callHandleFuture.complete(object : CallHandle { + override fun sendChunk(requestBodyChunk: ByteBuffer) { + chunks.addLast(requestBodyChunk) + } + + override fun waitForResponse(): CompletableFuture { + sendRequest() + return result + } + }) + } else { + callHandleFuture.completeExceptionally(channelFuture.cause()) + } + } + }) + return callHandleFuture + } + + private fun encodeExpiry(expiry: Duration) : Int { + val expirySeconds = expiry.toSeconds() + return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds } + ?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt() + } + + fun get(key: String, responseListener: ResponseListener) : CompletableFuture { + val request = (cfg.digestAlgorithm + ?.let(MessageDigest::getInstance) + ?.let { md -> + digest(key.toByteArray(), md) + } ?: key.toByteArray(Charsets.UTF_8)).let { digest -> + DefaultBinaryMemcacheRequest().apply { + setKey(Unpooled.wrappedBuffer(digest)) + setOpcode(BinaryMemcacheOpcodes.GET) + } + } + return sendRequest(request, responseListener) + } + + fun put(key: String, expiry : Duration, cas : Long? = null): CompletableFuture { + val request = (cfg.digestAlgorithm + ?.let(MessageDigest::getInstance) + ?.let { md -> + digest(key.toByteArray(), md) + } ?: key.toByteArray(Charsets.UTF_8)).let { digest -> + val extras = Unpooled.buffer(8, 8) + extras.writeInt(0) + extras.writeInt(encodeExpiry(expiry)) + DefaultBinaryMemcacheRequest().apply { + setExtras(extras) + setKey(Unpooled.wrappedBuffer(digest)) + setOpcode(BinaryMemcacheOpcodes.SET) + cas?.let(this::setCas) + } + } + return sendRequest(request) { evt -> + when (evt) { + is ResponseEvent.ResponseReceived -> { + if (evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) { + throw MemcachedException(evt.response.status) + } + } + else -> {} + } + } + } + + + + fun shutDown(): NettyFuture<*> { + return group.shutdownGracefully() + } + + override fun close() { + shutDown().sync() + } +} \ No newline at end of file diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/ResponseEvent.kt b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/ResponseEvent.kt new file mode 100644 index 0000000..65b9d09 --- /dev/null +++ b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/ResponseEvent.kt @@ -0,0 +1,10 @@ +package net.woggioni.gbcs.server.memcached.client + +import java.nio.ByteBuffer + +sealed interface ResponseEvent { + class ResponseReceived(val response : MemcacheResponse) : ResponseEvent + class ResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent + class LastResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent + class ExceptionCaught(val cause : Throwable) : ResponseEvent +} \ No newline at end of file diff --git a/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/ResponseListener.kt b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/ResponseListener.kt new file mode 100644 index 0000000..6c0568d --- /dev/null +++ b/gbcs-server-memcached/src/main/kotlin/net/woggioni/gbcs/server/memcached/client/ResponseListener.kt @@ -0,0 +1,5 @@ +package net.woggioni.gbcs.server.memcached.client + +fun interface ResponseListener { + fun listen(evt : ResponseEvent) +} \ No newline at end of file diff --git a/gbcs-server-memcached/src/test/kotlin/net/woggioni/gbcs/server/memcached/test/MemcachedClientTest.kt b/gbcs-server-memcached/src/test/kotlin/net/woggioni/gbcs/server/memcached/test/MemcachedClientTest.kt new file mode 100644 index 0000000..9c767a4 --- /dev/null +++ b/gbcs-server-memcached/src/test/kotlin/net/woggioni/gbcs/server/memcached/test/MemcachedClientTest.kt @@ -0,0 +1,80 @@ +package net.woggioni.gbcs.server.memcached.test + +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse +import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus +import net.woggioni.gbcs.api.event.ChunkReceived +import net.woggioni.gbcs.common.HostAndPort + +import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration +import net.woggioni.gbcs.server.memcached.client.MemcacheResponse +import net.woggioni.gbcs.server.memcached.client.MemcachedClient +import net.woggioni.gbcs.server.memcached.client.ResponseEvent +import net.woggioni.gbcs.server.memcached.client.ResponseListener +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import java.nio.ByteBuffer +import java.security.SecureRandom +import java.time.Duration +import java.util.Objects +import java.util.concurrent.TimeUnit +import kotlin.random.Random + +class MemcachedClientTest { + + @Test + fun test() { + val client = MemcachedClient(MemcachedCacheConfiguration( + servers = listOf( + MemcachedCacheConfiguration.Server( + endpoint = HostAndPort("127.0.0.1", 11211), + connectionTimeoutMillis = null, + retryPolicy = null, + maxConnections = 1 + ) + ) + )) + + val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong()) + val key = "101325" + val value = random.nextBytes(0x1000) + val requestListener = client.put(key, Duration.ofDays(2), null) + + val response = Unpooled.buffer(value.size) + requestListener.thenCompose { listener -> + listener.sendChunk(ByteBuffer.wrap(value)) + listener.waitForResponse() + }.get(10, TimeUnit.SECONDS) + + client.get(key, object: ResponseListener { + override fun listen(evt: ResponseEvent) { + when(evt) { + is ResponseEvent.ResponseReceived -> { + if(evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) { + Assertions.fail { + "Memcache status ${evt.response.status}" + } + } + } + is ResponseEvent.ResponseContentChunkReceived -> response.writeBytes(evt.chunk) + else -> {} + } + } + }).thenCompose { it.waitForResponse() }.get(1, TimeUnit.SECONDS) + val retrievedResponse = response.array() + Assertions.assertArrayEquals(value, retrievedResponse) + + } + + @Test + fun test2() { + val a1 = ByteArray(10) { + it.toByte() + } + val a2 = ByteArray(10) { + it.toByte() + } + Assertions.assertTrue(Objects.equals(a1, a1)) + } +} \ No newline at end of file diff --git a/gbcs-server-memcached/src/test/resources/logback.xml b/gbcs-server-memcached/src/test/resources/logback.xml new file mode 100644 index 0000000..c6f9111 --- /dev/null +++ b/gbcs-server-memcached/src/test/resources/logback.xml @@ -0,0 +1,21 @@ + + + + + + + + + System.err + + %d [%highlight(%-5level)] \(%thread\) %logger{36} -%kvp- %msg %n + + + + + + + + + + \ No newline at end of file diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt index a127163..9da05eb 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt @@ -1,36 +1,83 @@ package net.woggioni.gbcs.server.handler import io.netty.buffer.Unpooled -import io.netty.channel.ChannelFutureListener -import io.netty.channel.ChannelHandler import io.netty.channel.ChannelHandlerContext -import io.netty.channel.DefaultFileRegion import io.netty.channel.SimpleChannelInboundHandler import io.netty.handler.codec.http.DefaultFullHttpResponse +import io.netty.handler.codec.http.DefaultHttpContent import io.netty.handler.codec.http.DefaultHttpResponse -import io.netty.handler.codec.http.FullHttpRequest +import io.netty.handler.codec.http.DefaultLastHttpContent +import io.netty.handler.codec.http.HttpContent import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaderValues +import io.netty.handler.codec.http.HttpMessage import io.netty.handler.codec.http.HttpMethod +import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpUtil import io.netty.handler.codec.http.LastHttpContent -import io.netty.handler.stream.ChunkedNioStream import net.woggioni.gbcs.api.Cache -import net.woggioni.gbcs.api.exception.CacheException +import net.woggioni.gbcs.api.CallHandle +import net.woggioni.gbcs.api.ResponseEventListener +import net.woggioni.gbcs.api.event.RequestEvent +import net.woggioni.gbcs.api.event.ResponseEvent import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.server.debug import net.woggioni.gbcs.server.warn -import java.nio.channels.FileChannel import java.nio.file.Path +import java.util.concurrent.CompletableFuture -@ChannelHandler.Sharable class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : - SimpleChannelInboundHandler() { + SimpleChannelInboundHandler() { - private val log = contextLogger() + companion object { + @JvmStatic + private val log = contextLogger() + } - override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) { + private data class TransientContext( + var key: String?, + var callHandle: CompletableFuture> + ) + + private var transientContext: TransientContext? = null + + override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpMessage) { + when (msg) { + is HttpRequest -> { + handleRequest(ctx, msg) + } + + is LastHttpContent -> { + transientContext?.run { + callHandle.thenCompose { callHandle -> + callHandle.postEvent(RequestEvent.LastChunkSent(msg.content())) + callHandle.call() + }.thenApply { + val response = DefaultFullHttpResponse( + msg.protocolVersion(), HttpResponseStatus.CREATED, + key?.let(String::toByteArray) + ?.let(Unpooled::copiedBuffer) + ) +// response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() + ctx.writeAndFlush(response) + } + } + } + + is HttpContent -> { + transientContext?.run { + callHandle = callHandle.thenApply { it -> + it.postEvent(RequestEvent.ChunkSent(msg.content())) + it + } + } + } + } + + } + + private fun handleRequest(ctx: ChannelHandlerContext, msg: HttpRequest) { val keepAlive: Boolean = HttpUtil.isKeepAlive(msg) val method = msg.method() if (method === HttpMethod.GET) { @@ -43,49 +90,61 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : return } if (serverPrefix == prefix) { - try { - cache.get(key) - } catch(ex : Throwable) { - throw CacheException("Error accessing the cache backend", ex) - }?.let { channel -> - log.debug(ctx) { - "Cache hit for key '$key'" - } - val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) - response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM - if (!keepAlive) { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) - response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY) - } else { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) - response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED) - } - ctx.write(response) - when (channel) { - is FileChannel -> { - if (keepAlive) { - ctx.write(DefaultFileRegion(channel, 0, channel.size())) - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate()) - } else { - ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size())) - .addListener(ChannelFutureListener.CLOSE) + cache.get(key, object : ResponseEventListener { + var first = false + override fun listen(evt: ResponseEvent) { + when (evt) { + is ResponseEvent.NoContent -> { + log.debug(ctx) { + "Cache miss for key '$key'" + } + val response = + DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND) + response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 + ctx.writeAndFlush(response) + } + + is ResponseEvent.ChunkReceived, is ResponseEvent.LastChunkReceived -> { + if (first) { + first = false + log.debug(ctx) { + "Cache hit for key '$key'" + } + val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) + response.headers()[HttpHeaderNames.CONTENT_TYPE] = + HttpHeaderValues.APPLICATION_OCTET_STREAM + if (!keepAlive) { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) + response.headers() + .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY) + } else { + response.headers() + .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) + response.headers() + .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED) + } + ctx.write(response) + } + if (evt is ResponseEvent.LastChunkReceived) + ctx.write(DefaultLastHttpContent(evt.chunk)) + else if (evt is ResponseEvent.ChunkReceived) + ctx.write(DefaultHttpContent(evt.chunk)) + ctx.flush() + } + + is ResponseEvent.ExceptionCaught -> { + log.error(evt.cause.message, evt.cause) + val errorResponse = DefaultFullHttpResponse( + msg.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR, + evt.cause.message + ?.let(String::toByteArray) + ?.let(Unpooled::copiedBuffer) + ) + ctx.write(errorResponse) } } - else -> { - ctx.write(ChunkedNioStream(channel)).addListener { evt -> - channel.close() - } - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate()) - } } - } ?: let { - log.debug(ctx) { - "Cache miss for key '$key'" - } - val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND) - response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 - ctx.writeAndFlush(response) - } + }).thenCompose(CallHandle::call) } else { log.warn(ctx) { "Got request for unhandled path '${msg.uri()}'" @@ -93,35 +152,22 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 ctx.writeAndFlush(response) + ctx.channel().read() } } else if (method === HttpMethod.PUT) { val path = Path.of(msg.uri()) val prefix = path.parent val key = path.fileName.toString() - if (serverPrefix == prefix) { log.debug(ctx) { "Added value for key '$key' to build cache" } - val bodyBytes = msg.content().run { - if (isDirect) { - ByteArray(readableBytes()).also { - readBytes(it) - } - } else { - array() - } - } - try { - cache.put(key, bodyBytes) - } catch(ex : Throwable) { - throw CacheException("Error accessing the cache backend", ex) - } + transientContext = TransientContext(key, cache.put(key)) val response = DefaultFullHttpResponse( msg.protocolVersion(), HttpResponseStatus.CREATED, Unpooled.copiedBuffer(key.toByteArray()) ) - response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() +// response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() ctx.writeAndFlush(response) } else { log.warn(ctx) { @@ -131,9 +177,12 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" ctx.writeAndFlush(response) } - } else if(method == HttpMethod.TRACE) { + } else if (method == HttpMethod.TRACE) { val replayedRequestHead = ctx.alloc().buffer() - replayedRequestHead.writeCharSequence("TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", Charsets.US_ASCII) + replayedRequestHead.writeCharSequence( + "TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", + Charsets.US_ASCII + ) msg.headers().forEach { (key, value) -> replayedRequestHead.apply { writeCharSequence(key, Charsets.US_ASCII) @@ -143,18 +192,30 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : } } replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII) - val requestBody = msg.content() - requestBody.retain() - val responseBody = ctx.alloc().compositeBuffer(2).apply { - addComponents(true, replayedRequestHead) - addComponents(true, requestBody) - } - val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK, responseBody) + val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) response.headers().apply { set(HttpHeaderNames.CONTENT_TYPE, "message/http") - set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes()) } - ctx.writeAndFlush(response) + ctx.write(response) + ctx.writeAndFlush(DefaultHttpContent(replayedRequestHead)) + val callHandle = object : CallHandle { + override fun postEvent(evt: RequestEvent) { + when (evt) { + is RequestEvent.ChunkSent -> { + ctx.writeAndFlush(DefaultHttpContent(evt.chunk)) + } + + is RequestEvent.LastChunkSent -> { + ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk)) + } + } + } + + override fun call(): CompletableFuture { + return CompletableFuture.completedFuture(null) + } + } + transientContext = TransientContext(null, CompletableFuture.completedFuture(callHandle)) } else { log.warn(ctx) { "Got request with unhandled method '${msg.method().name()}'" @@ -163,5 +224,6 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" ctx.writeAndFlush(response) } + } } \ No newline at end of file