Compare commits

...

1 Commits

Author SHA1 Message Date
e5fe8437a6 temporary commit 2025-02-04 13:59:44 +08:00
22 changed files with 771 additions and 145 deletions

View File

@@ -5,6 +5,7 @@ plugins {
}
dependencies {
api catalog.netty.buffer
}
publishing {

View File

@@ -1,6 +1,8 @@
module net.woggioni.gbcs.api {
requires static lombok;
requires java.xml;
requires io.netty.buffer;
exports net.woggioni.gbcs.api;
exports net.woggioni.gbcs.api.exception;
exports net.woggioni.gbcs.api.event;
}

View File

@@ -3,10 +3,10 @@ package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.exception.ContentTooLargeException;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.CompletableFuture;
public interface Cache extends AutoCloseable {
ReadableByteChannel get(String key);
void put(String key, byte[] content) throws ContentTooLargeException;
CompletableFuture<CallHandle<Void>> get(String key, ResponseEventListener responseEventListener);
CompletableFuture<CallHandle<Void>> put(String key) throws ContentTooLargeException;
}

View File

@@ -0,0 +1,10 @@
package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.event.RequestEvent;
import java.util.concurrent.CompletableFuture;
public interface CallHandle<T> {
void postEvent(RequestEvent evt);
CompletableFuture<T> call();
}

View File

@@ -0,0 +1,7 @@
package net.woggioni.gbcs.api;
import net.woggioni.gbcs.api.event.ResponseEvent;
public interface ResponseEventListener {
void listen(ResponseEvent evt);
}

View File

@@ -0,0 +1,20 @@
package net.woggioni.gbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import net.woggioni.gbcs.api.CallHandle;
sealed public abstract class RequestEvent {
@Getter
@RequiredArgsConstructor
public static final class ChunkSent extends RequestEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
public static final class LastChunkSent extends RequestEvent {
private final ByteBuf chunk;
}
}

View File

@@ -0,0 +1,28 @@
package net.woggioni.gbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
sealed public abstract class ResponseEvent {
@Getter
@RequiredArgsConstructor
public final static class ChunkReceived extends ResponseEvent {
private final ByteBuf chunk;
}
public final static class NoContent extends ResponseEvent {
}
@Getter
@RequiredArgsConstructor
public final static class LastChunkReceived extends ResponseEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
public final static class ExceptionCaught extends ResponseEvent {
private final Throwable cause;
}
}

View File

@@ -26,13 +26,24 @@ configurations {
canBeResolved = true
visible = true
}
testImplementation {
extendsFrom compileOnly
}
}
dependencies {
compileOnly project(':gbcs-common')
compileOnly project(':gbcs-api')
compileOnly catalog.jwo
compileOnly catalog.slf4j.api
implementation catalog.xmemcached
implementation catalog.netty.codec.memcache
implementation catalog.netty.common
implementation group: 'io.netty', name: 'netty-handler', version: catalog.versions.netty.get()
testRuntimeOnly catalog.logback.classic
}
Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
@@ -41,6 +52,11 @@ Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
group = BasePlugin.BUILD_GROUP
}
tasks.named(JavaPlugin.TEST_TASK_NAME, Test) {
systemProperty("io.netty.leakDetectionLevel", "PARANOID")
}
tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) {
dependsOn(bundleTask)
}

View File

