temporary commit

This commit is contained in:
2025-02-04 13:59:44 +08:00
parent 89153b60f8
commit e5fe8437a6
22 changed files with 771 additions and 145 deletions

View File

@@ -5,6 +5,7 @@ plugins {
} }
dependencies { dependencies {
api catalog.netty.buffer
} }
publishing { publishing {

View File

@@ -1,6 +1,8 @@
module net.woggioni.gbcs.api { module net.woggioni.gbcs.api {
requires static lombok; requires static lombok;
requires java.xml; requires java.xml;
requires io.netty.buffer;
exports net.woggioni.gbcs.api; exports net.woggioni.gbcs.api;
exports net.woggioni.gbcs.api.exception; exports net.woggioni.gbcs.api.exception;
exports net.woggioni.gbcs.api.event;
} }

View File

@@ -3,10 +3,10 @@ package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.exception.ContentTooLargeException; import net.woggioni.gbcs.api.exception.ContentTooLargeException;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.CompletableFuture;
public interface Cache extends AutoCloseable { public interface Cache extends AutoCloseable {
ReadableByteChannel get(String key); CompletableFuture<CallHandle<Void>> get(String key, ResponseEventListener responseEventListener);
CompletableFuture<CallHandle<Void>> put(String key) throws ContentTooLargeException;
void put(String key, byte[] content) throws ContentTooLargeException;
} }

View File

@@ -0,0 +1,10 @@
package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.event.RequestEvent;
import java.util.concurrent.CompletableFuture;
public interface CallHandle<T> {
void postEvent(RequestEvent evt);
CompletableFuture<T> call();
}

View File

@@ -0,0 +1,7 @@
package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.event.ResponseEvent;
public interface ResponseEventListener {
void listen(ResponseEvent evt);
}

View File

@@ -0,0 +1,20 @@
package net.woggioni.gbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import net.woggioni.gbcs.api.CallHandle;
sealed public abstract class RequestEvent {
@Getter
@RequiredArgsConstructor
public static final class ChunkSent extends RequestEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
public static final class LastChunkSent extends RequestEvent {
private final ByteBuf chunk;
}
}

View File

@@ -0,0 +1,28 @@
package net.woggioni.gbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
sealed public abstract class ResponseEvent {
@Getter
@RequiredArgsConstructor
public final static class ChunkReceived extends ResponseEvent {
private final ByteBuf chunk;
}
public final static class NoContent extends ResponseEvent {
}
@Getter
@RequiredArgsConstructor
public final static class LastChunkReceived extends ResponseEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
public final static class ExceptionCaught extends ResponseEvent {
private final Throwable cause;
}
}

View File

@@ -26,13 +26,24 @@ configurations {
canBeResolved = true canBeResolved = true
visible = true visible = true
} }
testImplementation {
extendsFrom compileOnly
}
} }
dependencies { dependencies {
compileOnly project(':gbcs-common') compileOnly project(':gbcs-common')
compileOnly project(':gbcs-api') compileOnly project(':gbcs-api')
compileOnly catalog.jwo compileOnly catalog.jwo
compileOnly catalog.slf4j.api
implementation catalog.xmemcached implementation catalog.xmemcached
implementation catalog.netty.codec.memcache
implementation catalog.netty.common
implementation group: 'io.netty', name: 'netty-handler', version: catalog.versions.netty.get()
testRuntimeOnly catalog.logback.classic
} }
Provider<Tar> bundleTask = tasks.register("bundle", Tar) { Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
@@ -41,6 +52,11 @@ Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
group = BasePlugin.BUILD_GROUP group = BasePlugin.BUILD_GROUP
} }
tasks.named(JavaPlugin.TEST_TASK_NAME, Test) {
systemProperty("io.netty.leakDetectionLevel", "PARANOID")
}
tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) { tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) {
dependsOn(bundleTask) dependsOn(bundleTask)
} }

View File

