fixed server support for request pipelining
All checks were successful
CI / build (push) Successful in 15m33s

This commit is contained in:
2025-03-08 11:07:21 +08:00
parent 59a12d6218
commit 7ba7070693
19 changed files with 203 additions and 235 deletions

View File

@@ -5,9 +5,12 @@ plugins {
} }
dependencies { dependencies {
implementation catalog.slf4j.api
implementation project(':rbcs-common')
api catalog.netty.common api catalog.netty.common
api catalog.netty.buffer api catalog.netty.buffer
api catalog.netty.handler api catalog.netty.handler
api catalog.netty.codec.http
} }
publishing { publishing {

View File

@@ -1,10 +1,15 @@
module net.woggioni.rbcs.api { module net.woggioni.rbcs.api {
requires static lombok; requires static lombok;
requires java.xml;
requires io.netty.buffer;
requires io.netty.handler; requires io.netty.handler;
requires io.netty.transport;
requires io.netty.common; requires io.netty.common;
requires net.woggioni.rbcs.common;
requires io.netty.transport;
requires io.netty.codec.http;
requires io.netty.buffer;
requires org.slf4j;
requires java.xml;
exports net.woggioni.rbcs.api; exports net.woggioni.rbcs.api;
exports net.woggioni.rbcs.api.exception; exports net.woggioni.rbcs.api.exception;
exports net.woggioni.rbcs.api.message; exports net.woggioni.rbcs.api.message;

View File

@@ -0,0 +1,57 @@
package net.woggioni.rbcs.api;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCounted;
import lombok.extern.slf4j.Slf4j;
import net.woggioni.rbcs.api.message.CacheMessage;
@Slf4j
public abstract class CacheHandler extends ChannelInboundHandlerAdapter {
private boolean requestFinished = false;
abstract protected void channelRead0(ChannelHandlerContext ctx, CacheMessage msg);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if(!requestFinished && msg instanceof CacheMessage) {
if(msg instanceof CacheMessage.LastCacheContent || msg instanceof CacheMessage.CacheGetRequest) requestFinished = true;
try {
channelRead0(ctx, (CacheMessage) msg);
} finally {
if(msg instanceof ReferenceCounted rc) rc.release();
}
} else {
ctx.fireChannelRead(msg);
}
}
protected void sendMessageAndFlush(ChannelHandlerContext ctx, Object msg) {
sendMessage(ctx, msg, true);
}
protected void sendMessage(ChannelHandlerContext ctx, Object msg) {
sendMessage(ctx, msg, false);
}
private void sendMessage(ChannelHandlerContext ctx, Object msg, boolean flush) {
ctx.write(msg);
if(
msg instanceof CacheMessage.LastCacheContent ||
msg instanceof CacheMessage.CachePutResponse ||
msg instanceof CacheMessage.CacheValueNotFoundResponse ||
msg instanceof LastHttpContent
) {
ctx.flush();
ctx.pipeline().remove(this);
} else if(flush) {
ctx.flush();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}

View File

@@ -1,13 +1,12 @@
package net.woggioni.rbcs.api; package net.woggioni.rbcs.api;
import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
public interface CacheHandlerFactory extends AsyncCloseable { public interface CacheHandlerFactory extends AsyncCloseable {
ChannelHandler newHandler( CacheHandler newHandler(
Configuration configuration, Configuration configuration,
EventLoopGroup eventLoopGroup, EventLoopGroup eventLoopGroup,
ChannelFactory<SocketChannel> socketChannelFactory, ChannelFactory<SocketChannel> socketChannelFactory,

View File

@@ -38,11 +38,12 @@ data class Configuration(
val readIdleTimeout: Duration, val readIdleTimeout: Duration,
val writeIdleTimeout: Duration, val writeIdleTimeout: Duration,
val idleTimeout: Duration, val idleTimeout: Duration,
val requestPipelining : Boolean,
) )
data class Profile( data class Profile(
val serverURI: URI, val serverURI: URI,
val connection: Connection?, val connection: Connection,
val authentication: Authentication?, val authentication: Authentication?,
val connectionTimeout: Duration?, val connectionTimeout: Duration?,
val maxConnections: Int, val maxConnections: Int,

View File

@@ -318,6 +318,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
) { ) {
pipeline.remove(this) pipeline.remove(this)
responseFuture.complete(response) responseFuture.complete(response)
if(!profile.connection.requestPipelining) {
pool.release(channel)
}
} }
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
@@ -332,6 +335,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
override fun channelInactive(ctx: ChannelHandlerContext) { override fun channelInactive(ctx: ChannelHandlerContext) {
responseFuture.completeExceptionally(IOException("The remote server closed the connection")) responseFuture.completeExceptionally(IOException("The remote server closed the connection"))
if(!profile.connection.requestPipelining) {
pool.release(channel)
}
super.channelInactive(ctx) super.channelInactive(ctx)
} }
@@ -352,6 +358,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
if (this === pipeline.last()) { if (this === pipeline.last()) {
ctx.close() ctx.close()
} }
if(!profile.connection.requestPipelining) {
pool.release(channel)
}
} else { } else {
super.userEventTriggered(ctx, evt) super.userEventTriggered(ctx, evt)
} }
@@ -401,8 +410,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
val ex = it.cause() val ex = it.cause()
log.warn(ex.message, ex) log.warn(ex.message, ex)
} }
if(profile.connection.requestPipelining) {
pool.release(channel) pool.release(channel)
}
} }
} else { } else {
responseFuture.completeExceptionally(channelFuture.cause()) responseFuture.completeExceptionally(channelFuture.cause())

View File

@@ -30,7 +30,12 @@ object Parser {
?: throw ConfigurationException("base-url attribute is required") ?: throw ConfigurationException("base-url attribute is required")
var authentication: Configuration.Authentication? = null var authentication: Configuration.Authentication? = null
var retryPolicy: Configuration.RetryPolicy? = null var retryPolicy: Configuration.RetryPolicy? = null
var connection : Configuration.Connection? = null var connection : Configuration.Connection = Configuration.Connection(
Duration.ofSeconds(60),
Duration.ofSeconds(60),
Duration.ofSeconds(30),
false
)
var trustStore : Configuration.TrustStore? = null var trustStore : Configuration.TrustStore? = null
for (gchild in child.asIterable()) { for (gchild in child.asIterable()) {
when (gchild.localName) { when (gchild.localName) {
@@ -97,10 +102,13 @@ object Parser {
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS) ?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
val writeIdleTimeout = gchild.renderAttribute("write-idle-timeout") val writeIdleTimeout = gchild.renderAttribute("write-idle-timeout")
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS) ?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
val requestPipelining = gchild.renderAttribute("request-pipelining")
?.let(String::toBoolean) ?: false
connection = Configuration.Connection( connection = Configuration.Connection(
readIdleTimeout, readIdleTimeout,
writeIdleTimeout, writeIdleTimeout,
idleTimeout, idleTimeout,
requestPipelining
) )
} }

View File

@@ -123,6 +123,13 @@
</xs:documentation> </xs:documentation>
</xs:annotation> </xs:annotation>
</xs:attribute> </xs:attribute>
<xs:attribute name="request-pipelining" type="xs:boolean" use="optional" default="false">
<xs:annotation>
<xs:documentation>
Enables HTTP/1.1 request pipelining
</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType> </xs:complexType>
<xs:complexType name="noAuthType"> <xs:complexType name="noAuthType">

View File

@@ -6,7 +6,7 @@ plugins {
} }
dependencies { dependencies {
implementation project(':rbcs-api') implementation catalog.netty.transport
implementation catalog.slf4j.api implementation catalog.slf4j.api
implementation catalog.jwo implementation catalog.jwo
implementation catalog.netty.buffer implementation catalog.netty.buffer

View File

@@ -6,6 +6,7 @@ import io.netty.channel.EventLoopGroup
import io.netty.channel.pool.FixedChannelPool import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.DatagramChannel import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.HostAndPort import net.woggioni.rbcs.common.HostAndPort
@@ -51,7 +52,7 @@ data class MemcacheCacheConfiguration(
eventLoop: EventLoopGroup, eventLoop: EventLoopGroup,
socketChannelFactory: ChannelFactory<SocketChannel>, socketChannelFactory: ChannelFactory<SocketChannel>,
datagramChannelFactory: ChannelFactory<DatagramChannel>, datagramChannelFactory: ChannelFactory<DatagramChannel>,
): ChannelHandler { ): CacheHandler {
return MemcacheCacheHandler( return MemcacheCacheHandler(
MemcacheClient( MemcacheClient(
this@MemcacheCacheConfiguration.servers, this@MemcacheCacheConfiguration.servers,

View File

@@ -4,7 +4,6 @@ import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf import io.netty.buffer.CompositeByteBuf
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent import io.netty.handler.codec.memcache.LastMemcacheContent
@@ -13,6 +12,7 @@ import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.exception.ContentTooLargeException import net.woggioni.rbcs.api.exception.ContentTooLargeException
import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage
@@ -58,7 +58,7 @@ class MemcacheCacheHandler(
private val compressionLevel: Int, private val compressionLevel: Int,
private val chunkSize: Int, private val chunkSize: Int,
private val maxAge: Duration private val maxAge: Duration
) : SimpleChannelInboundHandler<CacheMessage>() { ) : CacheHandler() {
companion object { companion object {
private val log = createLogger<MemcacheCacheHandler>() private val log = createLogger<MemcacheCacheHandler>()
@@ -98,7 +98,10 @@ class MemcacheCacheHandler(
acc.retain() acc.retain()
it.readObject() as CacheValueMetadata it.readObject() as CacheValueMetadata
} }
ctx.writeAndFlush(CacheValueFoundResponse(key, metadata)) log.trace(ctx) {
"Sending response from cache"
}
sendMessageAndFlush(ctx, CacheValueFoundResponse(key, metadata))
responseSent = true responseSent = true
acc.readerIndex(Int.SIZE_BYTES + mSize) acc.readerIndex(Int.SIZE_BYTES + mSize)
} }
@@ -114,16 +117,16 @@ class MemcacheCacheHandler(
val toSend = extractChunk(chunk, ctx.alloc()) val toSend = extractChunk(chunk, ctx.alloc())
val msg = if(last) { val msg = if(last) {
log.trace(ctx) { log.trace(ctx) {
"Sending last chunk to client on channel ${ctx.channel().id().asShortText()}" "Sending last chunk to client on channel"
} }
LastCacheContent(toSend) LastCacheContent(toSend)
} else { } else {
log.trace(ctx) { log.trace(ctx) {
"Sending chunk to client on channel ${ctx.channel().id().asShortText()}" "Sending chunk to client"
} }
CacheContent(toSend) CacheContent(toSend)
} }
ctx.writeAndFlush(msg) sendMessageAndFlush(ctx, msg)
} }
fun commit() { fun commit() {
@@ -259,7 +262,7 @@ class MemcacheCacheHandler(
log.debug(ctx) { log.debug(ctx) {
"Cache miss for key ${msg.key} on memcache" "Cache miss for key ${msg.key} on memcache"
} }
ctx.writeAndFlush(CacheValueNotFoundResponse()) sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
} }
} }
} }
@@ -290,6 +293,7 @@ class MemcacheCacheHandler(
setOpcode(BinaryMemcacheOpcodes.GET) setOpcode(BinaryMemcacheOpcodes.GET)
} }
requestHandle.sendRequest(request) requestHandle.sendRequest(request)
requestHandle.sendContent(LastMemcacheContent.EMPTY_LAST_CONTENT)
} }
} }
@@ -305,7 +309,7 @@ class MemcacheCacheHandler(
log.debug(ctx) { log.debug(ctx) {
"Inserted key ${msg.key} into memcache" "Inserted key ${msg.key} into memcache"
} }
ctx.writeAndFlush(CachePutResponse(msg.key)) sendMessageAndFlush(ctx, CachePutResponse(msg.key))
} }
else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status)) else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status))
} }
@@ -348,6 +352,9 @@ class MemcacheCacheHandler(
extras.writeInt(0) extras.writeInt(0)
extras.writeInt(encodeExpiry(maxAge)) extras.writeInt(encodeExpiry(maxAge))
val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize
log.trace(ctx) {
"Trying to send SET request to memcache"
}
request.requestController.whenComplete { requestController, ex -> request.requestController.whenComplete { requestController, ex ->
if(ex == null) { if(ex == null) {
log.trace(ctx) { log.trace(ctx) {

View File

@@ -12,7 +12,6 @@ import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoopGroup import io.netty.channel.EventLoopGroup
import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.pool.AbstractChannelPoolHandler import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.memcache.LastMemcacheContent import io.netty.handler.codec.memcache.LastMemcacheContent
@@ -24,7 +23,7 @@ import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.util.concurrent.GenericFutureListener import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.rbcs.common.HostAndPort import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.warn import net.woggioni.rbcs.common.trace
import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration
import net.woggioni.rbcs.server.memcache.MemcacheCacheHandler import net.woggioni.rbcs.server.memcache.MemcacheCacheHandler
import java.io.IOException import java.io.IOException
@@ -94,18 +93,6 @@ class MemcacheClient(
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> { pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) { override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) { if (channelFuture.isSuccess) {
var requestSent = false
var requestBodySent = false
var requestFinished = false
var responseReceived = false
var responseBodyReceived = false
var responseFinished = false
var requestBodySize = 0
var requestBodyBytesSent = 0
val channel = channelFuture.now val channel = channelFuture.now
var connectionClosedByTheRemoteServer = true var connectionClosedByTheRemoteServer = true
val closeCallback = { val closeCallback = {
@@ -113,14 +100,7 @@ class MemcacheClient(
val ex = IOException("The memcache server closed the connection") val ex = IOException("The memcache server closed the connection")
val completed = response.completeExceptionally(ex) val completed = response.completeExceptionally(ex)
if(!completed) responseHandler.exceptionCaught(ex) if(!completed) responseHandler.exceptionCaught(ex)
log.warn {
"RequestSent: $requestSent, RequestBodySent: $requestBodySent, " +
"RequestFinished: $requestFinished, ResponseReceived: $responseReceived, " +
"ResponseBodyReceived: $responseBodyReceived, ResponseFinished: $responseFinished, " +
"RequestBodySize: $requestBodySize, RequestBodyBytesSent: $requestBodyBytesSent"
}
} }
pool.release(channel)
} }
val closeListener = ChannelFutureListener { val closeListener = ChannelFutureListener {
closeCallback() closeCallback()
@@ -140,18 +120,14 @@ class MemcacheClient(
when (msg) { when (msg) {
is BinaryMemcacheResponse -> { is BinaryMemcacheResponse -> {
responseHandler.responseReceived(msg) responseHandler.responseReceived(msg)
responseReceived = true
} }
is LastMemcacheContent -> { is LastMemcacheContent -> {
responseFinished = true
responseHandler.contentReceived(msg) responseHandler.contentReceived(msg)
pipeline.remove(this) pipeline.remove(this)
pool.release(channel)
} }
is MemcacheContent -> { is MemcacheContent -> {
responseBodyReceived = true
responseHandler.contentReceived(msg) responseHandler.contentReceived(msg)
} }
} }
@@ -165,35 +141,43 @@ class MemcacheClient(
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
connectionClosedByTheRemoteServer = false connectionClosedByTheRemoteServer = false
ctx.close() ctx.close()
pool.release(channel)
responseHandler.exceptionCaught(cause) responseHandler.exceptionCaught(cause)
} }
} }
channel.pipeline() channel.pipeline().addLast(handler)
.addLast("client-handler", handler)
response.complete(object : MemcacheRequestController { response.complete(object : MemcacheRequestController {
private var channelReleased = false
override fun sendRequest(request: BinaryMemcacheRequest) { override fun sendRequest(request: BinaryMemcacheRequest) {
requestBodySize = request.totalBodyLength() - request.keyLength() - request.extrasLength()
channel.writeAndFlush(request) channel.writeAndFlush(request)
requestSent = true
} }
override fun sendContent(content: MemcacheContent) { override fun sendContent(content: MemcacheContent) {
val size = content.content().readableBytes()
channel.writeAndFlush(content).addListener { channel.writeAndFlush(content).addListener {
requestBodyBytesSent += size
requestBodySent = true
if(content is LastMemcacheContent) { if(content is LastMemcacheContent) {
requestFinished = true if(!channelReleased) {
pool.release(channel)
channelReleased = true
log.trace(channel) {
"Channel released"
}
}
} }
} }
} }
override fun exceptionCaught(ex: Throwable) { override fun exceptionCaught(ex: Throwable) {
log.warn(ex.message, ex)
connectionClosedByTheRemoteServer = false connectionClosedByTheRemoteServer = false
channel.close() channel.close()
if(!channelReleased) {
pool.release(channel)
channelReleased = true
log.trace(channel) {
"Channel released"
}
}
} }
}) })
} else { } else {

View File

@@ -298,6 +298,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
"Closed connection ${ch.id().asShortText()} with ${ch.remoteAddress()}" "Closed connection ${ch.id().asShortText()} with ${ch.remoteAddress()}"
} }
} }
ch.config().setAutoRead(false)
val pipeline = ch.pipeline() val pipeline = ch.pipeline()
cfg.connection.also { conn -> cfg.connection.also { conn ->
val readIdleTimeout = conn.readIdleTimeout.toMillis() val readIdleTimeout = conn.readIdleTimeout.toMillis()

View File

@@ -5,6 +5,7 @@ import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioFile import io.netty.handler.stream.ChunkedNioFile
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
@@ -26,7 +27,7 @@ class FileSystemCacheHandler(
private val compressionEnabled: Boolean, private val compressionEnabled: Boolean,
private val compressionLevel: Int, private val compressionLevel: Int,
private val chunkSize: Int private val chunkSize: Int
) : SimpleChannelInboundHandler<CacheMessage>() { ) : CacheHandler() {
private inner class InProgressPutRequest( private inner class InProgressPutRequest(
val key : String, val key : String,
@@ -70,7 +71,7 @@ class FileSystemCacheHandler(
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm))) val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm)))
cache.get(key)?.also { entryValue -> cache.get(key)?.also { entryValue ->
ctx.writeAndFlush(CacheValueFoundResponse(msg.key, entryValue.metadata)) sendMessageAndFlush(ctx, CacheValueFoundResponse(msg.key, entryValue.metadata))
entryValue.channel.let { channel -> entryValue.channel.let { channel ->
if(compressionEnabled) { if(compressionEnabled) {
InflaterInputStream(Channels.newInputStream(channel)).use { stream -> InflaterInputStream(Channels.newInputStream(channel)).use { stream ->
@@ -81,19 +82,19 @@ class FileSystemCacheHandler(
while(buf.readableBytes() < chunkSize) { while(buf.readableBytes() < chunkSize) {
val read = buf.writeBytes(stream, chunkSize) val read = buf.writeBytes(stream, chunkSize)
if(read < 0) { if(read < 0) {
ctx.writeAndFlush(LastCacheContent(buf)) sendMessageAndFlush(ctx, LastCacheContent(buf))
break@outerLoop break@outerLoop
} }
} }
ctx.writeAndFlush(CacheContent(buf)) sendMessageAndFlush(ctx, CacheContent(buf))
} }
} }
} else { } else {
ctx.writeAndFlush(ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize)) sendMessage(ctx, ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) sendMessageAndFlush(ctx, LastHttpContent.EMPTY_LAST_CONTENT)
} }
} }
} ?: ctx.writeAndFlush(CacheValueNotFoundResponse()) } ?: sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
} }
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
@@ -111,7 +112,7 @@ class FileSystemCacheHandler(
inProgressPutRequest = null inProgressPutRequest = null
request.write(msg.content()) request.write(msg.content())
request.commit() request.commit()
ctx.writeAndFlush(CachePutResponse(request.key)) sendMessageAndFlush(ctx, CachePutResponse(request.key))
} }
} }