@@ -7,6 +7,13 @@ module net.woggioni.gbcs.server.memcached {
requires net.woggioni.jwo;
requires java.xml;
requires kotlin.stdlib;
requires io.netty.common;
requires io.netty.handler;
requires io.netty.codec.memcache;
requires io.netty.transport;
requires org.slf4j;
requires io.netty.buffer;
requires io.netty.codec;
provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider;

View File

@@ -0,0 +1,33 @@
package net.woggioni.gbcs.server.memcached
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.stream.ChunkedInput
import java.nio.channels.ReadableByteChannel
class CustomChunkedInput(private val readableByteChannel: ReadableByteChannel) : ChunkedInput<ByteBuf> {
override fun isEndOfInput(): Boolean {
TODO("Not yet implemented")
}
override fun close() {
TODO("Not yet implemented")
}
override fun readChunk(ctx: ChannelHandlerContext): ByteBuf {
TODO("Not yet implemented")
}
override fun readChunk(allocator: ByteBufAllocator): ByteBuf {
TODO("Not yet implemented")
}
override fun length(): Long {
TODO("Not yet implemented")
}
override fun progress(): Long {
TODO("Not yet implemented")
}
}

View File

@@ -0,0 +1,4 @@
package net.woggioni.gbcs.server.memcached
class MemcachedException(status : Short, msg : String? = null, cause : Throwable? = null)
: RuntimeException(msg ?: "Memcached status $status", cause)

View File

@@ -1,59 +1,85 @@
package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.XMemcachedClientBuilder
import net.rubyeye.xmemcached.command.BinaryCommandFactory
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder
import io.netty.buffer.Unpooled
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.ContentTooLargeException
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.jwo.JWO
import java.io.ByteArrayInputStream
import java.net.InetSocketAddress
import java.nio.channels.Channels
import java.nio.channels.ReadableByteChannel
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.time.Duration
import net.woggioni.gbcs.api.CallHandle
import net.woggioni.gbcs.api.ResponseEventListener
import net.woggioni.gbcs.api.event.RequestEvent
import net.woggioni.gbcs.api.event.ResponseEvent
import net.woggioni.gbcs.server.memcached.client.MemcachedClient
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ExceptionCaught
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.LastResponseContentChunkReceived
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseContentChunkReceived
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseReceived
import net.woggioni.gbcs.server.memcached.client.ResponseListener
import java.util.concurrent.CompletableFuture
class MemcachedCache(
servers: List<HostAndPort>,
private val maxAge: Duration,
maxSize : Int,
digestAlgorithm: String?,
compressionMode: CompressionMode,
private val cfg : MemcachedCacheConfiguration
) : Cache {
private val memcachedClient = XMemcachedClientBuilder(
servers.stream().map { addr: HostAndPort -> InetSocketAddress(addr.host, addr.port) }.toList()
).apply {
commandFactory = BinaryCommandFactory()
digestAlgorithm?.let { dAlg ->
setKeyProvider { key ->
val md = MessageDigest.getInstance(dAlg)
md.update(key.toByteArray(StandardCharsets.UTF_8))
JWO.bytesToHex(md.digest())
}
}
transcoder = SerializingTranscoder(maxSize).apply {
setCompressionMode(compressionMode)
}
}.build()
override fun get(key: String): ReadableByteChannel? {
return memcachedClient.get<ByteArray>(key)
?.let(::ByteArrayInputStream)
?.let(Channels::newChannel)
}
override fun put(key: String, content: ByteArray) {
try {
memcachedClient[key, maxAge.toSeconds().toInt()] = content
} catch (e: IllegalArgumentException) {
throw ContentTooLargeException(e.message, e)
}
}
private val client = MemcachedClient(cfg)
override fun close() {
memcachedClient.shutdown()
client.close()
}
override fun get(key: String, responseEventListener: ResponseEventListener): CompletableFuture<CallHandle<Void>> {
val listener = ResponseListener { evt ->
when(evt) {
is ResponseContentChunkReceived -> {
responseEventListener.listen(ResponseEvent.ChunkReceived(Unpooled.wrappedBuffer(evt.chunk)))
}
is LastResponseContentChunkReceived -> {
responseEventListener.listen(ResponseEvent.LastChunkReceived(Unpooled.wrappedBuffer(evt.chunk)))
}
is ExceptionCaught -> {
responseEventListener.listen(ResponseEvent.ExceptionCaught(evt.cause))
}
is ResponseReceived -> {
when(val status = evt.response.status) {
BinaryMemcacheResponseStatus.SUCCESS -> {
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
responseEventListener.listen(ResponseEvent.NoContent())
}
else -> {
responseEventListener.listen(ResponseEvent.ExceptionCaught(MemcachedException(status)))
}
}
}
}
}
return client.get(key, listener).thenApply { clientCallHandle ->
object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when(evt) {
is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
}
}
override fun call(): CompletableFuture<Void> {
return clientCallHandle.waitForResponse().thenApply { null }
}
}
}
}
override fun put(key: String): CompletableFuture<CallHandle<Void>> {
return client.put(key, cfg.maxAge).thenApply { clientCallHandle ->
object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when(evt) {
is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
}
}
override fun call(): CompletableFuture<Void> {
return clientCallHandle.waitForResponse().thenApply { null }
}
}
}
}
}