@@ -7,6 +7,13 @@ module net.woggioni.gbcs.server.memcached {
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires java.xml; requires java.xml;
requires kotlin.stdlib; requires kotlin.stdlib;
requires io.netty.common;
requires io.netty.handler;
requires io.netty.codec.memcache;
requires io.netty.transport;
requires org.slf4j;
requires io.netty.buffer;
requires io.netty.codec;
provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider; provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider;

View File

@@ -0,0 +1,33 @@
package net.woggioni.gbcs.server.memcached
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.stream.ChunkedInput
import java.nio.channels.ReadableByteChannel
class CustomChunkedInput(private val readableByteChannel: ReadableByteChannel) : ChunkedInput<ByteBuf> {
override fun isEndOfInput(): Boolean {
TODO("Not yet implemented")
}
override fun close() {
TODO("Not yet implemented")
}
override fun readChunk(ctx: ChannelHandlerContext): ByteBuf {
TODO("Not yet implemented")
}
override fun readChunk(allocator: ByteBufAllocator): ByteBuf {
TODO("Not yet implemented")
}
override fun length(): Long {
TODO("Not yet implemented")
}
override fun progress(): Long {
TODO("Not yet implemented")
}
}

View File

@@ -0,0 +1,4 @@
package net.woggioni.gbcs.server.memcached
class MemcachedException(status : Short, msg : String? = null, cause : Throwable? = null)
: RuntimeException(msg ?: "Memcached status $status", cause)

View File

@@ -1,59 +1,85 @@
package net.woggioni.gbcs.server.memcached package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.XMemcachedClientBuilder import io.netty.buffer.Unpooled
import net.rubyeye.xmemcached.command.BinaryCommandFactory import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder
import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.ContentTooLargeException import net.woggioni.gbcs.api.CallHandle
import net.woggioni.gbcs.common.HostAndPort import net.woggioni.gbcs.api.ResponseEventListener
import net.woggioni.jwo.JWO import net.woggioni.gbcs.api.event.RequestEvent
import java.io.ByteArrayInputStream import net.woggioni.gbcs.api.event.ResponseEvent
import java.net.InetSocketAddress import net.woggioni.gbcs.server.memcached.client.MemcachedClient
import java.nio.channels.Channels import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ExceptionCaught
import java.nio.channels.ReadableByteChannel import net.woggioni.gbcs.server.memcached.client.ResponseEvent.LastResponseContentChunkReceived
import java.nio.charset.StandardCharsets import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseContentChunkReceived
import java.security.MessageDigest import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseReceived
import java.time.Duration import net.woggioni.gbcs.server.memcached.client.ResponseListener
import java.util.concurrent.CompletableFuture
class MemcachedCache( class MemcachedCache(
servers: List<HostAndPort>, private val cfg : MemcachedCacheConfiguration
private val maxAge: Duration,
maxSize : Int,
digestAlgorithm: String?,
compressionMode: CompressionMode,
) : Cache { ) : Cache {
private val memcachedClient = XMemcachedClientBuilder( private val client = MemcachedClient(cfg)
servers.stream().map { addr: HostAndPort -> InetSocketAddress(addr.host, addr.port) }.toList()
).apply {
commandFactory = BinaryCommandFactory()
digestAlgorithm?.let { dAlg ->
setKeyProvider { key ->
val md = MessageDigest.getInstance(dAlg)
md.update(key.toByteArray(StandardCharsets.UTF_8))
JWO.bytesToHex(md.digest())
}
}
transcoder = SerializingTranscoder(maxSize).apply {
setCompressionMode(compressionMode)
}
}.build()
override fun get(key: String): ReadableByteChannel? {
return memcachedClient.get<ByteArray>(key)
?.let(::ByteArrayInputStream)
?.let(Channels::newChannel)
}
override fun put(key: String, content: ByteArray) {
try {
memcachedClient[key, maxAge.toSeconds().toInt()] = content
} catch (e: IllegalArgumentException) {
throw ContentTooLargeException(e.message, e)
}
}
override fun close() { override fun close() {
memcachedClient.shutdown() client.close()
}
override fun get(key: String, responseEventListener: ResponseEventListener): CompletableFuture<CallHandle<Void>> {
val listener = ResponseListener { evt ->
when(evt) {
is ResponseContentChunkReceived -> {
responseEventListener.listen(ResponseEvent.ChunkReceived(Unpooled.wrappedBuffer(evt.chunk)))
}
is LastResponseContentChunkReceived -> {
responseEventListener.listen(ResponseEvent.LastChunkReceived(Unpooled.wrappedBuffer(evt.chunk)))
}
is ExceptionCaught -> {
responseEventListener.listen(ResponseEvent.ExceptionCaught(evt.cause))
}
is ResponseReceived -> {
when(val status = evt.response.status) {
BinaryMemcacheResponseStatus.SUCCESS -> {
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
responseEventListener.listen(ResponseEvent.NoContent())
}
else -> {
responseEventListener.listen(ResponseEvent.ExceptionCaught(MemcachedException(status)))
}
}
}
}
}
return client.get(key, listener).thenApply { clientCallHandle ->
object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when(evt) {
is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
}
}
override fun call(): CompletableFuture<Void> {
return clientCallHandle.waitForResponse().thenApply { null }
}
}
}
}
override fun put(key: String): CompletableFuture<CallHandle<Void>> {
return client.put(key, cfg.maxAge).thenApply { clientCallHandle ->
object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when(evt) {
is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
}
}
override fun call(): CompletableFuture<Void> {
return clientCallHandle.waitForResponse().thenApply { null }
}
}
}
} }
} }