View File

@@ -3,6 +3,7 @@ package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.SimpleChannelInboundHandler
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
@@ -13,6 +14,7 @@ import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.common.ByteBufOutputStream import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.processCacheKey import net.woggioni.rbcs.common.RBCS.processCacheKey
import net.woggioni.rbcs.common.trace
import java.util.zip.Deflater import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream import java.util.zip.InflaterOutputStream
@@ -22,7 +24,7 @@ class InMemoryCacheHandler(
private val digestAlgorithm: String?, private val digestAlgorithm: String?,
private val compressionEnabled: Boolean, private val compressionEnabled: Boolean,
private val compressionLevel: Int private val compressionLevel: Int
) : SimpleChannelInboundHandler<CacheMessage>() { ) : CacheHandler() {
private interface InProgressPutRequest : AutoCloseable { private interface InProgressPutRequest : AutoCloseable {
val request: CachePutRequest val request: CachePutRequest
@@ -86,7 +88,7 @@ class InMemoryCacheHandler(
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
cache.get(processCacheKey(msg.key, digestAlgorithm))?.let { value -> cache.get(processCacheKey(msg.key, digestAlgorithm))?.let { value ->
ctx.writeAndFlush(CacheValueFoundResponse(msg.key, value.metadata)) sendMessageAndFlush(ctx, CacheValueFoundResponse(msg.key, value.metadata))
if (compressionEnabled) { if (compressionEnabled) {
val buf = ctx.alloc().heapBuffer() val buf = ctx.alloc().heapBuffer()
InflaterOutputStream(ByteBufOutputStream(buf)).use { InflaterOutputStream(ByteBufOutputStream(buf)).use {
@@ -94,11 +96,11 @@ class InMemoryCacheHandler(
value.content.release() value.content.release()
buf.retain() buf.retain()
} }
ctx.writeAndFlush(LastCacheContent(buf)) sendMessage(ctx, LastCacheContent(buf))
} else { } else {
ctx.writeAndFlush(LastCacheContent(value.content)) sendMessage(ctx, LastCacheContent(value.content))
} }
} ?: ctx.writeAndFlush(CacheValueNotFoundResponse()) } ?: sendMessage(ctx, CacheValueNotFoundResponse())
} }
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
@@ -122,7 +124,7 @@ class InMemoryCacheHandler(
inProgressRequest.close() inProgressRequest.close()
val cacheKey = processCacheKey(inProgressRequest.request.key, digestAlgorithm) val cacheKey = processCacheKey(inProgressRequest.request.key, digestAlgorithm)
cache.put(cacheKey, CacheEntry(inProgressRequest.request.metadata, buf)) cache.put(cacheKey, CacheEntry(inProgressRequest.request.metadata, buf))
ctx.writeAndFlush(CachePutResponse(inProgressRequest.request.key)) sendMessageAndFlush(ctx, CachePutResponse(inProgressRequest.request.key))
} }
} }