View File

@@ -1,25 +1,45 @@
package net.woggioni.gbcs.server.memcached
import net.rubyeye.xmemcached.transcoders.CompressionMode
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.HostAndPort
import java.time.Duration
data class MemcachedCacheConfiguration(
var servers: List<HostAndPort>,
var maxAge: Duration = Duration.ofDays(1),
var maxSize: Int = 0x100000,
var digestAlgorithm: String? = null,
var compressionMode: CompressionMode = CompressionMode.ZIP,
val servers: List<Server>,
val maxAge: Duration = Duration.ofDays(1),
val maxSize: Int = 0x100000,
val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = CompressionMode.DEFLATE,
) : Configuration.Cache {
override fun materialize() = MemcachedCache(
servers,
maxAge,
maxSize,
digestAlgorithm,
compressionMode
enum class CompressionMode {
/**
* Gzip mode
*/
GZIP,
/**
* Deflate mode
*/
DEFLATE
}
class RetryPolicy(
val maxAttempts: Int,
val initialDelayMillis: Long,
val exp: Double
)
data class Server(
val endpoint : HostAndPort,
val connectionTimeoutMillis : Int?,
val retryPolicy : RetryPolicy?,
val maxConnections : Int
)
override fun materialize() = MemcachedCache(this)
override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached"
override fun getTypeName() = "memcachedCacheType"

View File

@@ -51,7 +51,7 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
}
return MemcachedCacheConfiguration(
servers,
servers.map { MemcachedCacheConfiguration.Server(it, null, null, 1) },
maxAge,
maxSize,
digestAlgorithm,
@@ -67,8 +67,8 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI)
for (server in servers) {
node("server") {
attr("host", server.host)
attr("port", server.port.toString())
attr("host", server.endpoint.host)
attr("port", server.endpoint.port.toString())
}
}
attr("max-age", maxAge.toString())

View File

@@ -0,0 +1,9 @@
package net.woggioni.gbcs.server.memcached.client
import java.nio.ByteBuffer
import java.util.concurrent.CompletableFuture
interface CallHandle {
fun sendChunk(requestBodyChunk : ByteBuffer)
fun waitForResponse() : CompletableFuture<Short>
}

View File

@@ -0,0 +1,24 @@
package net.woggioni.gbcs.server.memcached.client
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import java.nio.ByteBuffer
data class MemcacheResponse(
val status: Short,
val opcode: Byte,
val cas: Long?,
val opaque: Int?,
val key: ByteBuffer?,
val extra: ByteBuffer?
) {
companion object {
fun of(response : BinaryMemcacheResponse) = MemcacheResponse(
response.status(),
response.opcode(),
response.cas(),
response.opaque(),
response.key()?.nioBuffer(),
response.extras()?.nioBuffer()
)
}
}

View File

@@ -0,0 +1,241 @@
package net.woggioni.gbcs.server.memcached.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.MemcacheObject
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.common.GBCS.digest
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration
import net.woggioni.gbcs.server.memcached.MemcachedException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import io.netty.util.concurrent.Future as NettyFuture
class MemcachedClient(private val cfg: MemcachedCacheConfiguration) : AutoCloseable {
private val log = contextLogger()
private val group: NioEventLoopGroup
private val connectionPool: MutableMap<HostAndPort, ChannelPool> = ConcurrentHashMap()
init {
group = NioEventLoopGroup()
}
private fun newConnectionPool(server : MemcachedCacheConfiguration.Server) : FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
channel(NioSocketChannel::class.java)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
server.connectionTimeoutMillis?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
}
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec())
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
private fun sendRequest(request: BinaryMemcacheRequest,
responseListener: ResponseListener?
): CompletableFuture<CallHandle> {
val server = cfg.servers.let { servers ->
if(servers.size > 1) {
val key = request.key().duplicate()
var checksum = 0
while(key.readableBytes() > 4) {
val byte = key.readInt()
checksum = checksum xor byte
}
while(key.readableBytes() > 0) {
val byte = key.readByte()
checksum = checksum xor byte.toInt()
}
servers[checksum % servers.size]
} else {
servers.first()
}
}
val callHandleFuture = CompletableFuture<CallHandle>()
val result = CompletableFuture<Short>()
// Custom handler for processing responses
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
}
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline().addLast("handler", object : SimpleChannelInboundHandler<MemcacheObject>() {
val response : MemcacheResponse? = null
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: MemcacheObject
) {
if(msg is BinaryMemcacheResponse) {
val resp = MemcacheResponse.of(msg)
responseListener?.listen(ResponseEvent.ResponseReceived(resp))
if(msg.totalBodyLength() == msg.keyLength() + msg.extrasLength()) {
result.complete(resp.status)
}
}
if(responseListener != null) {
when (msg) {
is LastMemcacheContent -> {
responseListener.listen(ResponseEvent.LastResponseContentChunkReceived(msg.content().nioBuffer()))
result.complete(response?.status)
pipeline.removeLast()
pool.release(channel)
}
is MemcacheContent -> {
responseListener.listen(ResponseEvent.ResponseContentChunkReceived(msg.content().nioBuffer()))
}
}
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val ex = when (cause) {
is DecoderException -> cause.cause!!
else -> cause
}
responseListener?.listen(ResponseEvent.ExceptionCaught(ex))
result.completeExceptionally(ex)
ctx.close()
pipeline.removeLast()
pool.release(channel)
}
})
val chunks = mutableListOf <ByteBuffer>()
fun sendRequest() {
val valueLen = chunks.fold(0) { acc : Int, c2 : ByteBuffer ->
acc + c2.remaining()
}
request.setTotalBodyLength(request.keyLength() + request.extrasLength() + valueLen)
channel.write(request)
for((i, chunk) in chunks.withIndex()) {
if(i + 1 < chunks.size) {
channel.write(DefaultMemcacheContent(Unpooled.wrappedBuffer(chunk)))
} else {
channel.write(DefaultLastMemcacheContent(Unpooled.wrappedBuffer(chunk)))
}
}
channel.flush()
}
callHandleFuture.complete(object : CallHandle {
override fun sendChunk(requestBodyChunk: ByteBuffer) {
chunks.addLast(requestBodyChunk)
}
override fun waitForResponse(): CompletableFuture<Short> {
sendRequest()
return result
}
})
} else {
callHandleFuture.completeExceptionally(channelFuture.cause())
}
}
})
return callHandleFuture
}
private fun encodeExpiry(expiry: Duration) : Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
fun get(key: String, responseListener: ResponseListener) : CompletableFuture<CallHandle> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
DefaultBinaryMemcacheRequest().apply {
setKey(Unpooled.wrappedBuffer(digest))
setOpcode(BinaryMemcacheOpcodes.GET)
}
}
return sendRequest(request, responseListener)
}
fun put(key: String, expiry : Duration, cas : Long? = null): CompletableFuture<CallHandle> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
val extras = Unpooled.buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(expiry))
DefaultBinaryMemcacheRequest().apply {
setExtras(extras)
setKey(Unpooled.wrappedBuffer(digest))
setOpcode(BinaryMemcacheOpcodes.SET)
cas?.let(this::setCas)
}
}
return sendRequest(request) { evt ->
when (evt) {
is ResponseEvent.ResponseReceived -> {
if (evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) {
throw MemcachedException(evt.response.status)
}
}
else -> {}
}
}
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}
override fun close() {
shutDown().sync()
}
}