View File

@@ -1,25 +1,45 @@
package net.woggioni.gbcs.server.memcached package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.HostAndPort import net.woggioni.gbcs.common.HostAndPort
import java.time.Duration import java.time.Duration
data class MemcachedCacheConfiguration( data class MemcachedCacheConfiguration(
var servers: List<HostAndPort>, val servers: List<Server>,
var maxAge: Duration = Duration.ofDays(1), val maxAge: Duration = Duration.ofDays(1),
var maxSize: Int = 0x100000, val maxSize: Int = 0x100000,
var digestAlgorithm: String? = null, val digestAlgorithm: String? = null,
var compressionMode: CompressionMode = CompressionMode.ZIP, val compressionMode: CompressionMode? = CompressionMode.DEFLATE,
) : Configuration.Cache { ) : Configuration.Cache {
override fun materialize() = MemcachedCache(
servers, enum class CompressionMode {
maxAge, /**
maxSize, * Gzip mode
digestAlgorithm, */
compressionMode GZIP,
/**
* Deflate mode
*/
DEFLATE
}
class RetryPolicy(
val maxAttempts: Int,
val initialDelayMillis: Long,
val exp: Double
) )
data class Server(
val endpoint : HostAndPort,
val connectionTimeoutMillis : Int?,
val retryPolicy : RetryPolicy?,
val maxConnections : Int
)
override fun materialize() = MemcachedCache(this)
override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached" override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached"
override fun getTypeName() = "memcachedCacheType" override fun getTypeName() = "memcachedCacheType"

View File

@@ -51,7 +51,7 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
} }
return MemcachedCacheConfiguration( return MemcachedCacheConfiguration(
servers, servers.map { MemcachedCacheConfiguration.Server(it, null, null, 1) },
maxAge, maxAge,
maxSize, maxSize,
digestAlgorithm, digestAlgorithm,
@@ -67,8 +67,8 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI) attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI)
for (server in servers) { for (server in servers) {
node("server") { node("server") {
attr("host", server.host) attr("host", server.endpoint.host)
attr("port", server.port.toString()) attr("port", server.endpoint.port.toString())
} }
} }
attr("max-age", maxAge.toString()) attr("max-age", maxAge.toString())

View File