View File

@@ -1,4 +0,0 @@
package net.woggioni.rbcs.server.event
class RequestCompletedEvent {
}

View File

@@ -1,79 +0,0 @@
package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOutboundHandler
import io.netty.channel.ChannelPromise
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.LastHttpContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import java.net.SocketAddress
class CacheContentHandler(private val pairedHandler : ChannelHandler) : SimpleChannelInboundHandler<HttpContent>(), ChannelOutboundHandler {
private var requestFinished = false
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpContent) {
if(requestFinished) {
ctx.fireChannelRead(msg.retain())
} else {
when (msg) {
is LastHttpContent -> {
ctx.fireChannelRead(LastCacheContent(msg.content().retain()))
requestFinished = true
}
else -> ctx.fireChannelRead(CacheContent(msg.content().retain()))
}
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
super.exceptionCaught(ctx, cause)
}
override fun bind(ctx: ChannelHandlerContext, localAddress: SocketAddress, promise: ChannelPromise) {
ctx.bind(localAddress, promise)
}
override fun connect(
ctx: ChannelHandlerContext,
remoteAddress: SocketAddress,
localAddress: SocketAddress,
promise: ChannelPromise
) {
ctx.connect(remoteAddress, localAddress, promise)
}
override fun disconnect(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.disconnect(promise)
}
override fun close(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.close(promise)
}
override fun deregister(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.deregister(promise)
}
override fun read(ctx: ChannelHandlerContext) {
ctx.read()
}
override fun flush(ctx: ChannelHandlerContext) {
ctx.flush()
}
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
ctx.write(msg, promise)
if(msg is LastCacheContent || msg is CachePutResponse || msg is CacheValueNotFoundResponse || msg is LastHttpContent) {
ctx.pipeline().remove(pairedHandler)
ctx.pipeline().remove(this)
}
}
}

