Compare commits

..

4 Commits

Author SHA1 Message Date
ac156c68eb fixed memory leak in MemcachedCacheHandler
All checks were successful
CI / build (push) Successful in 2m53s
2025-03-09 22:18:20 +08:00
9600dd7e4f solved issue with ignored HttpContent and HttpCacheContent messages in the Netty pipeline
All checks were successful
CI / build (push) Successful in 12m48s
2025-03-09 13:57:52 +08:00
729276a2b1 fixed native image configuration
All checks were successful
CI / build (push) Successful in 14m12s
2025-03-08 14:35:50 +08:00
7ba7070693 fixed server support for request pipelining
All checks were successful
CI / build (push) Successful in 15m33s
2025-03-08 11:07:21 +08:00
34 changed files with 460 additions and 419 deletions

View File

@@ -4,7 +4,7 @@ org.gradle.caching=true
rbcs.version = 0.2.0 rbcs.version = 0.2.0
lys.version = 2025.03.03 lys.version = 2025.03.08
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
docker.registry.url=gitea.woggioni.net docker.registry.url=gitea.woggioni.net

View File

@@ -5,9 +5,12 @@ plugins {
} }
dependencies { dependencies {
implementation catalog.slf4j.api
implementation project(':rbcs-common')
api catalog.netty.common api catalog.netty.common
api catalog.netty.buffer api catalog.netty.buffer
api catalog.netty.handler api catalog.netty.handler
api catalog.netty.codec.http
} }
publishing { publishing {

View File

@@ -1,10 +1,15 @@
module net.woggioni.rbcs.api { module net.woggioni.rbcs.api {
requires static lombok; requires static lombok;
requires java.xml;
requires io.netty.buffer;
requires io.netty.handler; requires io.netty.handler;
requires io.netty.transport;
requires io.netty.common; requires io.netty.common;
requires net.woggioni.rbcs.common;
requires io.netty.transport;
requires io.netty.codec.http;
requires io.netty.buffer;
requires org.slf4j;
requires java.xml;
exports net.woggioni.rbcs.api; exports net.woggioni.rbcs.api;
exports net.woggioni.rbcs.api.exception; exports net.woggioni.rbcs.api.exception;
exports net.woggioni.rbcs.api.message; exports net.woggioni.rbcs.api.message;

View File

@@ -0,0 +1,57 @@
package net.woggioni.rbcs.api;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCounted;
import lombok.extern.slf4j.Slf4j;
import net.woggioni.rbcs.api.message.CacheMessage;
@Slf4j
public abstract class CacheHandler extends ChannelInboundHandlerAdapter {
private boolean requestFinished = false;
abstract protected void channelRead0(ChannelHandlerContext ctx, CacheMessage msg);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if(!requestFinished && msg instanceof CacheMessage) {
if(msg instanceof CacheMessage.LastCacheContent) requestFinished = true;
try {
channelRead0(ctx, (CacheMessage) msg);
} finally {
if(msg instanceof ReferenceCounted rc) rc.release();
}
} else {
ctx.fireChannelRead(msg);
}
}
protected void sendMessageAndFlush(ChannelHandlerContext ctx, Object msg) {
sendMessage(ctx, msg, true);
}
protected void sendMessage(ChannelHandlerContext ctx, Object msg) {
sendMessage(ctx, msg, false);
}
private void sendMessage(ChannelHandlerContext ctx, Object msg, boolean flush) {
ctx.write(msg);
if(
msg instanceof CacheMessage.LastCacheContent ||
msg instanceof CacheMessage.CachePutResponse ||
msg instanceof CacheMessage.CacheValueNotFoundResponse ||
msg instanceof LastHttpContent
) {
ctx.flush();
ctx.pipeline().remove(this);
} else if(flush) {
ctx.flush();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}

View File

@@ -1,13 +1,12 @@
package net.woggioni.rbcs.api; package net.woggioni.rbcs.api;
import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
public interface CacheHandlerFactory extends AsyncCloseable { public interface CacheHandlerFactory extends AsyncCloseable {
ChannelHandler newHandler( CacheHandler newHandler(
Configuration configuration, Configuration configuration,
EventLoopGroup eventLoopGroup, EventLoopGroup eventLoopGroup,
ChannelFactory<SocketChannel> socketChannelFactory, ChannelFactory<SocketChannel> socketChannelFactory,

View File

@@ -1,10 +1,9 @@
package net.woggioni.rbcs.api; package net.woggioni.rbcs.api;
import java.io.Serializable;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import java.io.Serializable;
@Getter @Getter
@RequiredArgsConstructor @RequiredArgsConstructor
public class CacheValueMetadata implements Serializable { public class CacheValueMetadata implements Serializable {

View File

@@ -1,16 +1,15 @@
package net.woggioni.rbcs.api; package net.woggioni.rbcs.api;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.Value;
import java.nio.file.Path; import java.nio.file.Path;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.time.Duration; import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.Value;
@Value @Value
public class Configuration { public class Configuration {

View File

@@ -105,6 +105,7 @@ tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfi
systemProperty('io.netty.leakDetectionLevel', 'DISABLED') systemProperty('io.netty.leakDetectionLevel', 'DISABLED')
modularity.inferModulePath = false modularity.inferModulePath = false
enabled = true enabled = true
systemProperty('gradle.tmp.dir', temporaryDir.toString())
} }
nativeImage { nativeImage {

View File

@@ -487,6 +487,10 @@
"name":"jdk.internal.misc.Unsafe", "name":"jdk.internal.misc.Unsafe",
"methods":[{"name":"getUnsafe","parameterTypes":[] }] "methods":[{"name":"getUnsafe","parameterTypes":[] }]
}, },
{
"name":"net.woggioni.rbcs.api.CacheHandler",
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
},
{ {
"name":"net.woggioni.rbcs.cli.RemoteBuildCacheServerCli", "name":"net.woggioni.rbcs.cli.RemoteBuildCacheServerCli",
"allDeclaredFields":true, "allDeclaredFields":true,
@@ -552,11 +556,7 @@
}, },
{ {
"name":"net.woggioni.rbcs.client.RemoteBuildCacheClient$sendRequest$1$operationComplete$responseHandler$1", "name":"net.woggioni.rbcs.client.RemoteBuildCacheClient$sendRequest$1$operationComplete$responseHandler$1",
"methods":[{"name":"channelInactive","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }, {"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }] "methods":[{"name":"channelInactive","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }, {"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }, {"name":"userEventTriggered","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
},
{
"name":"net.woggioni.rbcs.client.RemoteBuildCacheClient$sendRequest$1$operationComplete$timeoutHandler$1",
"methods":[{"name":"userEventTriggered","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
}, },
{ {
"name":"net.woggioni.rbcs.server.RemoteBuildCacheServer$HttpChunkContentCompressor", "name":"net.woggioni.rbcs.server.RemoteBuildCacheServer$HttpChunkContentCompressor",
@@ -588,17 +588,13 @@
"name":"net.woggioni.rbcs.server.exception.ExceptionHandler", "name":"net.woggioni.rbcs.server.exception.ExceptionHandler",
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }] "methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
}, },
{
"name":"net.woggioni.rbcs.server.handler.CacheContentHandler",
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
},
{ {
"name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler", "name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler",
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }] "methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
}, },
{ {
"name":"net.woggioni.rbcs.server.handler.ServerHandler", "name":"net.woggioni.rbcs.server.handler.ServerHandler",
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }, {"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }, {"name":"write","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object","io.netty.channel.ChannelPromise"] }] "methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }, {"name":"channelReadComplete","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }, {"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }, {"name":"write","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object","io.netty.channel.ChannelPromise"] }]
}, },
{ {
"name":"net.woggioni.rbcs.server.handler.TraceHandler", "name":"net.woggioni.rbcs.server.handler.TraceHandler",

View File

@@ -32,7 +32,7 @@ object GraalNativeImageConfiguration {
@JvmStatic @JvmStatic
fun main(vararg args : String) { fun main(vararg args : String) {
val serverURL = URI.create("file:conf/rbcs-client.xml").toURL() val serverURL = URI.create("file:conf/rbcs-server.xml").toURL()
val serverDoc = serverURL.openStream().use { val serverDoc = serverURL.openStream().use {
Xml.parseXml(serverURL, it) Xml.parseXml(serverURL, it)
} }
@@ -71,7 +71,6 @@ object GraalNativeImageConfiguration {
compressionLevel = Deflater.DEFAULT_COMPRESSION, compressionLevel = Deflater.DEFAULT_COMPRESSION,
compressionEnabled = false, compressionEnabled = false,
maxSize = 0x1000000, maxSize = 0x1000000,
chunkSize = 0x1000
), ),
FileSystemCacheConfiguration( FileSystemCacheConfiguration(
Path.of(System.getProperty("java.io.tmpdir")).resolve("rbcs"), Path.of(System.getProperty("java.io.tmpdir")).resolve("rbcs"),
@@ -79,7 +78,6 @@ object GraalNativeImageConfiguration {
digestAlgorithm = "MD5", digestAlgorithm = "MD5",
compressionLevel = Deflater.DEFAULT_COMPRESSION, compressionLevel = Deflater.DEFAULT_COMPRESSION,
compressionEnabled = false, compressionEnabled = false,
chunkSize = 0x1000
), ),
MemcacheCacheConfiguration( MemcacheCacheConfiguration(
listOf(MemcacheCacheConfiguration.Server( listOf(MemcacheCacheConfiguration.Server(
@@ -91,7 +89,6 @@ object GraalNativeImageConfiguration {
"MD5", "MD5",
null, null,
1, 1,
0x1000
) )
) )
@@ -107,6 +104,7 @@ object GraalNativeImageConfiguration {
Duration.ofSeconds(15), Duration.ofSeconds(15),
Duration.ofSeconds(15), Duration.ofSeconds(15),
0x10000, 0x10000,
0x1000
), ),
users.asSequence().map { it.name to it }.toMap(), users.asSequence().map { it.name to it }.toMap(),
sequenceOf(writersGroup, readersGroup).map { it.name to it }.toMap(), sequenceOf(writersGroup, readersGroup).map { it.name to it }.toMap(),
@@ -127,7 +125,6 @@ object GraalNativeImageConfiguration {
"MD5", "MD5",
null, null,
1, 1,
0x1000
) )
val serverHandle = RemoteBuildCacheServer(serverConfiguration).run() val serverHandle = RemoteBuildCacheServer(serverConfiguration).run()
@@ -135,7 +132,12 @@ object GraalNativeImageConfiguration {
val clientProfile = ClientConfiguration.Profile( val clientProfile = ClientConfiguration.Profile(
URI.create("http://127.0.0.1:$serverPort/"), URI.create("http://127.0.0.1:$serverPort/"),
null, ClientConfiguration.Connection(
Duration.ofSeconds(5),
Duration.ofSeconds(5),
Duration.ofSeconds(7),
true,
),
ClientConfiguration.Authentication.BasicAuthenticationCredentials("user3", PASSWORD), ClientConfiguration.Authentication.BasicAuthenticationCredentials("user3", PASSWORD),
Duration.ofSeconds(3), Duration.ofSeconds(3),
10, 10,
@@ -177,6 +179,8 @@ object GraalNativeImageConfiguration {
} catch (ee : ExecutionException) { } catch (ee : ExecutionException) {
} }
} }
RemoteBuildCacheServerCli.main("--help") System.setProperty("net.woggioni.rbcs.conf.dir", System.getProperty("gradle.tmp.dir"))
RemoteBuildCacheServerCli.createCommandLine().execute("--version")
RemoteBuildCacheServerCli.createCommandLine().execute("server", "-t", "PT10S")
} }
} }

View File

@@ -26,8 +26,8 @@ class RemoteBuildCacheServerCli : RbcsCommand() {
private fun setPropertyIfNotPresent(key: String, value: String) { private fun setPropertyIfNotPresent(key: String, value: String) {
System.getProperty(key) ?: System.setProperty(key, value) System.getProperty(key) ?: System.setProperty(key, value)
} }
@JvmStatic
fun main(vararg args: String) { fun createCommandLine() : CommandLine {
setPropertyIfNotPresent("logback.configurationFile", "net/woggioni/rbcs/cli/logback.xml") setPropertyIfNotPresent("logback.configurationFile", "net/woggioni/rbcs/cli/logback.xml")
setPropertyIfNotPresent("io.netty.leakDetectionLevel", "DISABLED") setPropertyIfNotPresent("io.netty.leakDetectionLevel", "DISABLED")
val currentClassLoader = RemoteBuildCacheServerCli::class.java.classLoader val currentClassLoader = RemoteBuildCacheServerCli::class.java.classLoader
@@ -56,7 +56,12 @@ class RemoteBuildCacheServerCli : RbcsCommand() {
addSubcommand(GetCommand()) addSubcommand(GetCommand())
addSubcommand(HealthCheckCommand()) addSubcommand(HealthCheckCommand())
}) })
System.exit(commandLine.execute(*args)) return commandLine
}
@JvmStatic
fun main(vararg args: String) {
System.exit(createCommandLine().execute(*args))
} }
} }

View File

@@ -6,7 +6,6 @@ import net.woggioni.rbcs.client.Configuration
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug import net.woggioni.rbcs.common.debug
import picocli.CommandLine import picocli.CommandLine
import java.lang.IllegalArgumentException
import java.nio.file.Path import java.nio.file.Path
@CommandLine.Command( @CommandLine.Command(

View File

@@ -38,11 +38,12 @@ data class Configuration(
val readIdleTimeout: Duration, val readIdleTimeout: Duration,
val writeIdleTimeout: Duration, val writeIdleTimeout: Duration,
val idleTimeout: Duration, val idleTimeout: Duration,
val requestPipelining : Boolean,
) )
data class Profile( data class Profile(
val serverURI: URI, val serverURI: URI,
val connection: Connection?, val connection: Connection,
val authentication: Authentication?, val authentication: Authentication?,
val connectionTimeout: Duration?, val connectionTimeout: Duration?,
val maxConnections: Int, val maxConnections: Int,

View File

@@ -318,6 +318,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
) { ) {
pipeline.remove(this) pipeline.remove(this)
responseFuture.complete(response) responseFuture.complete(response)
if(!profile.connection.requestPipelining) {
pool.release(channel)
}
} }
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
@@ -332,6 +335,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
override fun channelInactive(ctx: ChannelHandlerContext) { override fun channelInactive(ctx: ChannelHandlerContext) {
responseFuture.completeExceptionally(IOException("The remote server closed the connection")) responseFuture.completeExceptionally(IOException("The remote server closed the connection"))
if(!profile.connection.requestPipelining) {
pool.release(channel)
}
super.channelInactive(ctx) super.channelInactive(ctx)
} }
@@ -352,6 +358,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
if (this === pipeline.last()) { if (this === pipeline.last()) {
ctx.close() ctx.close()
} }
if(!profile.connection.requestPipelining) {
pool.release(channel)
}
} else { } else {
super.userEventTriggered(ctx, evt) super.userEventTriggered(ctx, evt)
} }
@@ -401,8 +410,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
val ex = it.cause() val ex = it.cause()
log.warn(ex.message, ex) log.warn(ex.message, ex)
} }
if(profile.connection.requestPipelining) {
pool.release(channel) pool.release(channel)
}
} }
} else { } else {
responseFuture.completeExceptionally(channelFuture.cause()) responseFuture.completeExceptionally(channelFuture.cause())

View File

@@ -30,7 +30,12 @@ object Parser {
?: throw ConfigurationException("base-url attribute is required") ?: throw ConfigurationException("base-url attribute is required")
var authentication: Configuration.Authentication? = null var authentication: Configuration.Authentication? = null
var retryPolicy: Configuration.RetryPolicy? = null var retryPolicy: Configuration.RetryPolicy? = null
var connection : Configuration.Connection? = null var connection : Configuration.Connection = Configuration.Connection(
Duration.ofSeconds(60),
Duration.ofSeconds(60),
Duration.ofSeconds(30),
false
)
var trustStore : Configuration.TrustStore? = null var trustStore : Configuration.TrustStore? = null
for (gchild in child.asIterable()) { for (gchild in child.asIterable()) {
when (gchild.localName) { when (gchild.localName) {
@@ -97,10 +102,13 @@ object Parser {
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS) ?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
val writeIdleTimeout = gchild.renderAttribute("write-idle-timeout") val writeIdleTimeout = gchild.renderAttribute("write-idle-timeout")
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS) ?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
val requestPipelining = gchild.renderAttribute("request-pipelining")
?.let(String::toBoolean) ?: false
connection = Configuration.Connection( connection = Configuration.Connection(
readIdleTimeout, readIdleTimeout,
writeIdleTimeout, writeIdleTimeout,
idleTimeout, idleTimeout,
requestPipelining
) )
} }

View File

@@ -123,6 +123,13 @@
</xs:documentation> </xs:documentation>
</xs:annotation> </xs:annotation>
</xs:attribute> </xs:attribute>
<xs:attribute name="request-pipelining" type="xs:boolean" use="optional" default="false">
<xs:annotation>
<xs:documentation>
Enables HTTP/1.1 request pipelining
</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType> </xs:complexType>
<xs:complexType name="noAuthType"> <xs:complexType name="noAuthType">

View File

@@ -129,7 +129,7 @@ class RetryTest {
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6 previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
val actualTimestamp = timestamp val actualTimestamp = timestamp
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
Assertions.assertTrue(err < 0.1) Assertions.assertTrue(err < 0.5)
} }
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) { if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
/* /*

View File

@@ -6,7 +6,7 @@ plugins {
} }
dependencies { dependencies {
implementation project(':rbcs-api') implementation catalog.netty.transport
implementation catalog.slf4j.api implementation catalog.slf4j.api
implementation catalog.jwo implementation catalog.jwo
implementation catalog.netty.buffer implementation catalog.netty.buffer

View File

@@ -1,11 +1,11 @@
package net.woggioni.rbcs.server.memcache package net.woggioni.rbcs.server.memcache
import io.netty.channel.ChannelFactory import io.netty.channel.ChannelFactory
import io.netty.channel.ChannelHandler
import io.netty.channel.EventLoopGroup import io.netty.channel.EventLoopGroup
import io.netty.channel.pool.FixedChannelPool import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.DatagramChannel import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.HostAndPort import net.woggioni.rbcs.common.HostAndPort
@@ -51,7 +51,7 @@ data class MemcacheCacheConfiguration(
eventLoop: EventLoopGroup, eventLoop: EventLoopGroup,
socketChannelFactory: ChannelFactory<SocketChannel>, socketChannelFactory: ChannelFactory<SocketChannel>,
datagramChannelFactory: ChannelFactory<DatagramChannel>, datagramChannelFactory: ChannelFactory<DatagramChannel>,
): ChannelHandler { ): CacheHandler {
return MemcacheCacheHandler( return MemcacheCacheHandler(
MemcacheClient( MemcacheClient(
this@MemcacheCacheConfiguration.servers, this@MemcacheCacheConfiguration.servers,

View File

@@ -4,7 +4,6 @@ import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf import io.netty.buffer.CompositeByteBuf
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent import io.netty.handler.codec.memcache.LastMemcacheContent
@@ -13,6 +12,7 @@ import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.exception.ContentTooLargeException import net.woggioni.rbcs.api.exception.ContentTooLargeException
import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage
@@ -58,7 +58,7 @@ class MemcacheCacheHandler(
private val compressionLevel: Int, private val compressionLevel: Int,
private val chunkSize: Int, private val chunkSize: Int,
private val maxAge: Duration private val maxAge: Duration
) : SimpleChannelInboundHandler<CacheMessage>() { ) : CacheHandler() {
companion object { companion object {
private val log = createLogger<MemcacheCacheHandler>() private val log = createLogger<MemcacheCacheHandler>()
@@ -69,10 +69,14 @@ class MemcacheCacheHandler(
} }
} }
private interface InProgressRequest {
}
private inner class InProgressGetRequest( private inner class InProgressGetRequest(
private val key: String, val key: String,
private val ctx: ChannelHandlerContext private val ctx: ChannelHandlerContext
) { ) : InProgressRequest {
private val acc = ctx.alloc().compositeBuffer() private val acc = ctx.alloc().compositeBuffer()
private val chunk = ctx.alloc().compositeBuffer() private val chunk = ctx.alloc().compositeBuffer()
private val outputStream = ByteBufOutputStream(chunk).let { private val outputStream = ByteBufOutputStream(chunk).let {
@@ -98,32 +102,35 @@ class MemcacheCacheHandler(
acc.retain() acc.retain()
it.readObject() as CacheValueMetadata it.readObject() as CacheValueMetadata
} }
ctx.writeAndFlush(CacheValueFoundResponse(key, metadata)) log.trace(ctx) {
"Sending response from cache"
}
sendMessageAndFlush(ctx, CacheValueFoundResponse(key, metadata))
responseSent = true responseSent = true
acc.readerIndex(Int.SIZE_BYTES + mSize) acc.readerIndex(Int.SIZE_BYTES + mSize)
} }
if (responseSent) { if (responseSent) {
acc.readBytes(outputStream, acc.readableBytes()) acc.readBytes(outputStream, acc.readableBytes())
if(acc.readableBytes() >= chunkSize) { if (acc.readableBytes() >= chunkSize) {
flush(false) flush(false)
} }
} }
} }
private fun flush(last : Boolean) { private fun flush(last: Boolean) {
val toSend = extractChunk(chunk, ctx.alloc()) val toSend = extractChunk(chunk, ctx.alloc())
val msg = if(last) { val msg = if (last) {
log.trace(ctx) { log.trace(ctx) {
"Sending last chunk to client on channel ${ctx.channel().id().asShortText()}" "Sending last chunk to client"
} }
LastCacheContent(toSend) LastCacheContent(toSend)
} else { } else {
log.trace(ctx) { log.trace(ctx) {
"Sending chunk to client on channel ${ctx.channel().id().asShortText()}" "Sending chunk to client"
} }
CacheContent(toSend) CacheContent(toSend)
} }
ctx.writeAndFlush(msg) sendMessageAndFlush(ctx, msg)
} }
fun commit() { fun commit() {
@@ -141,14 +148,14 @@ class MemcacheCacheHandler(
} }
private inner class InProgressPutRequest( private inner class InProgressPutRequest(
private val ch : NettyChannel, private val ch: NettyChannel,
metadata : CacheValueMetadata, metadata: CacheValueMetadata,
val digest : ByteBuf, val digest: ByteBuf,
val requestController: CompletableFuture<MemcacheRequestController>, val requestController: CompletableFuture<MemcacheRequestController>,
private val alloc: ByteBufAllocator private val alloc: ByteBufAllocator
) { ) : InProgressRequest {
private var totalSize = 0 private var totalSize = 0
private var tmpFile : FileChannel? = null private var tmpFile: FileChannel? = null
private val accumulator = alloc.compositeBuffer() private val accumulator = alloc.compositeBuffer()
private val stream = ByteBufOutputStream(accumulator).let { private val stream = ByteBufOutputStream(accumulator).let {
if (compressionEnabled) { if (compressionEnabled) {
@@ -175,7 +182,7 @@ class MemcacheCacheHandler(
tmpFile?.let { tmpFile?.let {
flushToDisk(it, accumulator) flushToDisk(it, accumulator)
} }
if(accumulator.readableBytes() > 0x100000) { if (accumulator.readableBytes() > 0x100000) {
log.debug(ch) { log.debug(ch) {
"Entry is too big, buffering it into a file" "Entry is too big, buffering it into a file"
} }
@@ -192,18 +199,18 @@ class MemcacheCacheHandler(
} }
} }
private fun flushToDisk(fc : FileChannel, buf : CompositeByteBuf) { private fun flushToDisk(fc: FileChannel, buf: CompositeByteBuf) {
val chunk = extractChunk(buf, alloc) val chunk = extractChunk(buf, alloc)
fc.write(chunk.nioBuffer()) fc.write(chunk.nioBuffer())
chunk.release() chunk.release()
} }
fun commit() : Pair<Int, ReadableByteChannel> { fun commit(): Pair<Int, ReadableByteChannel> {
digest.release() digest.release()
accumulator.retain() accumulator.retain()
stream.close() stream.close()
val fileChannel = tmpFile val fileChannel = tmpFile
return if(fileChannel != null) { return if (fileChannel != null) {
flushToDisk(fileChannel, accumulator) flushToDisk(fileChannel, accumulator)
accumulator.release() accumulator.release()
fileChannel.position(0) fileChannel.position(0)
@@ -224,8 +231,7 @@ class MemcacheCacheHandler(
} }
} }
private var inProgressPutRequest: InProgressPutRequest? = null private var inProgressRequest: InProgressRequest? = null
private var inProgressGetRequest: InProgressGetRequest? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) { override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
when (msg) { when (msg) {
@@ -252,32 +258,39 @@ class MemcacheCacheHandler(
log.debug(ctx) { log.debug(ctx) {
"Cache hit for key ${msg.key} on memcache" "Cache hit for key ${msg.key} on memcache"
} }
inProgressGetRequest = InProgressGetRequest(msg.key, ctx) inProgressRequest = InProgressGetRequest(msg.key, ctx)
} }
BinaryMemcacheResponseStatus.KEY_ENOENT -> { BinaryMemcacheResponseStatus.KEY_ENOENT -> {
log.debug(ctx) { log.debug(ctx) {
"Cache miss for key ${msg.key} on memcache" "Cache miss for key ${msg.key} on memcache"
} }
ctx.writeAndFlush(CacheValueNotFoundResponse()) sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
} }
} }
} }
override fun contentReceived(content: MemcacheContent) { override fun contentReceived(content: MemcacheContent) {
log.trace(ctx) { log.trace(ctx) {
"${if(content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${content.content().readableBytes()} bytes received from memcache for key ${msg.key}" "${if (content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${
content.content().readableBytes()
} bytes received from memcache for key ${msg.key}"
} }
inProgressGetRequest?.write(content.content()) (inProgressRequest as? InProgressGetRequest)?.let { inProgressGetRequest ->
if (content is LastMemcacheContent) { inProgressGetRequest.write(content.content())
inProgressGetRequest?.commit() if (content is LastMemcacheContent) {
inProgressRequest = null
inProgressGetRequest.commit()
}
} }
} }
override fun exceptionCaught(ex: Throwable) { override fun exceptionCaught(ex: Throwable) {
inProgressGetRequest?.let { (inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
inProgressGetRequest = null inProgressGetRequest?.let {
it.rollback() inProgressRequest = null
it.rollback()
}
} }
this@MemcacheCacheHandler.exceptionCaught(ctx, ex) this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
} }
@@ -290,6 +303,7 @@ class MemcacheCacheHandler(
setOpcode(BinaryMemcacheOpcodes.GET) setOpcode(BinaryMemcacheOpcodes.GET)
} }
requestHandle.sendRequest(request) requestHandle.sendRequest(request)
requestHandle.sendContent(LastMemcacheContent.EMPTY_LAST_CONTENT)
} }
} }
@@ -305,8 +319,9 @@ class MemcacheCacheHandler(
log.debug(ctx) { log.debug(ctx) {
"Inserted key ${msg.key} into memcache" "Inserted key ${msg.key} into memcache"
} }
ctx.writeAndFlush(CachePutResponse(msg.key)) sendMessageAndFlush(ctx, CachePutResponse(msg.key))
} }
else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status)) else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status))
} }
} }
@@ -323,86 +338,103 @@ class MemcacheCacheHandler(
this@MemcacheCacheHandler.exceptionCaught(ctx, ex) this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
} }
} }
inProgressPutRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc()) inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc())
} }
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
inProgressPutRequest?.let { request -> val request = inProgressRequest
log.trace(ctx) { when (request) {
"Received chunk of ${msg.content().readableBytes()} bytes for memcache" is InProgressPutRequest -> {
log.trace(ctx) {
"Received chunk of ${msg.content().readableBytes()} bytes for memcache"
}
request.write(msg.content())
}
is InProgressGetRequest -> {
msg.release()
} }
request.write(msg.content())
} }
} }
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) { private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
inProgressPutRequest?.let { request -> val request = inProgressRequest
inProgressPutRequest = null when (request) {
log.trace(ctx) { is InProgressPutRequest -> {
"Received last chunk of ${msg.content().readableBytes()} bytes for memcache" inProgressRequest = null
} log.trace(ctx) {
request.write(msg.content()) "Received last chunk of ${msg.content().readableBytes()} bytes for memcache"
val key = request.digest.retainedDuplicate() }
val (payloadSize, payloadSource) = request.commit() request.write(msg.content())
val extras = ctx.alloc().buffer(8, 8) val key = request.digest.retainedDuplicate()
extras.writeInt(0) val (payloadSize, payloadSource) = request.commit()
extras.writeInt(encodeExpiry(maxAge)) val extras = ctx.alloc().buffer(8, 8)
val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize extras.writeInt(0)
request.requestController.whenComplete { requestController, ex -> extras.writeInt(encodeExpiry(maxAge))
if(ex == null) { val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize
log.trace(ctx) { log.trace(ctx) {
"Sending SET request to memcache" "Trying to send SET request to memcache"
} }
requestController.sendRequest(DefaultBinaryMemcacheRequest().apply { request.requestController.whenComplete { requestController, ex ->
setOpcode(BinaryMemcacheOpcodes.SET) if (ex == null) {
setKey(key) log.trace(ctx) {
setExtras(extras) "Sending SET request to memcache"
setTotalBodyLength(totalBodyLength) }
}) requestController.sendRequest(DefaultBinaryMemcacheRequest().apply {
log.trace(ctx) { setOpcode(BinaryMemcacheOpcodes.SET)
"Sending request payload to memcache" setKey(key)
} setExtras(extras)
payloadSource.use { source -> setTotalBodyLength(totalBodyLength)
val bb = ByteBuffer.allocate(chunkSize) })
while (true) { log.trace(ctx) {
val read = source.read(bb) "Sending request payload to memcache"
bb.limit() }
if(read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) { payloadSource.use { source ->
continue val bb = ByteBuffer.allocate(chunkSize)
} while (true) {
val chunk = ctx.alloc().buffer(chunkSize) val read = source.read(bb)
bb.flip() bb.limit()
chunk.writeBytes(bb) if (read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) {
bb.clear() continue
log.trace(ctx) { }
"Sending ${chunk.readableBytes()} bytes chunk to memcache" val chunk = ctx.alloc().buffer(chunkSize)
} bb.flip()
if(read < 0) { chunk.writeBytes(bb)
requestController.sendContent(DefaultLastMemcacheContent(chunk)) bb.clear()
break log.trace(ctx) {
} else { "Sending ${chunk.readableBytes()} bytes chunk to memcache"
requestController.sendContent(DefaultMemcacheContent(chunk)) }
if (read < 0) {
requestController.sendContent(DefaultLastMemcacheContent(chunk))
break
} else {
requestController.sendContent(DefaultMemcacheContent(chunk))
}
} }
} }
} else {
payloadSource.close()
} }
} else {
payloadSource.close()
} }
} }
} }
} }
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
inProgressGetRequest?.let { val request = inProgressRequest
inProgressGetRequest = null when (request) {
it.rollback() is InProgressPutRequest -> {
} inProgressRequest = null
inProgressPutRequest?.let { request.requestController.thenAccept { controller ->
inProgressPutRequest = null controller.exceptionCaught(cause)
it.requestController.thenAccept { controller -> }
controller.exceptionCaught(cause) request.rollback()
}
is InProgressGetRequest -> {
inProgressRequest = null
request.rollback()
} }
it.rollback()
} }
super.exceptionCaught(ctx, cause) super.exceptionCaught(ctx, cause)
} }

View File

@@ -12,7 +12,6 @@ import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoopGroup import io.netty.channel.EventLoopGroup
import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.pool.AbstractChannelPoolHandler import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.memcache.LastMemcacheContent import io.netty.handler.codec.memcache.LastMemcacheContent
@@ -24,7 +23,7 @@ import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.util.concurrent.GenericFutureListener import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.rbcs.common.HostAndPort import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.warn import net.woggioni.rbcs.common.trace
import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration
import net.woggioni.rbcs.server.memcache.MemcacheCacheHandler import net.woggioni.rbcs.server.memcache.MemcacheCacheHandler
import java.io.IOException import java.io.IOException
@@ -94,18 +93,6 @@ class MemcacheClient(
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> { pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: NettyFuture<Channel>) { override fun operationComplete(channelFuture: NettyFuture<Channel>) {
if (channelFuture.isSuccess) { if (channelFuture.isSuccess) {
var requestSent = false
var requestBodySent = false
var requestFinished = false
var responseReceived = false
var responseBodyReceived = false
var responseFinished = false
var requestBodySize = 0
var requestBodyBytesSent = 0
val channel = channelFuture.now val channel = channelFuture.now
var connectionClosedByTheRemoteServer = true var connectionClosedByTheRemoteServer = true
val closeCallback = { val closeCallback = {
@@ -113,14 +100,7 @@ class MemcacheClient(
val ex = IOException("The memcache server closed the connection") val ex = IOException("The memcache server closed the connection")
val completed = response.completeExceptionally(ex) val completed = response.completeExceptionally(ex)
if(!completed) responseHandler.exceptionCaught(ex) if(!completed) responseHandler.exceptionCaught(ex)
log.warn {
"RequestSent: $requestSent, RequestBodySent: $requestBodySent, " +
"RequestFinished: $requestFinished, ResponseReceived: $responseReceived, " +
"ResponseBodyReceived: $responseBodyReceived, ResponseFinished: $responseFinished, " +
"RequestBodySize: $requestBodySize, RequestBodyBytesSent: $requestBodyBytesSent"
}
} }
pool.release(channel)
} }
val closeListener = ChannelFutureListener { val closeListener = ChannelFutureListener {
closeCallback() closeCallback()
@@ -140,18 +120,14 @@ class MemcacheClient(
when (msg) { when (msg) {
is BinaryMemcacheResponse -> { is BinaryMemcacheResponse -> {
responseHandler.responseReceived(msg) responseHandler.responseReceived(msg)
responseReceived = true
} }
is LastMemcacheContent -> { is LastMemcacheContent -> {
responseFinished = true
responseHandler.contentReceived(msg) responseHandler.contentReceived(msg)
pipeline.remove(this) pipeline.remove(this)
pool.release(channel)
} }
is MemcacheContent -> { is MemcacheContent -> {
responseBodyReceived = true
responseHandler.contentReceived(msg) responseHandler.contentReceived(msg)
} }
} }
@@ -165,35 +141,43 @@ class MemcacheClient(
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
connectionClosedByTheRemoteServer = false connectionClosedByTheRemoteServer = false
ctx.close() ctx.close()
pool.release(channel)
responseHandler.exceptionCaught(cause) responseHandler.exceptionCaught(cause)
} }
} }
channel.pipeline() channel.pipeline().addLast(handler)
.addLast("client-handler", handler)
response.complete(object : MemcacheRequestController { response.complete(object : MemcacheRequestController {
private var channelReleased = false
override fun sendRequest(request: BinaryMemcacheRequest) { override fun sendRequest(request: BinaryMemcacheRequest) {
requestBodySize = request.totalBodyLength() - request.keyLength() - request.extrasLength()
channel.writeAndFlush(request) channel.writeAndFlush(request)
requestSent = true
} }
override fun sendContent(content: MemcacheContent) { override fun sendContent(content: MemcacheContent) {
val size = content.content().readableBytes()
channel.writeAndFlush(content).addListener { channel.writeAndFlush(content).addListener {
requestBodyBytesSent += size
requestBodySent = true
if(content is LastMemcacheContent) { if(content is LastMemcacheContent) {
requestFinished = true if(!channelReleased) {
pool.release(channel)
channelReleased = true
log.trace(channel) {
"Channel released"
}
}
} }
} }
} }
override fun exceptionCaught(ex: Throwable) { override fun exceptionCaught(ex: Throwable) {
log.warn(ex.message, ex)
connectionClosedByTheRemoteServer = false connectionClosedByTheRemoteServer = false
channel.close() channel.close()
if(!channelReleased) {
pool.release(channel)
channelReleased = true
log.trace(channel) {
"Channel released"
}
}
} }
}) })
} else { } else {

View File

@@ -54,6 +54,7 @@ import net.woggioni.rbcs.server.auth.RoleAuthorizer
import net.woggioni.rbcs.server.configuration.Parser import net.woggioni.rbcs.server.configuration.Parser
import net.woggioni.rbcs.server.configuration.Serializer import net.woggioni.rbcs.server.configuration.Serializer
import net.woggioni.rbcs.server.exception.ExceptionHandler import net.woggioni.rbcs.server.exception.ExceptionHandler
import net.woggioni.rbcs.server.handler.BlackHoleRequestHandler
import net.woggioni.rbcs.server.handler.MaxRequestSizeHandler import net.woggioni.rbcs.server.handler.MaxRequestSizeHandler
import net.woggioni.rbcs.server.handler.ServerHandler import net.woggioni.rbcs.server.handler.ServerHandler
import net.woggioni.rbcs.server.throttling.BucketManager import net.woggioni.rbcs.server.throttling.BucketManager
@@ -298,6 +299,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
"Closed connection ${ch.id().asShortText()} with ${ch.remoteAddress()}" "Closed connection ${ch.id().asShortText()} with ${ch.remoteAddress()}"
} }
} }
ch.config().setAutoRead(false)
val pipeline = ch.pipeline() val pipeline = ch.pipeline()
cfg.connection.also { conn -> cfg.connection.also { conn ->
val readIdleTimeout = conn.readIdleTimeout.toMillis() val readIdleTimeout = conn.readIdleTimeout.toMillis()
@@ -360,6 +362,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
} }
pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler) pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler)
pipeline.addLast(ExceptionHandler.NAME, ExceptionHandler) pipeline.addLast(ExceptionHandler.NAME, ExceptionHandler)
pipeline.addLast(BlackHoleRequestHandler.NAME, BlackHoleRequestHandler())
} }
override fun asyncClose() = cacheHandlerFactory.asyncClose() override fun asyncClose() = cacheHandlerFactory.asyncClose()

View File

@@ -2,9 +2,9 @@ package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioFile import io.netty.handler.stream.ChunkedNioFile
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
@@ -26,12 +26,18 @@ class FileSystemCacheHandler(
private val compressionEnabled: Boolean, private val compressionEnabled: Boolean,
private val compressionLevel: Int, private val compressionLevel: Int,
private val chunkSize: Int private val chunkSize: Int
) : SimpleChannelInboundHandler<CacheMessage>() { ) : CacheHandler() {
private interface InProgressRequest{
}
private class InProgressGetRequest(val request : CacheGetRequest) : InProgressRequest
private inner class InProgressPutRequest( private inner class InProgressPutRequest(
val key : String, val key : String,
private val fileSink : FileSystemCache.FileSink private val fileSink : FileSystemCache.FileSink
) { ) : InProgressRequest {
private val stream = Channels.newOutputStream(fileSink.channel).let { private val stream = Channels.newOutputStream(fileSink.channel).let {
if (compressionEnabled) { if (compressionEnabled) {
@@ -55,7 +61,7 @@ class FileSystemCacheHandler(
} }
} }
private var inProgressPutRequest: InProgressPutRequest? = null private var inProgressRequest: InProgressRequest? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) { override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
when (msg) { when (msg) {
@@ -68,55 +74,64 @@ class FileSystemCacheHandler(
} }
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm))) inProgressRequest = InProgressGetRequest(msg)
cache.get(key)?.also { entryValue ->
ctx.writeAndFlush(CacheValueFoundResponse(msg.key, entryValue.metadata))
entryValue.channel.let { channel ->
if(compressionEnabled) {
InflaterInputStream(Channels.newInputStream(channel)).use { stream ->
outerLoop@
while (true) {
val buf = ctx.alloc().heapBuffer(chunkSize)
while(buf.readableBytes() < chunkSize) {
val read = buf.writeBytes(stream, chunkSize)
if(read < 0) {
ctx.writeAndFlush(LastCacheContent(buf))
break@outerLoop
}
}
ctx.writeAndFlush(CacheContent(buf))
}
}
} else {
ctx.writeAndFlush(ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
}
}
} ?: ctx.writeAndFlush(CacheValueNotFoundResponse())
} }
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm))) val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm)))
val sink = cache.put(key, msg.metadata) val sink = cache.put(key, msg.metadata)
inProgressPutRequest = InProgressPutRequest(msg.key, sink) inProgressRequest = InProgressPutRequest(msg.key, sink)
} }
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
inProgressPutRequest!!.write(msg.content()) val request = inProgressRequest
if(request is InProgressPutRequest) {
request.write(msg.content())
}
} }
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) { private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
inProgressPutRequest?.let { request -> when(val request = inProgressRequest) {
inProgressPutRequest = null is InProgressPutRequest -> {
request.write(msg.content()) inProgressRequest = null
request.commit() request.write(msg.content())
ctx.writeAndFlush(CachePutResponse(request.key)) request.commit()
sendMessageAndFlush(ctx, CachePutResponse(request.key))
}
is InProgressGetRequest -> {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(request.request.key, digestAlgorithm)))
cache.get(key)?.also { entryValue ->
sendMessageAndFlush(ctx, CacheValueFoundResponse(request.request.key, entryValue.metadata))
entryValue.channel.let { channel ->
if(compressionEnabled) {
InflaterInputStream(Channels.newInputStream(channel)).use { stream ->
outerLoop@
while (true) {
val buf = ctx.alloc().heapBuffer(chunkSize)
while(buf.readableBytes() < chunkSize) {
val read = buf.writeBytes(stream, chunkSize)
if(read < 0) {
sendMessageAndFlush(ctx, LastCacheContent(buf))
break@outerLoop
}
}
sendMessageAndFlush(ctx, CacheContent(buf))
}
}
} else {
sendMessage(ctx, ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize))
sendMessageAndFlush(ctx, LastHttpContent.EMPTY_LAST_CONTENT)
}
}
} ?: sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
}
} }
} }
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
inProgressPutRequest?.rollback() (inProgressRequest as? InProgressPutRequest)?.rollback()
super.exceptionCaught(ctx, cause) super.exceptionCaught(ctx, cause)
} }
} }

View File

@@ -75,6 +75,10 @@ class InMemoryCache(
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS) cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
} }
} }
map.forEach {
it.value.content.release()
}
map.clear()
} }
complete(null) complete(null)
} catch (ex: Throwable) { } catch (ex: Throwable) {

View File

@@ -4,7 +4,6 @@ import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup import io.netty.channel.EventLoopGroup
import io.netty.channel.socket.DatagramChannel import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import io.netty.util.concurrent.Future
import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS import net.woggioni.rbcs.common.RBCS

View File

@@ -2,7 +2,7 @@ package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
@@ -22,9 +22,17 @@ class InMemoryCacheHandler(
private val digestAlgorithm: String?, private val digestAlgorithm: String?,
private val compressionEnabled: Boolean, private val compressionEnabled: Boolean,
private val compressionLevel: Int private val compressionLevel: Int
) : SimpleChannelInboundHandler<CacheMessage>() { ) : CacheHandler() {
private interface InProgressPutRequest : AutoCloseable { private interface InProgressRequest : AutoCloseable {
}
private class InProgressGetRequest(val request : CacheGetRequest) : InProgressRequest {
override fun close() {
}
}
private interface InProgressPutRequest : InProgressRequest {
val request: CachePutRequest val request: CachePutRequest
val buf: ByteBuf val buf: ByteBuf
@@ -72,7 +80,7 @@ class InMemoryCacheHandler(
} }
} }
private var inProgressPutRequest: InProgressPutRequest? = null private var inProgressRequest: InProgressRequest? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) { override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
when (msg) { when (msg) {
@@ -85,24 +93,11 @@ class InMemoryCacheHandler(
} }
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
cache.get(processCacheKey(msg.key, digestAlgorithm))?.let { value -> inProgressRequest = InProgressGetRequest(msg)
ctx.writeAndFlush(CacheValueFoundResponse(msg.key, value.metadata))
if (compressionEnabled) {
val buf = ctx.alloc().heapBuffer()
InflaterOutputStream(ByteBufOutputStream(buf)).use {
value.content.readBytes(it, value.content.readableBytes())
value.content.release()
buf.retain()
}
ctx.writeAndFlush(LastCacheContent(buf))
} else {
ctx.writeAndFlush(LastCacheContent(value.content))
}
} ?: ctx.writeAndFlush(CacheValueNotFoundResponse())
} }
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
inProgressPutRequest = if(compressionEnabled) { inProgressRequest = if(compressionEnabled) {
InProgressCompressedPutRequest(ctx, msg) InProgressCompressedPutRequest(ctx, msg)
} else { } else {
InProgressPlainPutRequest(ctx, msg) InProgressPlainPutRequest(ctx, msg)
@@ -110,27 +105,46 @@ class InMemoryCacheHandler(
} }
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
inProgressPutRequest?.append(msg.content()) val req = inProgressRequest
if(req is InProgressPutRequest) {
req.append(msg.content())
}
} }
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) { private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
handleCacheContent(ctx, msg) handleCacheContent(ctx, msg)
inProgressPutRequest?.let { inProgressRequest -> when(val req = inProgressRequest) {
inProgressPutRequest = null is InProgressGetRequest -> {
val buf = inProgressRequest.buf cache.get(processCacheKey(req.request.key, digestAlgorithm))?.let { value ->
buf.retain() sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata))
inProgressRequest.close() if (compressionEnabled) {
val cacheKey = processCacheKey(inProgressRequest.request.key, digestAlgorithm) val buf = ctx.alloc().heapBuffer()
cache.put(cacheKey, CacheEntry(inProgressRequest.request.metadata, buf)) InflaterOutputStream(ByteBufOutputStream(buf)).use {
ctx.writeAndFlush(CachePutResponse(inProgressRequest.request.key)) value.content.readBytes(it, value.content.readableBytes())
value.content.release()
buf.retain()
}
sendMessage(ctx, LastCacheContent(buf))
} else {
sendMessage(ctx, LastCacheContent(value.content))
}
} ?: sendMessage(ctx, CacheValueNotFoundResponse())
}
is InProgressPutRequest -> {
this.inProgressRequest = null
val buf = req.buf
buf.retain()
req.close()
val cacheKey = processCacheKey(req.request.key, digestAlgorithm)
cache.put(cacheKey, CacheEntry(req.request.metadata, buf))
sendMessageAndFlush(ctx, CachePutResponse(req.request.key))
}
} }
} }
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
inProgressPutRequest?.let { req -> inProgressRequest?.close()
req.buf.release() inProgressRequest = null
inProgressPutRequest = null
}
super.exceptionCaught(ctx, cause) super.exceptionCaught(ctx, cause)
} }
} }

View File

@@ -1,4 +0,0 @@
package net.woggioni.rbcs.server.event
class RequestCompletedEvent {
}

View File

@@ -0,0 +1,13 @@
package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.HttpContent
class BlackHoleRequestHandler : SimpleChannelInboundHandler<HttpContent>() {
companion object {
val NAME = BlackHoleRequestHandler::class.java.name
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpContent) {
}
}

View File

@@ -1,79 +0,0 @@
package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOutboundHandler
import io.netty.channel.ChannelPromise
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.LastHttpContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import java.net.SocketAddress
class CacheContentHandler(private val pairedHandler : ChannelHandler) : SimpleChannelInboundHandler<HttpContent>(), ChannelOutboundHandler {
private var requestFinished = false
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpContent) {
if(requestFinished) {
ctx.fireChannelRead(msg.retain())
} else {
when (msg) {
is LastHttpContent -> {
ctx.fireChannelRead(LastCacheContent(msg.content().retain()))
requestFinished = true
}
else -> ctx.fireChannelRead(CacheContent(msg.content().retain()))
}
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
super.exceptionCaught(ctx, cause)
}
override fun bind(ctx: ChannelHandlerContext, localAddress: SocketAddress, promise: ChannelPromise) {
ctx.bind(localAddress, promise)
}
override fun connect(
ctx: ChannelHandlerContext,
remoteAddress: SocketAddress,
localAddress: SocketAddress,
promise: ChannelPromise
) {
ctx.connect(remoteAddress, localAddress, promise)
}
override fun disconnect(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.disconnect(promise)
}
override fun close(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.close(promise)
}
override fun deregister(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.deregister(promise)
}
override fun read(ctx: ChannelHandlerContext) {
ctx.read()
}
override fun flush(ctx: ChannelHandlerContext) {
ctx.flush()
}
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
ctx.write(msg, promise)
if(msg is LastCacheContent || msg is CachePutResponse || msg is CacheValueNotFoundResponse || msg is LastHttpContent) {
ctx.pipeline().remove(pairedHandler)
ctx.pipeline().remove(this)
}
}
}

View File

@@ -1,56 +0,0 @@
package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelOutboundHandler
import io.netty.channel.ChannelPromise
import net.woggioni.rbcs.server.event.RequestCompletedEvent
import java.net.SocketAddress
class ResponseCapHandler : ChannelInboundHandlerAdapter(), ChannelOutboundHandler {
val bufferedMessages = mutableListOf<Any>()
override fun bind(ctx: ChannelHandlerContext, localAddress: SocketAddress, promise: ChannelPromise) {
ctx.bind(localAddress, promise)
}
override fun connect(
ctx: ChannelHandlerContext,
remoteAddress: SocketAddress,
localAddress: SocketAddress,
promise: ChannelPromise
) {
ctx.connect(remoteAddress, localAddress, promise)
}
override fun disconnect(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.disconnect(promise)
}
override fun close(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.close(promise)
}
override fun deregister(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.deregister(promise)
}
override fun read(ctx: ChannelHandlerContext) {
ctx.read()
}
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
bufferedMessages.add(msg)
}
override fun flush(ctx: ChannelHandlerContext) {
}
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if(evt is RequestCompletedEvent) {
for(msg in bufferedMessages) ctx.write(msg)
ctx.flush()
ctx.pipeline().remove(this)
}
}
}

View File

@@ -8,6 +8,7 @@ import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpContent import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.DefaultHttpResponse import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.DefaultLastHttpContent import io.netty.handler.codec.http.DefaultLastHttpContent
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpHeaders import io.netty.handler.codec.http.HttpHeaders
@@ -17,7 +18,6 @@ import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.HttpVersion import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
@@ -30,10 +30,8 @@ import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.warn import net.woggioni.rbcs.common.warn
import net.woggioni.rbcs.server.event.RequestCompletedEvent
import net.woggioni.rbcs.server.exception.ExceptionHandler import net.woggioni.rbcs.server.exception.ExceptionHandler
import java.nio.file.Path import java.nio.file.Path
import java.util.Locale
class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupplier : () -> ChannelHandler) : class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupplier : () -> ChannelHandler) :
ChannelDuplexHandler() { ChannelDuplexHandler() {
@@ -47,6 +45,15 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
private var keepAlive = true private var keepAlive = true
private var pipelinedRequests = 0 private var pipelinedRequests = 0
private fun newRequest() {
pipelinedRequests += 1
}
private fun requestCompleted(ctx : ChannelHandlerContext) {
pipelinedRequests -= 1
if(pipelinedRequests == 0) ctx.read()
}
private fun resetRequestMetadata() { private fun resetRequestMetadata() {
httpVersion = HttpVersion.HTTP_1_1 httpVersion = HttpVersion.HTTP_1_1
keepAlive = true keepAlive = true
@@ -65,22 +72,44 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
} }
} }
private var cacheRequestInProgress : Boolean = false
override fun handlerAdded(ctx: ChannelHandlerContext) {
ctx.read()
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
when (msg) { when (msg) {
is HttpRequest -> handleRequest(ctx, msg) is HttpRequest -> handleRequest(ctx, msg)
is HttpContent -> {
if(cacheRequestInProgress) {
if(msg is LastHttpContent) {
super.channelRead(ctx, LastCacheContent(msg.content().retain()))
cacheRequestInProgress = false
} else {
super.channelRead(ctx, CacheContent(msg.content().retain()))
}
msg.release()
} else {
super.channelRead(ctx, msg)
}
}
else -> super.channelRead(ctx, msg) else -> super.channelRead(ctx, msg)
} }
} }
override fun channelReadComplete(ctx: ChannelHandlerContext) {
super.channelReadComplete(ctx)
if(cacheRequestInProgress) {
ctx.read()
}
}
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise?) { override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise?) {
if (msg is CacheMessage) { if (msg is CacheMessage) {
try { try {
when (msg) { when (msg) {
is CachePutResponse -> { is CachePutResponse -> {
pipelinedRequests -= 1
ctx.fireUserEventTriggered(RequestCompletedEvent())
val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.CREATED) val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.CREATED)
val keyBytes = msg.key.toByteArray(Charsets.UTF_8) val keyBytes = msg.key.toByteArray(Charsets.UTF_8)
response.headers().apply { response.headers().apply {
@@ -92,16 +121,18 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
val buf = ctx.alloc().buffer(keyBytes.size).apply { val buf = ctx.alloc().buffer(keyBytes.size).apply {
writeBytes(keyBytes) writeBytes(keyBytes)
} }
ctx.writeAndFlush(DefaultLastHttpContent(buf)) ctx.writeAndFlush(DefaultLastHttpContent(buf)).also {
requestCompleted(ctx)
}
} }
is CacheValueNotFoundResponse -> { is CacheValueNotFoundResponse -> {
pipelinedRequests -= 1
ctx.fireUserEventTriggered(RequestCompletedEvent())
val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND) val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
setKeepAliveHeader(response.headers()) setKeepAliveHeader(response.headers())
ctx.writeAndFlush(response) ctx.writeAndFlush(response).also {
requestCompleted(ctx)
}
} }
is CacheValueFoundResponse -> { is CacheValueFoundResponse -> {
@@ -118,9 +149,9 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
} }
is LastCacheContent -> { is LastCacheContent -> {
pipelinedRequests -= 1 ctx.writeAndFlush(DefaultLastHttpContent(msg.content())).also {
ctx.fireUserEventTriggered(RequestCompletedEvent()) requestCompleted(ctx)
ctx.writeAndFlush(DefaultLastHttpContent(msg.content())) }
} }
is CacheContent -> { is CacheContent -> {
@@ -140,9 +171,8 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
resetRequestMetadata() resetRequestMetadata()
} }
} else if(msg is LastHttpContent) { } else if(msg is LastHttpContent) {
pipelinedRequests -= 1
ctx.fireUserEventTriggered(RequestCompletedEvent())
ctx.write(msg, promise) ctx.write(msg, promise)
requestCompleted(ctx)
} else super.write(ctx, msg, promise) } else super.write(ctx, msg, promise)
} }
@@ -153,15 +183,12 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
if (method === HttpMethod.GET) { if (method === HttpMethod.GET) {
val path = Path.of(msg.uri()).normalize() val path = Path.of(msg.uri()).normalize()
if (path.startsWith(serverPrefix)) { if (path.startsWith(serverPrefix)) {
cacheRequestInProgress = true
val relativePath = serverPrefix.relativize(path) val relativePath = serverPrefix.relativize(path)
val key = relativePath.toString() val key : String = relativePath.toString()
if(pipelinedRequests > 0) { newRequest()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler())
}
val cacheHandler = cacheHandlerSupplier() val cacheHandler = cacheHandlerSupplier()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, CacheContentHandler(cacheHandler))
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler) ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler)
pipelinedRequests += 1
key.let(::CacheGetRequest) key.let(::CacheGetRequest)
.let(ctx::fireChannelRead) .let(ctx::fireChannelRead)
?: ctx.channel().write(CacheValueNotFoundResponse()) ?: ctx.channel().write(CacheValueNotFoundResponse())
@@ -176,18 +203,15 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
} else if (method === HttpMethod.PUT) { } else if (method === HttpMethod.PUT) {
val path = Path.of(msg.uri()).normalize() val path = Path.of(msg.uri()).normalize()
if (path.startsWith(serverPrefix)) { if (path.startsWith(serverPrefix)) {
cacheRequestInProgress = true
val relativePath = serverPrefix.relativize(path) val relativePath = serverPrefix.relativize(path)
val key = relativePath.toString() val key = relativePath.toString()
log.debug(ctx) { log.debug(ctx) {
"Added value for key '$key' to build cache" "Added value for key '$key' to build cache"
} }
if(pipelinedRequests > 0) { newRequest()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler())
}
val cacheHandler = cacheHandlerSupplier() val cacheHandler = cacheHandlerSupplier()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, CacheContentHandler(cacheHandler))
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler) ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler)
pipelinedRequests += 1
path.fileName?.toString() path.fileName?.toString()
?.let { ?.let {
@@ -205,11 +229,8 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
} }
} else if (method == HttpMethod.TRACE) { } else if (method == HttpMethod.TRACE) {
if(pipelinedRequests > 0) { newRequest()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler())
}
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, TraceHandler) ctx.pipeline().addBefore(ExceptionHandler.NAME, null, TraceHandler)
pipelinedRequests += 1
super.channelRead(ctx, msg) super.channelRead(ctx, msg)
} else { } else {
log.warn(ctx) { log.warn(ctx) {

View File

@@ -94,6 +94,9 @@ class ThrottlingHandler(private val bucketManager : BucketManager,
handleBuckets(buckets, ctx, msg, false) handleBuckets(buckets, ctx, msg, false)
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS) }, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
} else { } else {
queuedContent?.let { qc ->
qc.forEach { it.release() }
}
this.queuedContent = null this.queuedContent = null
sendThrottledResponse(ctx, waitDuration) sendThrottledResponse(ctx, waitDuration)
} }

View File

@@ -1,5 +1,14 @@
package net.woggioni.rbcs.server.test.utils; package net.woggioni.rbcs.server.test.utils;
import java.math.BigInteger;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import org.bouncycastle.asn1.DERSequence; import org.bouncycastle.asn1.DERSequence;
import org.bouncycastle.asn1.x500.X500Name; import org.bouncycastle.asn1.x500.X500Name;
import org.bouncycastle.asn1.x509.BasicConstraints; import org.bouncycastle.asn1.x509.BasicConstraints;
@@ -15,16 +24,6 @@ import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
import org.bouncycastle.operator.ContentSigner; import org.bouncycastle.operator.ContentSigner;
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
import java.math.BigInteger;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
public class CertificateUtils { public class CertificateUtils {
public record X509Credentials( public record X509Credentials(

View File

@@ -154,7 +154,7 @@ class BasicAuthServerTest : AbstractBasicAuthServerTest() {
} }
@Test @Test
@Order(6) @Order(8)
fun getAsAThrottledUser() { fun getAsAThrottledUser() {
val client: HttpClient = HttpClient.newHttpClient() val client: HttpClient = HttpClient.newHttpClient()
@@ -172,7 +172,7 @@ class BasicAuthServerTest : AbstractBasicAuthServerTest() {
} }
@Test @Test
@Order(7) @Order(9)
fun getAsAThrottledUser2() { fun getAsAThrottledUser2() {
val client: HttpClient = HttpClient.newHttpClient() val client: HttpClient = HttpClient.newHttpClient()