@@ -0,0 +1,9 @@
package net.woggioni.gbcs.server.memcached.client
import java.nio.ByteBuffer
import java.util.concurrent.CompletableFuture
interface CallHandle {
fun sendChunk(requestBodyChunk : ByteBuffer)
fun waitForResponse() : CompletableFuture<Short>
}

View File

@@ -0,0 +1,24 @@
package net.woggioni.gbcs.server.memcached.client
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import java.nio.ByteBuffer
data class MemcacheResponse(
val status: Short,
val opcode: Byte,
val cas: Long?,
val opaque: Int?,
val key: ByteBuffer?,
val extra: ByteBuffer?
) {
companion object {
fun of(response : BinaryMemcacheResponse) = MemcacheResponse(
response.status(),
response.opcode(),
response.cas(),
response.opaque(),
response.key()?.nioBuffer(),
response.extras()?.nioBuffer()
)
}
}

View File

@@ -0,0 +1,241 @@
package net.woggioni.gbcs.server.memcached.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.MemcacheObject
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.common.GBCS.digest
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration
import net.woggioni.gbcs.server.memcached.MemcachedException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import io.netty.util.concurrent.Future as NettyFuture
class MemcachedClient(private val cfg: MemcachedCacheConfiguration) : AutoCloseable {
private val log = contextLogger()
private val group: NioEventLoopGroup
private val connectionPool: MutableMap<HostAndPort, ChannelPool> = ConcurrentHashMap()
init {
group = NioEventLoopGroup()
}
private fun newConnectionPool(server : MemcachedCacheConfiguration.Server) : FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
channel(NioSocketChannel::class.java)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
server.connectionTimeoutMillis?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec())
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
private fun sendRequest(request: BinaryMemcacheRequest,
responseListener: ResponseListener?
): CompletableFuture<CallHandle> {
val server = cfg.servers.let { servers ->
if(servers.size > 1) {
val key = request.key().duplicate()
var checksum = 0
while(key.readableBytes() > 4) {
val byte = key.readInt()
checksum = checksum xor byte
}
while(key.readableBytes() > 0) {
val byte = key.readByte()
checksum = checksum xor byte.toInt()
}
servers[checksum % servers.size]
} else {
servers.first()
}
}
val callHandleFuture = CompletableFuture<CallHandle>()
val result = CompletableFuture<Short>()
// Custom handler for processing responses
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
}
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline().addLast("handler", object : SimpleChannelInboundHandler<MemcacheObject>() {
val response : MemcacheResponse? = null
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: MemcacheObject
) {
if(msg is BinaryMemcacheResponse) {
val resp = MemcacheResponse.of(msg)
responseListener?.listen(ResponseEvent.ResponseReceived(resp))
if(msg.totalBodyLength() == msg.keyLength() + msg.extrasLength()) {
result.complete(resp.status)
}
}
if(responseListener != null) {
when (msg) {
is LastMemcacheContent -> {
responseListener.listen(ResponseEvent.LastResponseContentChunkReceived(msg.content().nioBuffer()))
result.complete(response?.status)
pipeline.removeLast()
pool.release(channel)
}
is MemcacheContent -> {
responseListener.listen(ResponseEvent.ResponseContentChunkReceived(msg.content().nioBuffer()))
}
}
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val ex = when (cause) {
is DecoderException -> cause.cause!!
else -> cause
}
responseListener?.listen(ResponseEvent.ExceptionCaught(ex))
result.completeExceptionally(ex)
ctx.close()
pipeline.removeLast()
pool.release(channel)
}
})
val chunks = mutableListOf <ByteBuffer>()
fun sendRequest() {
val valueLen = chunks.fold(0) { acc : Int, c2 : ByteBuffer ->
acc + c2.remaining()
}
request.setTotalBodyLength(request.keyLength() + request.extrasLength() + valueLen)
channel.write(request)
for((i, chunk) in chunks.withIndex()) {
if(i + 1 < chunks.size) {
channel.write(DefaultMemcacheContent(Unpooled.wrappedBuffer(chunk)))
} else {
channel.write(DefaultLastMemcacheContent(Unpooled.wrappedBuffer(chunk)))
}
}
channel.flush()
}
callHandleFuture.complete(object : CallHandle {
override fun sendChunk(requestBodyChunk: ByteBuffer) {
chunks.addLast(requestBodyChunk)
}
override fun waitForResponse(): CompletableFuture<Short> {
sendRequest()
return result
}
})
} else {
callHandleFuture.completeExceptionally(channelFuture.cause())
}
}
})
return callHandleFuture
}
private fun encodeExpiry(expiry: Duration) : Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
fun get(key: String, responseListener: ResponseListener) : CompletableFuture<CallHandle> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
DefaultBinaryMemcacheRequest().apply {
setKey(Unpooled.wrappedBuffer(digest))
setOpcode(BinaryMemcacheOpcodes.GET)
}
}
return sendRequest(request, responseListener)
}
fun put(key: String, expiry : Duration, cas : Long? = null): CompletableFuture<CallHandle> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
val extras = Unpooled.buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(expiry))
DefaultBinaryMemcacheRequest().apply {
setExtras(extras)
setKey(Unpooled.wrappedBuffer(digest))
setOpcode(BinaryMemcacheOpcodes.SET)
cas?.let(this::setCas)
}
}
return sendRequest(request) { evt ->
when (evt) {
is ResponseEvent.ResponseReceived -> {
if (evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) {
throw MemcachedException(evt.response.status)
}
}
else -> {}
}
}
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}
override fun close() {
shutDown().sync()
}
}