View File

@@ -1,56 +0,0 @@
package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelOutboundHandler
import io.netty.channel.ChannelPromise
import net.woggioni.rbcs.server.event.RequestCompletedEvent
import java.net.SocketAddress
class ResponseCapHandler : ChannelInboundHandlerAdapter(), ChannelOutboundHandler {
val bufferedMessages = mutableListOf<Any>()
override fun bind(ctx: ChannelHandlerContext, localAddress: SocketAddress, promise: ChannelPromise) {
ctx.bind(localAddress, promise)
}
override fun connect(
ctx: ChannelHandlerContext,
remoteAddress: SocketAddress,
localAddress: SocketAddress,
promise: ChannelPromise
) {
ctx.connect(remoteAddress, localAddress, promise)
}
override fun disconnect(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.disconnect(promise)
}
override fun close(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.close(promise)
}
override fun deregister(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.deregister(promise)
}
override fun read(ctx: ChannelHandlerContext) {
ctx.read()
}
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
bufferedMessages.add(msg)
}
override fun flush(ctx: ChannelHandlerContext) {
}
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if(evt is RequestCompletedEvent) {
for(msg in bufferedMessages) ctx.write(msg)
ctx.flush()
ctx.pipeline().remove(this)
}
}
}

View File

@@ -8,6 +8,7 @@ import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpContent import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.DefaultHttpResponse import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.DefaultLastHttpContent import io.netty.handler.codec.http.DefaultLastHttpContent
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpHeaders import io.netty.handler.codec.http.HttpHeaders
@@ -17,7 +18,6 @@ import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.HttpVersion import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
@@ -30,10 +30,8 @@ import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.warn import net.woggioni.rbcs.common.warn
import net.woggioni.rbcs.server.event.RequestCompletedEvent
import net.woggioni.rbcs.server.exception.ExceptionHandler import net.woggioni.rbcs.server.exception.ExceptionHandler
import java.nio.file.Path import java.nio.file.Path
import java.util.Locale
class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupplier : () -> ChannelHandler) : class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupplier : () -> ChannelHandler) :
ChannelDuplexHandler() { ChannelDuplexHandler() {
@@ -47,6 +45,15 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
private var keepAlive = true private var keepAlive = true
private var pipelinedRequests = 0 private var pipelinedRequests = 0
private fun newRequest() {
pipelinedRequests += 1
}
private fun requestCompleted(ctx : ChannelHandlerContext) {
pipelinedRequests -= 1
if(pipelinedRequests == 0) ctx.read()
}
private fun resetRequestMetadata() { private fun resetRequestMetadata() {
httpVersion = HttpVersion.HTTP_1_1 httpVersion = HttpVersion.HTTP_1_1
keepAlive = true keepAlive = true
@@ -65,22 +72,44 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
} }
} }
private var cacheRequestInProgress : Boolean = false
override fun handlerAdded(ctx: ChannelHandlerContext) {
ctx.read()
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
when (msg) { when (msg) {
is HttpRequest -> handleRequest(ctx, msg) is HttpRequest -> handleRequest(ctx, msg)
is HttpContent -> {
if(cacheRequestInProgress) {
if(msg is LastHttpContent) {
super.channelRead(ctx, LastCacheContent(msg.content().retain()))
cacheRequestInProgress = false
} else {
super.channelRead(ctx, CacheContent(msg.content().retain()))
}
msg.release()
} else {
super.channelRead(ctx, msg)
}
}
else -> super.channelRead(ctx, msg) else -> super.channelRead(ctx, msg)
} }
} }
override fun channelReadComplete(ctx: ChannelHandlerContext) {
super.channelReadComplete(ctx)
if(cacheRequestInProgress) {
ctx.read()
}
}
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise?) { override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise?) {
if (msg is CacheMessage) { if (msg is CacheMessage) {
try { try {
when (msg) { when (msg) {
is CachePutResponse -> { is CachePutResponse -> {
pipelinedRequests -= 1
ctx.fireUserEventTriggered(RequestCompletedEvent())
val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.CREATED) val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.CREATED)
val keyBytes = msg.key.toByteArray(Charsets.UTF_8) val keyBytes = msg.key.toByteArray(Charsets.UTF_8)
response.headers().apply { response.headers().apply {
@@ -92,16 +121,18 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
val buf = ctx.alloc().buffer(keyBytes.size).apply { val buf = ctx.alloc().buffer(keyBytes.size).apply {
writeBytes(keyBytes) writeBytes(keyBytes)
} }
ctx.writeAndFlush(DefaultLastHttpContent(buf)) ctx.writeAndFlush(DefaultLastHttpContent(buf)).also {
requestCompleted(ctx)
}
} }
is CacheValueNotFoundResponse -> { is CacheValueNotFoundResponse -> {
pipelinedRequests -= 1
ctx.fireUserEventTriggered(RequestCompletedEvent())
val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND) val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
setKeepAliveHeader(response.headers()) setKeepAliveHeader(response.headers())
ctx.writeAndFlush(response) ctx.writeAndFlush(response).also {
requestCompleted(ctx)
}
} }
is CacheValueFoundResponse -> { is CacheValueFoundResponse -> {
@@ -118,9 +149,9 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
} }
is LastCacheContent -> { is LastCacheContent -> {
pipelinedRequests -= 1 ctx.writeAndFlush(DefaultLastHttpContent(msg.content())).also {
ctx.fireUserEventTriggered(RequestCompletedEvent()) requestCompleted(ctx)
ctx.writeAndFlush(DefaultLastHttpContent(msg.content())) }
} }
is CacheContent -> { is CacheContent -> {
@@ -140,9 +171,8 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
resetRequestMetadata() resetRequestMetadata()
} }
} else if(msg is LastHttpContent) { } else if(msg is LastHttpContent) {
pipelinedRequests -= 1
ctx.fireUserEventTriggered(RequestCompletedEvent())
ctx.write(msg, promise) ctx.write(msg, promise)
requestCompleted(ctx)
} else super.write(ctx, msg, promise) } else super.write(ctx, msg, promise)
} }
@@ -153,15 +183,12 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
if (method === HttpMethod.GET) { if (method === HttpMethod.GET) {
val path = Path.of(msg.uri()).normalize() val path = Path.of(msg.uri()).normalize()
if (path.startsWith(serverPrefix)) { if (path.startsWith(serverPrefix)) {
cacheRequestInProgress = true
val relativePath = serverPrefix.relativize(path) val relativePath = serverPrefix.relativize(path)
val key = relativePath.toString() val key : String = relativePath.toString()
if(pipelinedRequests > 0) { newRequest()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler())
}
val cacheHandler = cacheHandlerSupplier() val cacheHandler = cacheHandlerSupplier()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, CacheContentHandler(cacheHandler))
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler) ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler)
pipelinedRequests += 1
key.let(::CacheGetRequest) key.let(::CacheGetRequest)
.let(ctx::fireChannelRead) .let(ctx::fireChannelRead)
?: ctx.channel().write(CacheValueNotFoundResponse()) ?: ctx.channel().write(CacheValueNotFoundResponse())
@@ -176,18 +203,15 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
} else if (method === HttpMethod.PUT) { } else if (method === HttpMethod.PUT) {
val path = Path.of(msg.uri()).normalize() val path = Path.of(msg.uri()).normalize()
if (path.startsWith(serverPrefix)) { if (path.startsWith(serverPrefix)) {
cacheRequestInProgress = true
val relativePath = serverPrefix.relativize(path) val relativePath = serverPrefix.relativize(path)
val key = relativePath.toString() val key = relativePath.toString()
log.debug(ctx) { log.debug(ctx) {
"Added value for key '$key' to build cache" "Added value for key '$key' to build cache"
} }
if(pipelinedRequests > 0) { newRequest()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler())
}
val cacheHandler = cacheHandlerSupplier() val cacheHandler = cacheHandlerSupplier()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, CacheContentHandler(cacheHandler))
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler) ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler)
pipelinedRequests += 1
path.fileName?.toString() path.fileName?.toString()
?.let { ?.let {
@@ -205,11 +229,8 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
} }
} else if (method == HttpMethod.TRACE) { } else if (method == HttpMethod.TRACE) {
if(pipelinedRequests > 0) { newRequest()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler())
}
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, TraceHandler) ctx.pipeline().addBefore(ExceptionHandler.NAME, null, TraceHandler)
pipelinedRequests += 1
super.channelRead(ctx, msg) super.channelRead(ctx, msg)
} else { } else {
log.warn(ctx) { log.warn(ctx) {