View File

@@ -0,0 +1,10 @@
package net.woggioni.gbcs.server.memcached.client
import java.nio.ByteBuffer
sealed interface ResponseEvent {
class ResponseReceived(val response : MemcacheResponse) : ResponseEvent
class ResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent
class LastResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent
class ExceptionCaught(val cause : Throwable) : ResponseEvent
}

View File

@@ -0,0 +1,5 @@
package net.woggioni.gbcs.server.memcached.client
fun interface ResponseListener {
fun listen(evt : ResponseEvent)
}

View File

@@ -0,0 +1,80 @@
package net.woggioni.gbcs.server.memcached.test
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import net.woggioni.gbcs.api.event.ChunkReceived
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration
import net.woggioni.gbcs.server.memcached.client.MemcacheResponse
import net.woggioni.gbcs.server.memcached.client.MemcachedClient
import net.woggioni.gbcs.server.memcached.client.ResponseEvent
import net.woggioni.gbcs.server.memcached.client.ResponseListener
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import java.nio.ByteBuffer
import java.security.SecureRandom
import java.time.Duration
import java.util.Objects
import java.util.concurrent.TimeUnit
import kotlin.random.Random
class MemcachedClientTest {
@Test
fun test() {
val client = MemcachedClient(MemcachedCacheConfiguration(
servers = listOf(
MemcachedCacheConfiguration.Server(
endpoint = HostAndPort("127.0.0.1", 11211),
connectionTimeoutMillis = null,
retryPolicy = null,
maxConnections = 1
)
)
))
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
val key = "101325"
val value = random.nextBytes(0x1000)
val requestListener = client.put(key, Duration.ofDays(2), null)
val response = Unpooled.buffer(value.size)
requestListener.thenCompose { listener ->
listener.sendChunk(ByteBuffer.wrap(value))
listener.waitForResponse()
}.get(10, TimeUnit.SECONDS)
client.get(key, object: ResponseListener {
override fun listen(evt: ResponseEvent) {
when(evt) {
is ResponseEvent.ResponseReceived -> {
if(evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) {
Assertions.fail<String> {
"Memcache status ${evt.response.status}"
}
}
}
is ResponseEvent.ResponseContentChunkReceived -> response.writeBytes(evt.chunk)
else -> {}
}
}
}).thenCompose { it.waitForResponse() }.get(1, TimeUnit.SECONDS)
val retrievedResponse = response.array()
Assertions.assertArrayEquals(value, retrievedResponse)
}
@Test
fun test2() {
val a1 = ByteArray(10) {
it.toByte()
}
val a2 = ByteArray(10) {
it.toByte()
}
Assertions.assertTrue(Objects.equals(a1, a1))
}
}