View File

@@ -0,0 +1,10 @@
package net.woggioni.gbcs.server.memcached.client
import java.nio.ByteBuffer
sealed interface ResponseEvent {
class ResponseReceived(val response : MemcacheResponse) : ResponseEvent
class ResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent
class LastResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent
class ExceptionCaught(val cause : Throwable) : ResponseEvent
}

View File

@@ -0,0 +1,5 @@
package net.woggioni.gbcs.server.memcached.client
fun interface ResponseListener {
fun listen(evt : ResponseEvent)
}

View File

@@ -0,0 +1,80 @@
package net.woggioni.gbcs.server.memcached.test
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import net.woggioni.gbcs.api.event.ChunkReceived
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration
import net.woggioni.gbcs.server.memcached.client.MemcacheResponse
import net.woggioni.gbcs.server.memcached.client.MemcachedClient
import net.woggioni.gbcs.server.memcached.client.ResponseEvent
import net.woggioni.gbcs.server.memcached.client.ResponseListener
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import java.nio.ByteBuffer
import java.security.SecureRandom
import java.time.Duration
import java.util.Objects
import java.util.concurrent.TimeUnit
import kotlin.random.Random
class MemcachedClientTest {
@Test
fun test() {
val client = MemcachedClient(MemcachedCacheConfiguration(
servers = listOf(
MemcachedCacheConfiguration.Server(
endpoint = HostAndPort("127.0.0.1", 11211),
connectionTimeoutMillis = null,
retryPolicy = null,
maxConnections = 1
)
)
))
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
val key = "101325"
val value = random.nextBytes(0x1000)
val requestListener = client.put(key, Duration.ofDays(2), null)
val response = Unpooled.buffer(value.size)
requestListener.thenCompose { listener ->
listener.sendChunk(ByteBuffer.wrap(value))
listener.waitForResponse()
}.get(10, TimeUnit.SECONDS)
client.get(key, object: ResponseListener {
override fun listen(evt: ResponseEvent) {
when(evt) {
is ResponseEvent.ResponseReceived -> {
if(evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) {
Assertions.fail<String> {
"Memcache status ${evt.response.status}"
}
}
}
is ResponseEvent.ResponseContentChunkReceived -> response.writeBytes(evt.chunk)
else -> {}
}
}
}).thenCompose { it.waitForResponse() }.get(1, TimeUnit.SECONDS)
val retrievedResponse = response.array()
Assertions.assertArrayEquals(value, retrievedResponse)
}
@Test
fun test2() {
val a1 = ByteArray(10) {
it.toByte()
}
val a2 = ByteArray(10) {
it.toByte()
}
Assertions.assertTrue(Objects.equals(a1, a1))
}
}

