Compare commits
1 Commits
streaming
...
e5fe8437a6
Author | SHA1 | Date | |
---|---|---|---|
e5fe8437a6
|
@@ -5,6 +5,7 @@ plugins {
|
||||
}
|
||||
|
||||
dependencies {
|
||||
api catalog.netty.buffer
|
||||
}
|
||||
|
||||
publishing {
|
||||
|
@@ -1,6 +1,8 @@
|
||||
module net.woggioni.gbcs.api {
|
||||
requires static lombok;
|
||||
requires java.xml;
|
||||
requires io.netty.buffer;
|
||||
exports net.woggioni.gbcs.api;
|
||||
exports net.woggioni.gbcs.api.exception;
|
||||
exports net.woggioni.gbcs.api.event;
|
||||
}
|
@@ -3,10 +3,10 @@ package net.woggioni.gbcs.api;
|
||||
import net.woggioni.gbcs.api.exception.ContentTooLargeException;
|
||||
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
|
||||
public interface Cache extends AutoCloseable {
|
||||
ReadableByteChannel get(String key);
|
||||
|
||||
void put(String key, byte[] content) throws ContentTooLargeException;
|
||||
CompletableFuture<CallHandle<Void>> get(String key, ResponseEventListener responseEventListener);
|
||||
CompletableFuture<CallHandle<Void>> put(String key) throws ContentTooLargeException;
|
||||
}
|
||||
|
10
gbcs-api/src/main/java/net/woggioni/gbcs/api/CallHandle.java
Normal file
10
gbcs-api/src/main/java/net/woggioni/gbcs/api/CallHandle.java
Normal file
@@ -0,0 +1,10 @@
|
||||
package net.woggioni.gbcs.api;
|
||||
|
||||
import net.woggioni.gbcs.api.event.RequestEvent;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public interface CallHandle<T> {
|
||||
void postEvent(RequestEvent evt);
|
||||
CompletableFuture<T> call();
|
||||
}
|
@@ -0,0 +1,7 @@
|
||||
package net.woggioni.gbcs.api;
|
||||
|
||||
import net.woggioni.gbcs.api.event.ResponseEvent;
|
||||
|
||||
public interface ResponseEventListener {
|
||||
void listen(ResponseEvent evt);
|
||||
}
|
@@ -0,0 +1,20 @@
|
||||
package net.woggioni.gbcs.api.event;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import net.woggioni.gbcs.api.CallHandle;
|
||||
|
||||
sealed public abstract class RequestEvent {
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public static final class ChunkSent extends RequestEvent {
|
||||
private final ByteBuf chunk;
|
||||
}
|
||||
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public static final class LastChunkSent extends RequestEvent {
|
||||
private final ByteBuf chunk;
|
||||
}
|
||||
}
|
@@ -0,0 +1,28 @@
|
||||
package net.woggioni.gbcs.api.event;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
sealed public abstract class ResponseEvent {
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public final static class ChunkReceived extends ResponseEvent {
|
||||
private final ByteBuf chunk;
|
||||
}
|
||||
|
||||
public final static class NoContent extends ResponseEvent {
|
||||
}
|
||||
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public final static class LastChunkReceived extends ResponseEvent {
|
||||
private final ByteBuf chunk;
|
||||
}
|
||||
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public final static class ExceptionCaught extends ResponseEvent {
|
||||
private final Throwable cause;
|
||||
}
|
||||
}
|
@@ -26,13 +26,24 @@ configurations {
|
||||
canBeResolved = true
|
||||
visible = true
|
||||
}
|
||||
|
||||
testImplementation {
|
||||
extendsFrom compileOnly
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
compileOnly project(':gbcs-common')
|
||||
compileOnly project(':gbcs-api')
|
||||
compileOnly catalog.jwo
|
||||
compileOnly catalog.slf4j.api
|
||||
implementation catalog.xmemcached
|
||||
implementation catalog.netty.codec.memcache
|
||||
implementation catalog.netty.common
|
||||
implementation group: 'io.netty', name: 'netty-handler', version: catalog.versions.netty.get()
|
||||
|
||||
testRuntimeOnly catalog.logback.classic
|
||||
|
||||
}
|
||||
|
||||
Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
|
||||
@@ -41,6 +52,11 @@ Provider<Tar> bundleTask = tasks.register("bundle", Tar) {
|
||||
group = BasePlugin.BUILD_GROUP
|
||||
}
|
||||
|
||||
tasks.named(JavaPlugin.TEST_TASK_NAME, Test) {
|
||||
systemProperty("io.netty.leakDetectionLevel", "PARANOID")
|
||||
}
|
||||
|
||||
|
||||
tasks.named(BasePlugin.ASSEMBLE_TASK_NAME) {
|
||||
dependsOn(bundleTask)
|
||||
}
|
||||
|
@@ -7,6 +7,13 @@ module net.woggioni.gbcs.server.memcached {
|
||||
requires net.woggioni.jwo;
|
||||
requires java.xml;
|
||||
requires kotlin.stdlib;
|
||||
requires io.netty.common;
|
||||
requires io.netty.handler;
|
||||
requires io.netty.codec.memcache;
|
||||
requires io.netty.transport;
|
||||
requires org.slf4j;
|
||||
requires io.netty.buffer;
|
||||
requires io.netty.codec;
|
||||
|
||||
provides CacheProvider with net.woggioni.gbcs.server.memcached.MemcachedCacheProvider;
|
||||
|
||||
|
@@ -0,0 +1,33 @@
|
||||
package net.woggioni.gbcs.server.memcached
|
||||
|
||||
import io.netty.buffer.ByteBuf
|
||||
import io.netty.buffer.ByteBufAllocator
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.handler.stream.ChunkedInput
|
||||
import java.nio.channels.ReadableByteChannel
|
||||
|
||||
class CustomChunkedInput(private val readableByteChannel: ReadableByteChannel) : ChunkedInput<ByteBuf> {
|
||||
override fun isEndOfInput(): Boolean {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
|
||||
override fun readChunk(ctx: ChannelHandlerContext): ByteBuf {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
|
||||
override fun readChunk(allocator: ByteBufAllocator): ByteBuf {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
|
||||
override fun length(): Long {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
|
||||
override fun progress(): Long {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
}
|
@@ -0,0 +1,4 @@
|
||||
package net.woggioni.gbcs.server.memcached
|
||||
|
||||
class MemcachedException(status : Short, msg : String? = null, cause : Throwable? = null)
|
||||
: RuntimeException(msg ?: "Memcached status $status", cause)
|
@@ -1,59 +1,85 @@
|
||||
package net.woggioni.gbcs.server.memcached
|
||||
|
||||
import net.rubyeye.xmemcached.XMemcachedClientBuilder
|
||||
import net.rubyeye.xmemcached.command.BinaryCommandFactory
|
||||
import net.rubyeye.xmemcached.transcoders.CompressionMode
|
||||
import net.rubyeye.xmemcached.transcoders.SerializingTranscoder
|
||||
import io.netty.buffer.Unpooled
|
||||
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
|
||||
import net.woggioni.gbcs.api.Cache
|
||||
import net.woggioni.gbcs.api.exception.ContentTooLargeException
|
||||
import net.woggioni.gbcs.common.HostAndPort
|
||||
import net.woggioni.jwo.JWO
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.channels.Channels
|
||||
import java.nio.channels.ReadableByteChannel
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.security.MessageDigest
|
||||
import java.time.Duration
|
||||
import net.woggioni.gbcs.api.CallHandle
|
||||
import net.woggioni.gbcs.api.ResponseEventListener
|
||||
import net.woggioni.gbcs.api.event.RequestEvent
|
||||
import net.woggioni.gbcs.api.event.ResponseEvent
|
||||
import net.woggioni.gbcs.server.memcached.client.MemcachedClient
|
||||
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ExceptionCaught
|
||||
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.LastResponseContentChunkReceived
|
||||
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseContentChunkReceived
|
||||
import net.woggioni.gbcs.server.memcached.client.ResponseEvent.ResponseReceived
|
||||
import net.woggioni.gbcs.server.memcached.client.ResponseListener
|
||||
import java.util.concurrent.CompletableFuture
|
||||
|
||||
class MemcachedCache(
|
||||
servers: List<HostAndPort>,
|
||||
private val maxAge: Duration,
|
||||
maxSize : Int,
|
||||
digestAlgorithm: String?,
|
||||
compressionMode: CompressionMode,
|
||||
private val cfg : MemcachedCacheConfiguration
|
||||
) : Cache {
|
||||
private val memcachedClient = XMemcachedClientBuilder(
|
||||
servers.stream().map { addr: HostAndPort -> InetSocketAddress(addr.host, addr.port) }.toList()
|
||||
).apply {
|
||||
commandFactory = BinaryCommandFactory()
|
||||
digestAlgorithm?.let { dAlg ->
|
||||
setKeyProvider { key ->
|
||||
val md = MessageDigest.getInstance(dAlg)
|
||||
md.update(key.toByteArray(StandardCharsets.UTF_8))
|
||||
JWO.bytesToHex(md.digest())
|
||||
}
|
||||
}
|
||||
transcoder = SerializingTranscoder(maxSize).apply {
|
||||
setCompressionMode(compressionMode)
|
||||
}
|
||||
}.build()
|
||||
|
||||
override fun get(key: String): ReadableByteChannel? {
|
||||
return memcachedClient.get<ByteArray>(key)
|
||||
?.let(::ByteArrayInputStream)
|
||||
?.let(Channels::newChannel)
|
||||
}
|
||||
|
||||
override fun put(key: String, content: ByteArray) {
|
||||
try {
|
||||
memcachedClient[key, maxAge.toSeconds().toInt()] = content
|
||||
} catch (e: IllegalArgumentException) {
|
||||
throw ContentTooLargeException(e.message, e)
|
||||
}
|
||||
}
|
||||
private val client = MemcachedClient(cfg)
|
||||
|
||||
override fun close() {
|
||||
memcachedClient.shutdown()
|
||||
client.close()
|
||||
}
|
||||
|
||||
override fun get(key: String, responseEventListener: ResponseEventListener): CompletableFuture<CallHandle<Void>> {
|
||||
val listener = ResponseListener { evt ->
|
||||
when(evt) {
|
||||
is ResponseContentChunkReceived -> {
|
||||
responseEventListener.listen(ResponseEvent.ChunkReceived(Unpooled.wrappedBuffer(evt.chunk)))
|
||||
}
|
||||
is LastResponseContentChunkReceived -> {
|
||||
responseEventListener.listen(ResponseEvent.LastChunkReceived(Unpooled.wrappedBuffer(evt.chunk)))
|
||||
}
|
||||
is ExceptionCaught -> {
|
||||
responseEventListener.listen(ResponseEvent.ExceptionCaught(evt.cause))
|
||||
}
|
||||
is ResponseReceived -> {
|
||||
when(val status = evt.response.status) {
|
||||
BinaryMemcacheResponseStatus.SUCCESS -> {
|
||||
}
|
||||
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
|
||||
responseEventListener.listen(ResponseEvent.NoContent())
|
||||
}
|
||||
else -> {
|
||||
responseEventListener.listen(ResponseEvent.ExceptionCaught(MemcachedException(status)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return client.get(key, listener).thenApply { clientCallHandle ->
|
||||
object : CallHandle<Void> {
|
||||
override fun postEvent(evt: RequestEvent) {
|
||||
when(evt) {
|
||||
is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
|
||||
is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
|
||||
}
|
||||
}
|
||||
|
||||
override fun call(): CompletableFuture<Void> {
|
||||
return clientCallHandle.waitForResponse().thenApply { null }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun put(key: String): CompletableFuture<CallHandle<Void>> {
|
||||
return client.put(key, cfg.maxAge).thenApply { clientCallHandle ->
|
||||
object : CallHandle<Void> {
|
||||
override fun postEvent(evt: RequestEvent) {
|
||||
when(evt) {
|
||||
is RequestEvent.ChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
|
||||
is RequestEvent.LastChunkSent -> clientCallHandle.sendChunk(evt.chunk.nioBuffer())
|
||||
}
|
||||
}
|
||||
|
||||
override fun call(): CompletableFuture<Void> {
|
||||
return clientCallHandle.waitForResponse().thenApply { null }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,25 +1,45 @@
|
||||
package net.woggioni.gbcs.server.memcached
|
||||
|
||||
import net.rubyeye.xmemcached.transcoders.CompressionMode
|
||||
import net.woggioni.gbcs.api.Configuration
|
||||
import net.woggioni.gbcs.common.HostAndPort
|
||||
import java.time.Duration
|
||||
|
||||
data class MemcachedCacheConfiguration(
|
||||
var servers: List<HostAndPort>,
|
||||
var maxAge: Duration = Duration.ofDays(1),
|
||||
var maxSize: Int = 0x100000,
|
||||
var digestAlgorithm: String? = null,
|
||||
var compressionMode: CompressionMode = CompressionMode.ZIP,
|
||||
val servers: List<Server>,
|
||||
val maxAge: Duration = Duration.ofDays(1),
|
||||
val maxSize: Int = 0x100000,
|
||||
val digestAlgorithm: String? = null,
|
||||
val compressionMode: CompressionMode? = CompressionMode.DEFLATE,
|
||||
) : Configuration.Cache {
|
||||
override fun materialize() = MemcachedCache(
|
||||
servers,
|
||||
maxAge,
|
||||
maxSize,
|
||||
digestAlgorithm,
|
||||
compressionMode
|
||||
|
||||
enum class CompressionMode {
|
||||
/**
|
||||
* Gzip mode
|
||||
*/
|
||||
GZIP,
|
||||
|
||||
/**
|
||||
* Deflate mode
|
||||
*/
|
||||
DEFLATE
|
||||
}
|
||||
|
||||
class RetryPolicy(
|
||||
val maxAttempts: Int,
|
||||
val initialDelayMillis: Long,
|
||||
val exp: Double
|
||||
)
|
||||
|
||||
data class Server(
|
||||
val endpoint : HostAndPort,
|
||||
val connectionTimeoutMillis : Int?,
|
||||
val retryPolicy : RetryPolicy?,
|
||||
val maxConnections : Int
|
||||
)
|
||||
|
||||
|
||||
override fun materialize() = MemcachedCache(this)
|
||||
|
||||
override fun getNamespaceURI() = "urn:net.woggioni.gbcs.server.memcached"
|
||||
|
||||
override fun getTypeName() = "memcachedCacheType"
|
||||
|
@@ -51,7 +51,7 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
|
||||
}
|
||||
|
||||
return MemcachedCacheConfiguration(
|
||||
servers,
|
||||
servers.map { MemcachedCacheConfiguration.Server(it, null, null, 1) },
|
||||
maxAge,
|
||||
maxSize,
|
||||
digestAlgorithm,
|
||||
@@ -67,8 +67,8 @@ class MemcachedCacheProvider : CacheProvider<MemcachedCacheConfiguration> {
|
||||
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", GBCS.XML_SCHEMA_NAMESPACE_URI)
|
||||
for (server in servers) {
|
||||
node("server") {
|
||||
attr("host", server.host)
|
||||
attr("port", server.port.toString())
|
||||
attr("host", server.endpoint.host)
|
||||
attr("port", server.endpoint.port.toString())
|
||||
}
|
||||
}
|
||||
attr("max-age", maxAge.toString())
|
||||
|
@@ -0,0 +1,9 @@
|
||||
package net.woggioni.gbcs.server.memcached.client
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
import java.util.concurrent.CompletableFuture
|
||||
|
||||
interface CallHandle {
|
||||
fun sendChunk(requestBodyChunk : ByteBuffer)
|
||||
fun waitForResponse() : CompletableFuture<Short>
|
||||
}
|
@@ -0,0 +1,24 @@
|
||||
package net.woggioni.gbcs.server.memcached.client
|
||||
|
||||
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
data class MemcacheResponse(
|
||||
val status: Short,
|
||||
val opcode: Byte,
|
||||
val cas: Long?,
|
||||
val opaque: Int?,
|
||||
val key: ByteBuffer?,
|
||||
val extra: ByteBuffer?
|
||||
) {
|
||||
companion object {
|
||||
fun of(response : BinaryMemcacheResponse) = MemcacheResponse(
|
||||
response.status(),
|
||||
response.opcode(),
|
||||
response.cas(),
|
||||
response.opaque(),
|
||||
response.key()?.nioBuffer(),
|
||||
response.extras()?.nioBuffer()
|
||||
)
|
||||
}
|
||||
}
|
@@ -0,0 +1,241 @@
|
||||
package net.woggioni.gbcs.server.memcached.client
|
||||
|
||||
import io.netty.bootstrap.Bootstrap
|
||||
import io.netty.buffer.ByteBuf
|
||||
import io.netty.buffer.Unpooled
|
||||
import io.netty.channel.Channel
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.ChannelOption
|
||||
import io.netty.channel.ChannelPipeline
|
||||
import io.netty.channel.SimpleChannelInboundHandler
|
||||
import io.netty.channel.nio.NioEventLoopGroup
|
||||
import io.netty.channel.pool.AbstractChannelPoolHandler
|
||||
import io.netty.channel.pool.ChannelPool
|
||||
import io.netty.channel.pool.FixedChannelPool
|
||||
import io.netty.channel.socket.nio.NioSocketChannel
|
||||
import io.netty.handler.codec.DecoderException
|
||||
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
|
||||
import io.netty.handler.codec.memcache.DefaultMemcacheContent
|
||||
import io.netty.handler.codec.memcache.LastMemcacheContent
|
||||
import io.netty.handler.codec.memcache.MemcacheContent
|
||||
import io.netty.handler.codec.memcache.MemcacheObject
|
||||
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
|
||||
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
|
||||
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
|
||||
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
|
||||
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
|
||||
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
|
||||
import io.netty.util.concurrent.GenericFutureListener
|
||||
import net.woggioni.gbcs.common.GBCS.digest
|
||||
import net.woggioni.gbcs.common.HostAndPort
|
||||
import net.woggioni.gbcs.common.contextLogger
|
||||
import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration
|
||||
import net.woggioni.gbcs.server.memcached.MemcachedException
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.ByteBuffer
|
||||
import java.security.MessageDigest
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import io.netty.util.concurrent.Future as NettyFuture
|
||||
|
||||
|
||||
class MemcachedClient(private val cfg: MemcachedCacheConfiguration) : AutoCloseable {
|
||||
|
||||
private val log = contextLogger()
|
||||
private val group: NioEventLoopGroup
|
||||
private val connectionPool: MutableMap<HostAndPort, ChannelPool> = ConcurrentHashMap()
|
||||
|
||||
init {
|
||||
group = NioEventLoopGroup()
|
||||
}
|
||||
|
||||
private fun newConnectionPool(server : MemcachedCacheConfiguration.Server) : FixedChannelPool {
|
||||
val bootstrap = Bootstrap().apply {
|
||||
group(group)
|
||||
channel(NioSocketChannel::class.java)
|
||||
option(ChannelOption.SO_KEEPALIVE, true)
|
||||
remoteAddress(InetSocketAddress(server.endpoint.host, server.endpoint.port))
|
||||
server.connectionTimeoutMillis?.let {
|
||||
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it)
|
||||
}
|
||||
}
|
||||
val channelPoolHandler = object : AbstractChannelPoolHandler() {
|
||||
|
||||
override fun channelCreated(ch: Channel) {
|
||||
val pipeline: ChannelPipeline = ch.pipeline()
|
||||
pipeline.addLast(BinaryMemcacheClientCodec())
|
||||
}
|
||||
}
|
||||
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
|
||||
}
|
||||
|
||||
|
||||
private fun sendRequest(request: BinaryMemcacheRequest,
|
||||
responseListener: ResponseListener?
|
||||
): CompletableFuture<CallHandle> {
|
||||
|
||||
val server = cfg.servers.let { servers ->
|
||||
if(servers.size > 1) {
|
||||
val key = request.key().duplicate()
|
||||
var checksum = 0
|
||||
while(key.readableBytes() > 4) {
|
||||
val byte = key.readInt()
|
||||
checksum = checksum xor byte
|
||||
}
|
||||
while(key.readableBytes() > 0) {
|
||||
val byte = key.readByte()
|
||||
checksum = checksum xor byte.toInt()
|
||||
}
|
||||
servers[checksum % servers.size]
|
||||
} else {
|
||||
servers.first()
|
||||
}
|
||||
}
|
||||
|
||||
val callHandleFuture = CompletableFuture<CallHandle>()
|
||||
val result = CompletableFuture<Short>()
|
||||
// Custom handler for processing responses
|
||||
val pool = connectionPool.computeIfAbsent(server.endpoint) {
|
||||
newConnectionPool(server)
|
||||
}
|
||||
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
|
||||
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
|
||||
if (channelFuture.isSuccess) {
|
||||
val channel = channelFuture.now
|
||||
val pipeline = channel.pipeline()
|
||||
channel.pipeline().addLast("handler", object : SimpleChannelInboundHandler<MemcacheObject>() {
|
||||
val response : MemcacheResponse? = null
|
||||
override fun channelRead0(
|
||||
ctx: ChannelHandlerContext,
|
||||
msg: MemcacheObject
|
||||
) {
|
||||
if(msg is BinaryMemcacheResponse) {
|
||||
val resp = MemcacheResponse.of(msg)
|
||||
responseListener?.listen(ResponseEvent.ResponseReceived(resp))
|
||||
if(msg.totalBodyLength() == msg.keyLength() + msg.extrasLength()) {
|
||||
result.complete(resp.status)
|
||||
}
|
||||
}
|
||||
if(responseListener != null) {
|
||||
when (msg) {
|
||||
is LastMemcacheContent -> {
|
||||
responseListener.listen(ResponseEvent.LastResponseContentChunkReceived(msg.content().nioBuffer()))
|
||||
result.complete(response?.status)
|
||||
pipeline.removeLast()
|
||||
pool.release(channel)
|
||||
}
|
||||
is MemcacheContent -> {
|
||||
responseListener.listen(ResponseEvent.ResponseContentChunkReceived(msg.content().nioBuffer()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
val ex = when (cause) {
|
||||
is DecoderException -> cause.cause!!
|
||||
else -> cause
|
||||
}
|
||||
responseListener?.listen(ResponseEvent.ExceptionCaught(ex))
|
||||
result.completeExceptionally(ex)
|
||||
ctx.close()
|
||||
pipeline.removeLast()
|
||||
pool.release(channel)
|
||||
}
|
||||
})
|
||||
|
||||
val chunks = mutableListOf <ByteBuffer>()
|
||||
fun sendRequest() {
|
||||
val valueLen = chunks.fold(0) { acc : Int, c2 : ByteBuffer ->
|
||||
acc + c2.remaining()
|
||||
}
|
||||
request.setTotalBodyLength(request.keyLength() + request.extrasLength() + valueLen)
|
||||
channel.write(request)
|
||||
for((i, chunk) in chunks.withIndex()) {
|
||||
if(i + 1 < chunks.size) {
|
||||
channel.write(DefaultMemcacheContent(Unpooled.wrappedBuffer(chunk)))
|
||||
} else {
|
||||
channel.write(DefaultLastMemcacheContent(Unpooled.wrappedBuffer(chunk)))
|
||||
}
|
||||
}
|
||||
channel.flush()
|
||||
}
|
||||
|
||||
callHandleFuture.complete(object : CallHandle {
|
||||
override fun sendChunk(requestBodyChunk: ByteBuffer) {
|
||||
chunks.addLast(requestBodyChunk)
|
||||
}
|
||||
|
||||
override fun waitForResponse(): CompletableFuture<Short> {
|
||||
sendRequest()
|
||||
return result
|
||||
}
|
||||
})
|
||||
} else {
|
||||
callHandleFuture.completeExceptionally(channelFuture.cause())
|
||||
}
|
||||
}
|
||||
})
|
||||
return callHandleFuture
|
||||
}
|
||||
|
||||
private fun encodeExpiry(expiry: Duration) : Int {
|
||||
val expirySeconds = expiry.toSeconds()
|
||||
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
|
||||
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
|
||||
}
|
||||
|
||||
fun get(key: String, responseListener: ResponseListener) : CompletableFuture<CallHandle> {
|
||||
val request = (cfg.digestAlgorithm
|
||||
?.let(MessageDigest::getInstance)
|
||||
?.let { md ->
|
||||
digest(key.toByteArray(), md)
|
||||
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
|
||||
DefaultBinaryMemcacheRequest().apply {
|
||||
setKey(Unpooled.wrappedBuffer(digest))
|
||||
setOpcode(BinaryMemcacheOpcodes.GET)
|
||||
}
|
||||
}
|
||||
return sendRequest(request, responseListener)
|
||||
}
|
||||
|
||||
fun put(key: String, expiry : Duration, cas : Long? = null): CompletableFuture<CallHandle> {
|
||||
val request = (cfg.digestAlgorithm
|
||||
?.let(MessageDigest::getInstance)
|
||||
?.let { md ->
|
||||
digest(key.toByteArray(), md)
|
||||
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
|
||||
val extras = Unpooled.buffer(8, 8)
|
||||
extras.writeInt(0)
|
||||
extras.writeInt(encodeExpiry(expiry))
|
||||
DefaultBinaryMemcacheRequest().apply {
|
||||
setExtras(extras)
|
||||
setKey(Unpooled.wrappedBuffer(digest))
|
||||
setOpcode(BinaryMemcacheOpcodes.SET)
|
||||
cas?.let(this::setCas)
|
||||
}
|
||||
}
|
||||
return sendRequest(request) { evt ->
|
||||
when (evt) {
|
||||
is ResponseEvent.ResponseReceived -> {
|
||||
if (evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) {
|
||||
throw MemcachedException(evt.response.status)
|
||||
}
|
||||
}
|
||||
else -> {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
fun shutDown(): NettyFuture<*> {
|
||||
return group.shutdownGracefully()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
shutDown().sync()
|
||||
}
|
||||
}
|
@@ -0,0 +1,10 @@
|
||||
package net.woggioni.gbcs.server.memcached.client
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
sealed interface ResponseEvent {
|
||||
class ResponseReceived(val response : MemcacheResponse) : ResponseEvent
|
||||
class ResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent
|
||||
class LastResponseContentChunkReceived(val chunk: ByteBuffer) : ResponseEvent
|
||||
class ExceptionCaught(val cause : Throwable) : ResponseEvent
|
||||
}
|
@@ -0,0 +1,5 @@
|
||||
package net.woggioni.gbcs.server.memcached.client
|
||||
|
||||
fun interface ResponseListener {
|
||||
fun listen(evt : ResponseEvent)
|
||||
}
|
@@ -0,0 +1,80 @@
|
||||
package net.woggioni.gbcs.server.memcached.test
|
||||
|
||||
import io.netty.buffer.ByteBuf
|
||||
import io.netty.buffer.Unpooled
|
||||
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
|
||||
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
|
||||
import net.woggioni.gbcs.api.event.ChunkReceived
|
||||
import net.woggioni.gbcs.common.HostAndPort
|
||||
|
||||
import net.woggioni.gbcs.server.memcached.MemcachedCacheConfiguration
|
||||
import net.woggioni.gbcs.server.memcached.client.MemcacheResponse
|
||||
import net.woggioni.gbcs.server.memcached.client.MemcachedClient
|
||||
import net.woggioni.gbcs.server.memcached.client.ResponseEvent
|
||||
import net.woggioni.gbcs.server.memcached.client.ResponseListener
|
||||
import org.junit.jupiter.api.Assertions
|
||||
import org.junit.jupiter.api.Test
|
||||
import java.nio.ByteBuffer
|
||||
import java.security.SecureRandom
|
||||
import java.time.Duration
|
||||
import java.util.Objects
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.random.Random
|
||||
|
||||
class MemcachedClientTest {
|
||||
|
||||
@Test
|
||||
fun test() {
|
||||
val client = MemcachedClient(MemcachedCacheConfiguration(
|
||||
servers = listOf(
|
||||
MemcachedCacheConfiguration.Server(
|
||||
endpoint = HostAndPort("127.0.0.1", 11211),
|
||||
connectionTimeoutMillis = null,
|
||||
retryPolicy = null,
|
||||
maxConnections = 1
|
||||
)
|
||||
)
|
||||
))
|
||||
|
||||
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
|
||||
val key = "101325"
|
||||
val value = random.nextBytes(0x1000)
|
||||
val requestListener = client.put(key, Duration.ofDays(2), null)
|
||||
|
||||
val response = Unpooled.buffer(value.size)
|
||||
requestListener.thenCompose { listener ->
|
||||
listener.sendChunk(ByteBuffer.wrap(value))
|
||||
listener.waitForResponse()
|
||||
}.get(10, TimeUnit.SECONDS)
|
||||
|
||||
client.get(key, object: ResponseListener {
|
||||
override fun listen(evt: ResponseEvent) {
|
||||
when(evt) {
|
||||
is ResponseEvent.ResponseReceived -> {
|
||||
if(evt.response.status != BinaryMemcacheResponseStatus.SUCCESS) {
|
||||
Assertions.fail<String> {
|
||||
"Memcache status ${evt.response.status}"
|
||||
}
|
||||
}
|
||||
}
|
||||
is ResponseEvent.ResponseContentChunkReceived -> response.writeBytes(evt.chunk)
|
||||
else -> {}
|
||||
}
|
||||
}
|
||||
}).thenCompose { it.waitForResponse() }.get(1, TimeUnit.SECONDS)
|
||||
val retrievedResponse = response.array()
|
||||
Assertions.assertArrayEquals(value, retrievedResponse)
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
fun test2() {
|
||||
val a1 = ByteArray(10) {
|
||||
it.toByte()
|
||||
}
|
||||
val a2 = ByteArray(10) {
|
||||
it.toByte()
|
||||
}
|
||||
Assertions.assertTrue(Objects.equals(a1, a1))
|
||||
}
|
||||
}
|
21
gbcs-server-memcached/src/test/resources/logback.xml
Normal file
21
gbcs-server-memcached/src/test/resources/logback.xml
Normal file
@@ -0,0 +1,21 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<!DOCTYPE configuration>
|
||||
|
||||
<configuration>
|
||||
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
|
||||
<import class="ch.qos.logback.core.ConsoleAppender"/>
|
||||
|
||||
<appender name="console" class="ConsoleAppender">
|
||||
<target>System.err</target>
|
||||
<encoder class="PatternLayoutEncoder">
|
||||
<pattern>%d [%highlight(%-5level)] \(%thread\) %logger{36} -%kvp- %msg %n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="info">
|
||||
<appender-ref ref="console"/>
|
||||
</root>
|
||||
<logger name="io.netty" level="debug"/>
|
||||
<logger name="com.google.code.yanf4j" level="warn"/>
|
||||
<logger name="net.rubyeye.xmemcached" level="warn"/>
|
||||
</configuration>
|
@@ -1,36 +1,83 @@
|
||||
package net.woggioni.gbcs.server.handler
|
||||
|
||||
import io.netty.buffer.Unpooled
|
||||
import io.netty.channel.ChannelFutureListener
|
||||
import io.netty.channel.ChannelHandler
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.DefaultFileRegion
|
||||
import io.netty.channel.SimpleChannelInboundHandler
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse
|
||||
import io.netty.handler.codec.http.DefaultHttpContent
|
||||
import io.netty.handler.codec.http.DefaultHttpResponse
|
||||
import io.netty.handler.codec.http.FullHttpRequest
|
||||
import io.netty.handler.codec.http.DefaultLastHttpContent
|
||||
import io.netty.handler.codec.http.HttpContent
|
||||
import io.netty.handler.codec.http.HttpHeaderNames
|
||||
import io.netty.handler.codec.http.HttpHeaderValues
|
||||
import io.netty.handler.codec.http.HttpMessage
|
||||
import io.netty.handler.codec.http.HttpMethod
|
||||
import io.netty.handler.codec.http.HttpRequest
|
||||
import io.netty.handler.codec.http.HttpResponseStatus
|
||||
import io.netty.handler.codec.http.HttpUtil
|
||||
import io.netty.handler.codec.http.LastHttpContent
|
||||
import io.netty.handler.stream.ChunkedNioStream
|
||||
import net.woggioni.gbcs.api.Cache
|
||||
import net.woggioni.gbcs.api.exception.CacheException
|
||||
import net.woggioni.gbcs.api.CallHandle
|
||||
import net.woggioni.gbcs.api.ResponseEventListener
|
||||
import net.woggioni.gbcs.api.event.RequestEvent
|
||||
import net.woggioni.gbcs.api.event.ResponseEvent
|
||||
import net.woggioni.gbcs.common.contextLogger
|
||||
import net.woggioni.gbcs.server.debug
|
||||
import net.woggioni.gbcs.server.warn
|
||||
import java.nio.channels.FileChannel
|
||||
import java.nio.file.Path
|
||||
import java.util.concurrent.CompletableFuture
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
|
||||
SimpleChannelInboundHandler<FullHttpRequest>() {
|
||||
SimpleChannelInboundHandler<HttpMessage>() {
|
||||
|
||||
private val log = contextLogger()
|
||||
companion object {
|
||||
@JvmStatic
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
|
||||
private data class TransientContext(
|
||||
var key: String?,
|
||||
var callHandle: CompletableFuture<CallHandle<Void>>
|
||||
)
|
||||
|
||||
private var transientContext: TransientContext? = null
|
||||
|
||||
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpMessage) {
|
||||
when (msg) {
|
||||
is HttpRequest -> {
|
||||
handleRequest(ctx, msg)
|
||||
}
|
||||
|
||||
is LastHttpContent -> {
|
||||
transientContext?.run {
|
||||
callHandle.thenCompose { callHandle ->
|
||||
callHandle.postEvent(RequestEvent.LastChunkSent(msg.content()))
|
||||
callHandle.call()
|
||||
}.thenApply {
|
||||
val response = DefaultFullHttpResponse(
|
||||
msg.protocolVersion(), HttpResponseStatus.CREATED,
|
||||
key?.let(String::toByteArray)
|
||||
?.let(Unpooled::copiedBuffer)
|
||||
)
|
||||
// response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
|
||||
ctx.writeAndFlush(response)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
is HttpContent -> {
|
||||
transientContext?.run {
|
||||
callHandle = callHandle.thenApply { it ->
|
||||
it.postEvent(RequestEvent.ChunkSent(msg.content()))
|
||||
it
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private fun handleRequest(ctx: ChannelHandlerContext, msg: HttpRequest) {
|
||||
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
|
||||
val method = msg.method()
|
||||
if (method === HttpMethod.GET) {
|
||||
@@ -43,49 +90,61 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
|
||||
return
|
||||
}
|
||||
if (serverPrefix == prefix) {
|
||||
try {
|
||||
cache.get(key)
|
||||
} catch(ex : Throwable) {
|
||||
throw CacheException("Error accessing the cache backend", ex)
|
||||
}?.let { channel ->
|
||||
log.debug(ctx) {
|
||||
"Cache hit for key '$key'"
|
||||
}
|
||||
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
|
||||
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
|
||||
if (!keepAlive) {
|
||||
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
|
||||
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
|
||||
} else {
|
||||
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
|
||||
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
|
||||
}
|
||||
ctx.write(response)
|
||||
when (channel) {
|
||||
is FileChannel -> {
|
||||
if (keepAlive) {
|
||||
ctx.write(DefaultFileRegion(channel, 0, channel.size()))
|
||||
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
|
||||
} else {
|
||||
ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size()))
|
||||
.addListener(ChannelFutureListener.CLOSE)
|
||||
cache.get(key, object : ResponseEventListener {
|
||||
var first = false
|
||||
override fun listen(evt: ResponseEvent) {
|
||||
when (evt) {
|
||||
is ResponseEvent.NoContent -> {
|
||||
log.debug(ctx) {
|
||||
"Cache miss for key '$key'"
|
||||
}
|
||||
val response =
|
||||
DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
|
||||
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
|
||||
ctx.writeAndFlush(response)
|
||||
}
|
||||
|
||||
is ResponseEvent.ChunkReceived, is ResponseEvent.LastChunkReceived -> {
|
||||
if (first) {
|
||||
first = false
|
||||
log.debug(ctx) {
|
||||
"Cache hit for key '$key'"
|
||||
}
|
||||
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
|
||||
response.headers()[HttpHeaderNames.CONTENT_TYPE] =
|
||||
HttpHeaderValues.APPLICATION_OCTET_STREAM
|
||||
if (!keepAlive) {
|
||||
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
|
||||
response.headers()
|
||||
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
|
||||
} else {
|
||||
response.headers()
|
||||
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
|
||||
response.headers()
|
||||
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
|
||||
}
|
||||
ctx.write(response)
|
||||
}
|
||||
if (evt is ResponseEvent.LastChunkReceived)
|
||||
ctx.write(DefaultLastHttpContent(evt.chunk))
|
||||
else if (evt is ResponseEvent.ChunkReceived)
|
||||
ctx.write(DefaultHttpContent(evt.chunk))
|
||||
ctx.flush()
|
||||
}
|
||||
|
||||
is ResponseEvent.ExceptionCaught -> {
|
||||
log.error(evt.cause.message, evt.cause)
|
||||
val errorResponse = DefaultFullHttpResponse(
|
||||
msg.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR,
|
||||
evt.cause.message
|
||||
?.let(String::toByteArray)
|
||||
?.let(Unpooled::copiedBuffer)
|
||||
)
|
||||
ctx.write(errorResponse)
|
||||
}
|
||||
}
|
||||
else -> {
|
||||
ctx.write(ChunkedNioStream(channel)).addListener { evt ->
|
||||
channel.close()
|
||||
}
|
||||
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
|
||||
}
|
||||
}
|
||||
} ?: let {
|
||||
log.debug(ctx) {
|
||||
"Cache miss for key '$key'"
|
||||
}
|
||||
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
|
||||
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
|
||||
ctx.writeAndFlush(response)
|
||||
}
|
||||
}).thenCompose(CallHandle<Void>::call)
|
||||
} else {
|
||||
log.warn(ctx) {
|
||||
"Got request for unhandled path '${msg.uri()}'"
|
||||
@@ -93,35 +152,22 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
|
||||
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
|
||||
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
|
||||
ctx.writeAndFlush(response)
|
||||
ctx.channel().read()
|
||||
}
|
||||
} else if (method === HttpMethod.PUT) {
|
||||
val path = Path.of(msg.uri())
|
||||
val prefix = path.parent
|
||||
val key = path.fileName.toString()
|
||||
|
||||
if (serverPrefix == prefix) {
|
||||
log.debug(ctx) {
|
||||
"Added value for key '$key' to build cache"
|
||||
}
|
||||
val bodyBytes = msg.content().run {
|
||||
if (isDirect) {
|
||||
ByteArray(readableBytes()).also {
|
||||
readBytes(it)
|
||||
}
|
||||
} else {
|
||||
array()
|
||||
}
|
||||
}
|
||||
try {
|
||||
cache.put(key, bodyBytes)
|
||||
} catch(ex : Throwable) {
|
||||
throw CacheException("Error accessing the cache backend", ex)
|
||||
}
|
||||
transientContext = TransientContext(key, cache.put(key))
|
||||
val response = DefaultFullHttpResponse(
|
||||
msg.protocolVersion(), HttpResponseStatus.CREATED,
|
||||
Unpooled.copiedBuffer(key.toByteArray())
|
||||
)
|
||||
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
|
||||
// response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
|
||||
ctx.writeAndFlush(response)
|
||||
} else {
|
||||
log.warn(ctx) {
|
||||
@@ -131,9 +177,12 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
|
||||
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
|
||||
ctx.writeAndFlush(response)
|
||||
}
|
||||
} else if(method == HttpMethod.TRACE) {
|
||||
} else if (method == HttpMethod.TRACE) {
|
||||
val replayedRequestHead = ctx.alloc().buffer()
|
||||
replayedRequestHead.writeCharSequence("TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", Charsets.US_ASCII)
|
||||
replayedRequestHead.writeCharSequence(
|
||||
"TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n",
|
||||
Charsets.US_ASCII
|
||||
)
|
||||
msg.headers().forEach { (key, value) ->
|
||||
replayedRequestHead.apply {
|
||||
writeCharSequence(key, Charsets.US_ASCII)
|
||||
@@ -143,18 +192,30 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
|
||||
}
|
||||
}
|
||||
replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII)
|
||||
val requestBody = msg.content()
|
||||
requestBody.retain()
|
||||
val responseBody = ctx.alloc().compositeBuffer(2).apply {
|
||||
addComponents(true, replayedRequestHead)
|
||||
addComponents(true, requestBody)
|
||||
}
|
||||
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK, responseBody)
|
||||
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
|
||||
response.headers().apply {
|
||||
set(HttpHeaderNames.CONTENT_TYPE, "message/http")
|
||||
set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes())
|
||||
}
|
||||
ctx.writeAndFlush(response)
|
||||
ctx.write(response)
|
||||
ctx.writeAndFlush(DefaultHttpContent(replayedRequestHead))
|
||||
val callHandle = object : CallHandle<Void> {
|
||||
override fun postEvent(evt: RequestEvent) {
|
||||
when (evt) {
|
||||
is RequestEvent.ChunkSent -> {
|
||||
ctx.writeAndFlush(DefaultHttpContent(evt.chunk))
|
||||
}
|
||||
|
||||
is RequestEvent.LastChunkSent -> {
|
||||
ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun call(): CompletableFuture<Void> {
|
||||
return CompletableFuture.completedFuture(null)
|
||||
}
|
||||
}
|
||||
transientContext = TransientContext(null, CompletableFuture.completedFuture(callHandle))
|
||||
} else {
|
||||
log.warn(ctx) {
|
||||
"Got request with unhandled method '${msg.method().name()}'"
|
||||
@@ -163,5 +224,6 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
|
||||
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
|
||||
ctx.writeAndFlush(response)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user