View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration>
<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>
<appender name="console" class="ConsoleAppender">
<target>System.err</target>
<encoder class="PatternLayoutEncoder">
<pattern>%d [%highlight(%-5level)] \(%thread\) %logger{36} -%kvp- %msg %n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="io.netty" level="debug"/>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -1,36 +1,83 @@
package net.woggioni.gbcs.server.handler
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.DefaultFileRegion
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.DefaultLastHttpContent
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMessage
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioStream
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.CacheException
import net.woggioni.gbcs.api.CallHandle
import net.woggioni.gbcs.api.ResponseEventListener
import net.woggioni.gbcs.api.event.RequestEvent
import net.woggioni.gbcs.api.event.ResponseEvent
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.debug
import net.woggioni.gbcs.server.warn
import java.nio.channels.FileChannel
import java.nio.file.Path
import java.util.concurrent.CompletableFuture
@ChannelHandler.Sharable
class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
SimpleChannelInboundHandler<FullHttpRequest>() {
SimpleChannelInboundHandler<HttpMessage>() {
companion object {
@JvmStatic
private val log = contextLogger()
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
private data class TransientContext(
var key: String?,
var callHandle: CompletableFuture<CallHandle<Void>>
)
private var transientContext: TransientContext? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpMessage) {
when (msg) {
is HttpRequest -> {
handleRequest(ctx, msg)
}
is LastHttpContent -> {
transientContext?.run {
callHandle.thenCompose { callHandle ->
callHandle.postEvent(RequestEvent.LastChunkSent(msg.content()))
callHandle.call()
}.thenApply {
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
key?.let(String::toByteArray)
?.let(Unpooled::copiedBuffer)
)
// response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
}
}
}
is HttpContent -> {
transientContext?.run {
callHandle = callHandle.thenApply { it ->
it.postEvent(RequestEvent.ChunkSent(msg.content()))
it
}
}
}
}
}
private fun handleRequest(ctx: ChannelHandlerContext, msg: HttpRequest) {
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
val method = msg.method()
if (method === HttpMethod.GET) {
@@ -43,49 +90,61 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
return
}
if (serverPrefix == prefix) {
try {
cache.get(key)
} catch(ex : Throwable) {
throw CacheException("Error accessing the cache backend", ex)
}?.let { channel ->
cache.get(key, object : ResponseEventListener {
var first = false
override fun listen(evt: ResponseEvent) {
when (evt) {
is ResponseEvent.NoContent -> {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response =
DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
is ResponseEvent.ChunkReceived, is ResponseEvent.LastChunkReceived -> {
if (first) {
first = false
log.debug(ctx) {
"Cache hit for key '$key'"
}
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
response.headers()[HttpHeaderNames.CONTENT_TYPE] =
HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
response.headers()
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
response.headers()
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers()
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
when (channel) {
is FileChannel -> {
if (keepAlive) {
ctx.write(DefaultFileRegion(channel, 0, channel.size()))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size()))
.addListener(ChannelFutureListener.CLOSE)
}
if (evt is ResponseEvent.LastChunkReceived)
ctx.write(DefaultLastHttpContent(evt.chunk))
else if (evt is ResponseEvent.ChunkReceived)
ctx.write(DefaultHttpContent(evt.chunk))
ctx.flush()
}
is ResponseEvent.ExceptionCaught -> {
log.error(evt.cause.message, evt.cause)
val errorResponse = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR,
evt.cause.message
?.let(String::toByteArray)
?.let(Unpooled::copiedBuffer)
)
ctx.write(errorResponse)
}
}
else -> {
ctx.write(ChunkedNioStream(channel)).addListener { evt ->
channel.close()
}
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
}
}
} ?: let {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
}).thenCompose(CallHandle<Void>::call)
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
@@ -93,35 +152,22 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
ctx.channel().read()
}
} else if (method === HttpMethod.PUT) {
val path = Path.of(msg.uri())
val prefix = path.parent
val key = path.fileName.toString()
if (serverPrefix == prefix) {
log.debug(ctx) {
"Added value for key '$key' to build cache"
}
val bodyBytes = msg.content().run {
if (isDirect) {
ByteArray(readableBytes()).also {
readBytes(it)
}
} else {
array()
}
}
try {
cache.put(key, bodyBytes)
} catch(ex : Throwable) {
throw CacheException("Error accessing the cache backend", ex)
}
transientContext = TransientContext(key, cache.put(key))
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
// response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
} else {
log.warn(ctx) {
@@ -133,7 +179,10 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
}
} else if (method == HttpMethod.TRACE) {
val replayedRequestHead = ctx.alloc().buffer()
replayedRequestHead.writeCharSequence("TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", Charsets.US_ASCII)
replayedRequestHead.writeCharSequence(
"TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n",
Charsets.US_ASCII
)
msg.headers().forEach { (key, value) ->
replayedRequestHead.apply {
writeCharSequence(key, Charsets.US_ASCII)
@@ -143,18 +192,30 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
}
}
replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII)
val requestBody = msg.content()
requestBody.retain()
val responseBody = ctx.alloc().compositeBuffer(2).apply {
addComponents(true, replayedRequestHead)
addComponents(true, requestBody)
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK, responseBody)
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, "message/http")
set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes())
}
ctx.writeAndFlush(response)
ctx.write(response)
ctx.writeAndFlush(DefaultHttpContent(replayedRequestHead))
val callHandle = object : CallHandle<Void> {
override fun postEvent(evt: RequestEvent) {
when (evt) {
is RequestEvent.ChunkSent -> {
ctx.writeAndFlush(DefaultHttpContent(evt.chunk))
}
is RequestEvent.LastChunkSent -> {
ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk))
}
}
}
override fun call(): CompletableFuture<Void> {
return CompletableFuture.completedFuture(null)
}
}
transientContext = TransientContext(null, CompletableFuture.completedFuture(callHandle))
} else {
log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'"
@@ -163,5 +224,6 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
}
}