View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration>
<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>
<appender name="console" class="ConsoleAppender">
<target>System.err</target>
<encoder class="PatternLayoutEncoder">
<pattern>%d [%highlight(%-5level)] \(%thread\) %logger{36} -%kvp- %msg %n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="io.netty" level="debug"/>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -1,36 +1,83 @@
package net.woggioni.gbcs.server.handler package net.woggioni.gbcs.server.handler
import io.netty.buffer.Unpooled import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.DefaultFileRegion
import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.DefaultFullHttpResponse import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.DefaultHttpResponse import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.FullHttpRequest import io.netty.handler.codec.http.DefaultLastHttpContent
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMessage
import io.netty.handler.codec.http.HttpMethod import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioStream
import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.CacheException import net.woggioni.gbcs.api.CallHandle
import net.woggioni.gbcs.api.ResponseEventListener
import net.woggioni.gbcs.api.event.RequestEvent
import net.woggioni.gbcs.api.event.ResponseEvent
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.debug import net.woggioni.gbcs.server.debug
import net.woggioni.gbcs.server.warn import net.woggioni.gbcs.server.warn
import java.nio.channels.FileChannel
import java.nio.file.Path import java.nio.file.Path
import java.util.concurrent.CompletableFuture
@ChannelHandler.Sharable
class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
SimpleChannelInboundHandler<FullHttpRequest>() { SimpleChannelInboundHandler<HttpMessage>() {
private val log = contextLogger() companion object {
@JvmStatic
private val log = contextLogger()
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) { private data class TransientContext(
var key: String?,
var callHandle: CompletableFuture<CallHandle<Void>>
)
private var transientContext: TransientContext? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpMessage) {
when (msg) {
is HttpRequest -> {
handleRequest(ctx, msg)
}
is LastHttpContent -> {
transientContext?.run {
callHandle.thenCompose { callHandle ->
callHandle.postEvent(RequestEvent.LastChunkSent(msg.content()))
callHandle.call()
}.thenApply {
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
key?.let(String::toByteArray)
?.let(Unpooled::copiedBuffer)
)
// response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
}
}
}
is HttpContent -> {
transientContext?.run {
callHandle = callHandle.thenApply { it ->
it.postEvent(RequestEvent.ChunkSent(msg.content()))
it
}
}
}
}
}
private fun handleRequest(ctx: ChannelHandlerContext, msg: HttpRequest) {
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg) val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
val method = msg.method() val method = msg.method()
if (method === HttpMethod.GET) { if (method === HttpMethod.GET) {
@@ -43,49 +90,61 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
return return
} }
if (serverPrefix == prefix) { if (serverPrefix == prefix) {
try { cache.get(key, object : ResponseEventListener {
cache.get(key) var first = false
} catch(ex : Throwable) { override fun listen(evt: ResponseEvent) {
throw CacheException("Error accessing the cache backend", ex) when (evt) {
}?.let { channel -> is ResponseEvent.NoContent -> {
log.debug(ctx) { log.debug(ctx) {
"Cache hit for key '$key'" "Cache miss for key '$key'"
} }
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) val response =
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
if (!keepAlive) { response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) ctx.writeAndFlush(response)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY) }
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) is ResponseEvent.ChunkReceived, is ResponseEvent.LastChunkReceived -> {
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED) if (first) {
} first = false
ctx.write(response) log.debug(ctx) {
when (channel) { "Cache hit for key '$key'"
is FileChannel -> { }
if (keepAlive) { val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
ctx.write(DefaultFileRegion(channel, 0, channel.size())) response.headers()[HttpHeaderNames.CONTENT_TYPE] =
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate()) HttpHeaderValues.APPLICATION_OCTET_STREAM
} else { if (!keepAlive) {
ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size())) response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
.addListener(ChannelFutureListener.CLOSE) response.headers()
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers()
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers()
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
}
if (evt is ResponseEvent.LastChunkReceived)
ctx.write(DefaultLastHttpContent(evt.chunk))
else if (evt is ResponseEvent.ChunkReceived)
ctx.write(DefaultHttpContent(evt.chunk))
ctx.flush()
}
is ResponseEvent.ExceptionCaught -> {
log.error(evt.cause.message, evt.cause)
val errorResponse = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR,
evt.cause.message
?.let(String::toByteArray)
?.let(Unpooled::copiedBuffer)
)
ctx.write(errorResponse)
} }
} }
else -> {
ctx.write(ChunkedNioStream(channel)).addListener { evt ->
channel.close()
}
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
}
} }
} ?: let { }).thenCompose(CallHandle<Void>::call)
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
} else { } else {
log.warn(ctx) { log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'" "Got request for unhandled path '${msg.uri()}'"
@@ -93,35 +152,22 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
ctx.channel().read()
} }
} else if (method === HttpMethod.PUT) { } else if (method === HttpMethod.PUT) {
val path = Path.of(msg.uri()) val path = Path.of(msg.uri())
val prefix = path.parent val prefix = path.parent
val key = path.fileName.toString() val key = path.fileName.toString()
if (serverPrefix == prefix) { if (serverPrefix == prefix) {
log.debug(ctx) { log.debug(ctx) {
"Added value for key '$key' to build cache" "Added value for key '$key' to build cache"
} }
val bodyBytes = msg.content().run { transientContext = TransientContext(key, cache.put(key))
if (isDirect) {
ByteArray(readableBytes()).also {
readBytes(it)
}
} else {
array()
}
}
try {
cache.put(key, bodyBytes)
} catch(ex : Throwable) {
throw CacheException("Error accessing the cache backend", ex)
}
val response = DefaultFullHttpResponse( val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED, msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray()) Unpooled.copiedBuffer(key.toByteArray())
) )
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() // response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
} else { } else {
log.warn(ctx) { log.warn(ctx) {
@@ -131,9 +177,12 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
} }
} else if(method == HttpMethod.TRACE) { } else if (method == HttpMethod.TRACE) {
val replayedRequestHead = ctx.alloc().buffer() val replayedRequestHead = ctx.alloc().buffer()
replayedRequestHead.writeCharSequence("TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", Charsets.US_ASCII) replayedRequestHead.writeCharSequence(
"TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n",
Charsets.US_ASCII
)
msg.headers().forEach { (key, value) -> msg.headers().forEach { (key, value) ->
replayedRequestHead.apply { replayedRequestHead.apply {
writeCharSequence(key, Charsets.US_ASCII) writeCharSequence(key, Charsets.US_ASCII)
@@ -143,18 +192,30 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
} }
} }
replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII) replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII)
val requestBody = msg.content() val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
requestBody.retain()
val responseBody = ctx.alloc().compositeBuffer(2).apply {
addComponents(true, replayedRequestHead)
addComponents(true, requestBody)
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK, responseBody)
response.headers().apply { response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, "message/http") set(HttpHeaderNames.CONTENT_TYPE, "message/http")
set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes())
} }
ctx.writeAndFlush(response) ctx.write(response)
ctx.writeAndFlush(DefaultHttpContent(replayedRequestHead))
val callHandle = object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when (evt) {
is RequestEvent.ChunkSent -> {
ctx.writeAndFlush(DefaultHttpContent(evt.chunk))
}
is RequestEvent.LastChunkSent -> {
ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk))
}
}
}
override fun call(): CompletableFuture<Void> {
return CompletableFuture.completedFuture(null)
}
}
transientContext = TransientContext(null, CompletableFuture.completedFuture(callHandle))
} else { } else {
log.warn(ctx) { log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'" "Got request with unhandled method '${msg.method().name()}'"
@@ -163,5 +224,6 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
} }
} }
} }