diff --git a/rbcs-api/src/main/java/module-info.java b/rbcs-api/src/main/java/module-info.java index 65708d8..42abdf2 100644 --- a/rbcs-api/src/main/java/module-info.java +++ b/rbcs-api/src/main/java/module-info.java @@ -4,4 +4,5 @@ module net.woggioni.rbcs.api { requires io.netty.buffer; exports net.woggioni.rbcs.api; exports net.woggioni.rbcs.api.exception; + exports net.woggioni.rbcs.api.event; } \ No newline at end of file diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/Cache.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/Cache.java index 8c217a4..06af410 100644 --- a/rbcs-api/src/main/java/net/woggioni/rbcs/api/Cache.java +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/Cache.java @@ -1,14 +1,17 @@ package net.woggioni.rbcs.api; -import io.netty.buffer.ByteBuf; -import net.woggioni.rbcs.api.exception.ContentTooLargeException; +import io.netty.buffer.ByteBufAllocator; -import java.nio.channels.ReadableByteChannel; import java.util.concurrent.CompletableFuture; public interface Cache extends AutoCloseable { - CompletableFuture get(String key); - CompletableFuture put(String key, ByteBuf content) throws ContentTooLargeException; + default void get(String key, ResponseHandle responseHandle, ByteBufAllocator alloc) { + throw new UnsupportedOperationException(); + } + + default CompletableFuture put(String key, ResponseHandle responseHandle, ByteBufAllocator alloc) { + throw new UnsupportedOperationException(); + } } diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/RequestHandle.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/RequestHandle.java new file mode 100644 index 0000000..7d3c43d --- /dev/null +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/RequestHandle.java @@ -0,0 +1,8 @@ +package net.woggioni.rbcs.api; + +import net.woggioni.rbcs.api.event.RequestStreamingEvent; + +@FunctionalInterface +public interface RequestHandle { + void handleEvent(RequestStreamingEvent evt); +} diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/ResponseHandle.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/ResponseHandle.java new file mode 100644 index 0000000..fa6d285 --- /dev/null +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/ResponseHandle.java @@ -0,0 +1,8 @@ +package net.woggioni.rbcs.api; + +import net.woggioni.rbcs.api.event.ResponseStreamingEvent; + +@FunctionalInterface +public interface ResponseHandle { + void handleEvent(ResponseStreamingEvent evt); +} diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/event/RequestStreamingEvent.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/event/RequestStreamingEvent.java new file mode 100644 index 0000000..46fc534 --- /dev/null +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/event/RequestStreamingEvent.java @@ -0,0 +1,26 @@ +package net.woggioni.rbcs.api.event; + +import io.netty.buffer.ByteBuf; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +public sealed interface RequestStreamingEvent { + + @Getter + @RequiredArgsConstructor + non-sealed class ChunkReceived implements RequestStreamingEvent { + private final ByteBuf chunk; + } + + final class LastChunkReceived extends ChunkReceived { + public LastChunkReceived(ByteBuf chunk) { + super(chunk); + } + } + + @Getter + @RequiredArgsConstructor + final class ExceptionCaught implements RequestStreamingEvent { + private final Throwable exception; + } +} diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/event/ResponseStreamingEvent.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/event/ResponseStreamingEvent.java new file mode 100644 index 0000000..cced1e3 --- /dev/null +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/event/ResponseStreamingEvent.java @@ -0,0 +1,42 @@ +package net.woggioni.rbcs.api.event; + +import io.netty.buffer.ByteBuf; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.nio.channels.FileChannel; + +public sealed interface ResponseStreamingEvent { + + final class ResponseReceived implements ResponseStreamingEvent { + } + + @Getter + @RequiredArgsConstructor + non-sealed class ChunkReceived implements ResponseStreamingEvent { + private final ByteBuf chunk; + } + + @Getter + @RequiredArgsConstructor + non-sealed class FileReceived implements ResponseStreamingEvent { + private final FileChannel file; + } + + final class LastChunkReceived extends ChunkReceived { + public LastChunkReceived(ByteBuf chunk) { + super(chunk); + } + } + + @Getter + @RequiredArgsConstructor + final class ExceptionCaught implements ResponseStreamingEvent { + private final Throwable exception; + } + + final class NotFound implements ResponseStreamingEvent { } + + NotFound NOT_FOUND = new NotFound(); + ResponseReceived RESPONSE_RECEIVED = new ResponseReceived(); +} diff --git a/rbcs-cli/src/main/kotlin/net/woggioni/rbcs/cli/impl/commands/BenchmarkCommand.kt b/rbcs-cli/src/main/kotlin/net/woggioni/rbcs/cli/impl/commands/BenchmarkCommand.kt index fb0fa2f..47e237b 100644 --- a/rbcs-cli/src/main/kotlin/net/woggioni/rbcs/cli/impl/commands/BenchmarkCommand.kt +++ b/rbcs-cli/src/main/kotlin/net/woggioni/rbcs/cli/impl/commands/BenchmarkCommand.kt @@ -6,6 +6,8 @@ import net.woggioni.rbcs.common.contextLogger import net.woggioni.rbcs.common.error import net.woggioni.rbcs.common.info import net.woggioni.jwo.JWO +import net.woggioni.jwo.LongMath +import net.woggioni.rbcs.common.debug import picocli.CommandLine import java.security.SecureRandom import java.time.Duration @@ -46,6 +48,7 @@ class BenchmarkCommand : RbcsCommand() { clientCommand.configuration.profiles[profileName] ?: throw IllegalArgumentException("Profile $profileName does not exist in configuration") } + val progressThreshold = LongMath.ceilDiv(numberOfEntries.toLong(), 20) RemoteBuildCacheClient(profile).use { client -> val entryGenerator = sequence { @@ -79,7 +82,12 @@ class BenchmarkCommand : RbcsCommand() { completionQueue.put(result) } semaphore.release() - completionCounter.incrementAndGet() + val completed = completionCounter.incrementAndGet() + if(completed.mod(progressThreshold) == 0L) { + log.debug { + "Inserted $completed / $numberOfEntries" + } + } } } else { Thread.sleep(0) @@ -121,7 +129,12 @@ class BenchmarkCommand : RbcsCommand() { } } future.whenComplete { _, _ -> - completionCounter.incrementAndGet() + val completed = completionCounter.incrementAndGet() + if(completed.mod(progressThreshold) == 0L) { + log.debug { + "Retrieved $completed / ${entries.size}" + } + } semaphore.release() } } else { diff --git a/rbcs-common/src/main/kotlin/net/woggioni/rbcs/common/BB.kt b/rbcs-common/src/main/kotlin/net/woggioni/rbcs/common/BB.kt new file mode 100644 index 0000000..65796c3 --- /dev/null +++ b/rbcs-common/src/main/kotlin/net/woggioni/rbcs/common/BB.kt @@ -0,0 +1,15 @@ +package net.woggioni.rbcs.common + +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.CompositeByteBuf + +fun extractChunk(buf: CompositeByteBuf, alloc: ByteBufAllocator): ByteBuf { + val chunk = alloc.compositeBuffer() + for (component in buf.decompose(0, buf.readableBytes())) { + chunk.addComponent(true, component.retain()) + } + buf.removeComponents(0, buf.numComponents()) + buf.clear() + return chunk +} \ No newline at end of file diff --git a/rbcs-server-memcache/build.gradle b/rbcs-server-memcache/build.gradle index 6aaa6ed..4795375 100644 --- a/rbcs-server-memcache/build.gradle +++ b/rbcs-server-memcache/build.gradle @@ -34,6 +34,7 @@ dependencies { implementation catalog.jwo implementation catalog.slf4j.api implementation catalog.netty.common + implementation catalog.netty.handler implementation catalog.netty.codec.memcache bundle catalog.netty.codec.memcache diff --git a/rbcs-server-memcache/src/main/java/module-info.java b/rbcs-server-memcache/src/main/java/module-info.java index 920665a..65abedc 100644 --- a/rbcs-server-memcache/src/main/java/module-info.java +++ b/rbcs-server-memcache/src/main/java/module-info.java @@ -11,6 +11,7 @@ module net.woggioni.rbcs.server.memcache { requires io.netty.codec.memcache; requires io.netty.common; requires io.netty.buffer; + requires io.netty.handler; requires org.slf4j; provides CacheProvider with net.woggioni.rbcs.server.memcache.MemcacheCacheProvider; diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCache.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCache.kt index 8a4d1bc..c5d5a84 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCache.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCache.kt @@ -1,20 +1,232 @@ package net.woggioni.rbcs.server.memcache -import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.Unpooled +import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes +import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus +import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest import net.woggioni.rbcs.api.Cache +import net.woggioni.rbcs.api.RequestHandle +import net.woggioni.rbcs.api.ResponseHandle +import net.woggioni.rbcs.api.event.RequestStreamingEvent +import net.woggioni.rbcs.api.event.ResponseStreamingEvent +import net.woggioni.rbcs.api.exception.ContentTooLargeException +import net.woggioni.rbcs.common.ByteBufOutputStream +import net.woggioni.rbcs.common.RBCS.digest +import net.woggioni.rbcs.common.contextLogger +import net.woggioni.rbcs.common.debug +import net.woggioni.rbcs.common.extractChunk import net.woggioni.rbcs.server.memcache.client.MemcacheClient -import java.nio.channels.ReadableByteChannel +import net.woggioni.rbcs.server.memcache.client.MemcacheResponseHandle +import net.woggioni.rbcs.server.memcache.client.StreamingRequestEvent +import net.woggioni.rbcs.server.memcache.client.StreamingResponseEvent +import java.security.MessageDigest +import java.time.Duration +import java.time.Instant import java.util.concurrent.CompletableFuture +import java.util.zip.Deflater +import java.util.zip.DeflaterOutputStream +import java.util.zip.Inflater +import java.util.zip.InflaterOutputStream -class MemcacheCache(private val cfg : MemcacheCacheConfiguration) : Cache { - private val memcacheClient = MemcacheClient(cfg) +class MemcacheCache(private val cfg: MemcacheCacheConfiguration) : Cache { - override fun get(key: String): CompletableFuture { - return memcacheClient.get(key) + companion object { + @JvmStatic + private val log = contextLogger() } - override fun put(key: String, content: ByteBuf): CompletableFuture { - return memcacheClient.put(key, content, cfg.maxAge) + private val memcacheClient = MemcacheClient(cfg) + + override fun get(key: String, responseHandle: ResponseHandle, alloc: ByteBufAllocator) { + val compressionMode = cfg.compressionMode + val buf = alloc.compositeBuffer() + val stream = ByteBufOutputStream(buf).let { outputStream -> + if (compressionMode != null) { + when (compressionMode) { + MemcacheCacheConfiguration.CompressionMode.DEFLATE -> { + InflaterOutputStream( + outputStream, + Inflater() + ) + } + } + } else { + outputStream + } + } + val memcacheResponseHandle = object : MemcacheResponseHandle { + override fun handleEvent(evt: StreamingResponseEvent) { + when (evt) { + is StreamingResponseEvent.ResponseReceived -> { + if (evt.response.status() == BinaryMemcacheResponseStatus.SUCCESS) { + responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED) + } else if (evt.response.status() == BinaryMemcacheResponseStatus.KEY_ENOENT) { + responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND) + } else { + responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(MemcacheException(evt.response.status()))) + } + } + + is StreamingResponseEvent.LastContentReceived -> { + evt.content.content().let { content -> + content.readBytes(stream, content.readableBytes()) + } + buf.retain() + stream.close() + val chunk = extractChunk(buf, alloc) + buf.release() + responseHandle.handleEvent( + ResponseStreamingEvent.LastChunkReceived( + chunk + ) + ) + } + + is StreamingResponseEvent.ContentReceived -> { + evt.content.content().let { content -> + content.readBytes(stream, content.readableBytes()) + } + if (buf.readableBytes() >= cfg.chunkSize) { + val chunk = extractChunk(buf, alloc) + responseHandle.handleEvent(ResponseStreamingEvent.ChunkReceived(chunk)) + } + } + + is StreamingResponseEvent.ExceptionCaught -> { + responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(evt.exception)) + } + } + } + } + memcacheClient.sendRequest(Unpooled.wrappedBuffer(key.toByteArray()), memcacheResponseHandle) + .thenApply { memcacheRequestHandle -> + val request = (cfg.digestAlgorithm + ?.let(MessageDigest::getInstance) + ?.let { md -> + digest(key.toByteArray(), md) + } ?: key.toByteArray(Charsets.UTF_8) + ).let { digest -> + DefaultBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest)).apply { + setOpcode(BinaryMemcacheOpcodes.GET) + } + } + memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendRequest(request)) + }.exceptionally { ex -> + responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex)) + } + } + + private fun encodeExpiry(expiry: Duration): Int { + val expirySeconds = expiry.toSeconds() + return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds } + ?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt() + } + + override fun put( + key: String, + responseHandle: ResponseHandle, + alloc: ByteBufAllocator + ): CompletableFuture { + val memcacheResponseHandle = object : MemcacheResponseHandle { + override fun handleEvent(evt: StreamingResponseEvent) { + when (evt) { + is StreamingResponseEvent.ResponseReceived -> { + when (evt.response.status()) { + BinaryMemcacheResponseStatus.SUCCESS -> { + responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED) + } + + BinaryMemcacheResponseStatus.KEY_ENOENT -> { + responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND) + } + + BinaryMemcacheResponseStatus.E2BIG -> { + responseHandle.handleEvent( + ResponseStreamingEvent.ExceptionCaught( + ContentTooLargeException("Request payload is too big", null) + ) + ) + } + + else -> { + responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(MemcacheException(evt.response.status()))) + } + } + } + + is StreamingResponseEvent.LastContentReceived -> { + responseHandle.handleEvent( + ResponseStreamingEvent.LastChunkReceived( + evt.content.content().retain() + ) + ) + } + + is StreamingResponseEvent.ContentReceived -> { + responseHandle.handleEvent(ResponseStreamingEvent.ChunkReceived(evt.content.content().retain())) + } + + is StreamingResponseEvent.ExceptionCaught -> { + responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(evt.exception)) + } + } + } + } + val result: CompletableFuture = + memcacheClient.sendRequest(Unpooled.wrappedBuffer(key.toByteArray()), memcacheResponseHandle) + .thenApply { memcacheRequestHandle -> + val request = (cfg.digestAlgorithm + ?.let(MessageDigest::getInstance) + ?.let { md -> + digest(key.toByteArray(), md) + } ?: key.toByteArray(Charsets.UTF_8)).let { digest -> + val extras = Unpooled.buffer(8, 8) + extras.writeInt(0) + extras.writeInt(encodeExpiry(cfg.maxAge)) + DefaultBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), extras).apply { + setOpcode(BinaryMemcacheOpcodes.SET) + } + } +// memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendRequest(request)) + val compressionMode = cfg.compressionMode + val buf = alloc.heapBuffer() + val stream = ByteBufOutputStream(buf).let { outputStream -> + if (compressionMode != null) { + when (compressionMode) { + MemcacheCacheConfiguration.CompressionMode.DEFLATE -> { + DeflaterOutputStream( + outputStream, + Deflater(Deflater.DEFAULT_COMPRESSION, false) + ) + } + } + } else { + outputStream + } + } + RequestHandle { evt -> + when (evt) { + is RequestStreamingEvent.LastChunkReceived -> { + evt.chunk.readBytes(stream, evt.chunk.readableBytes()) + buf.retain() + stream.close() + request.setTotalBodyLength(buf.readableBytes() + request.keyLength() + request.extrasLength()) + memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendRequest(request)) + memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendLastChunk(buf)) + } + + is RequestStreamingEvent.ChunkReceived -> { + evt.chunk.readBytes(stream, evt.chunk.readableBytes()) + } + + is RequestStreamingEvent.ExceptionCaught -> { + stream.close() + } + } + } + } + return result } override fun close() { diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt index 67a32de..0ff4f42 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt @@ -10,14 +10,10 @@ data class MemcacheCacheConfiguration( val maxSize: Int = 0x100000, val digestAlgorithm: String? = null, val compressionMode: CompressionMode? = null, + val chunkSize : Int ) : Configuration.Cache { enum class CompressionMode { - /** - * Gzip mode - */ - GZIP, - /** * Deflate mode */ diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheProvider.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheProvider.kt index 40e0591..445e03e 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheProvider.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheProvider.kt @@ -29,12 +29,14 @@ class MemcacheCacheProvider : CacheProvider { ?.let(Duration::parse) ?: Duration.ofDays(1) val maxSize = el.renderAttribute("max-size") - ?.let(String::toInt) + ?.let(Integer::decode) ?: 0x100000 + val chunkSize = el.renderAttribute("chunk-size") + ?.let(Integer::decode) + ?: 0x4000 val compressionMode = el.renderAttribute("compression-mode") ?.let { when (it) { - "gzip" -> MemcacheCacheConfiguration.CompressionMode.GZIP "deflate" -> MemcacheCacheConfiguration.CompressionMode.DEFLATE else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE } @@ -63,6 +65,7 @@ class MemcacheCacheProvider : CacheProvider { maxSize, digestAlgorithm, compressionMode, + chunkSize ) } @@ -70,7 +73,6 @@ class MemcacheCacheProvider : CacheProvider { val result = doc.createElement("cache") Xml.of(doc, result) { attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/") - attr("xs:type", "${xmlNamespacePrefix}:$xmlType", RBCS.XML_SCHEMA_NAMESPACE_URI) for (server in servers) { node("server") { @@ -84,13 +86,13 @@ class MemcacheCacheProvider : CacheProvider { } attr("max-age", maxAge.toString()) attr("max-size", maxSize.toString()) + attr("chunk-size", chunkSize.toString()) digestAlgorithm?.let { digestAlgorithm -> attr("digest", digestAlgorithm) } compressionMode?.let { compressionMode -> attr( "compression-mode", when (compressionMode) { - MemcacheCacheConfiguration.CompressionMode.GZIP -> "gzip" MemcacheCacheConfiguration.CompressionMode.DEFLATE -> "deflate" } ) diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/Event.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/Event.kt new file mode 100644 index 0000000..06afbf5 --- /dev/null +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/Event.kt @@ -0,0 +1,30 @@ +package net.woggioni.rbcs.server.memcache.client + +import io.netty.buffer.ByteBuf +import io.netty.handler.codec.memcache.LastMemcacheContent +import io.netty.handler.codec.memcache.MemcacheContent +import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest +import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse + +sealed interface StreamingRequestEvent { + class SendRequest(val request : BinaryMemcacheRequest) : StreamingRequestEvent + open class SendChunk(val chunk : ByteBuf) : StreamingRequestEvent + class SendLastChunk(chunk : ByteBuf) : SendChunk(chunk) + class ExceptionCaught(val exception : Throwable) : StreamingRequestEvent +} + +sealed interface StreamingResponseEvent { + class ResponseReceived(val response : BinaryMemcacheResponse) : StreamingResponseEvent + open class ContentReceived(val content : MemcacheContent) : StreamingResponseEvent + class LastContentReceived(val lastContent : LastMemcacheContent) : ContentReceived(lastContent) + class ExceptionCaught(val exception : Throwable) : StreamingResponseEvent +} + +interface MemcacheRequestHandle { + fun handleEvent(evt : StreamingRequestEvent) +} + +interface MemcacheResponseHandle { + fun handleEvent(evt : StreamingResponseEvent) +} + diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt index fa8568d..5c6e4f1 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/client/MemcacheClient.kt @@ -3,7 +3,6 @@ package net.woggioni.rbcs.server.memcache.client import io.netty.bootstrap.Bootstrap import io.netty.buffer.ByteBuf -import io.netty.buffer.Unpooled import io.netty.channel.Channel import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelOption @@ -14,36 +13,23 @@ import io.netty.channel.pool.AbstractChannelPoolHandler import io.netty.channel.pool.ChannelPool import io.netty.channel.pool.FixedChannelPool import io.netty.channel.socket.nio.NioSocketChannel -import io.netty.handler.codec.DecoderException +import io.netty.handler.codec.memcache.DefaultLastMemcacheContent +import io.netty.handler.codec.memcache.DefaultMemcacheContent +import io.netty.handler.codec.memcache.LastMemcacheContent +import io.netty.handler.codec.memcache.MemcacheContent +import io.netty.handler.codec.memcache.MemcacheObject import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec -import io.netty.handler.codec.memcache.binary.BinaryMemcacheObjectAggregator -import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes -import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus -import io.netty.handler.codec.memcache.binary.DefaultFullBinaryMemcacheRequest -import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheRequest -import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse +import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse +import io.netty.handler.logging.LoggingHandler import io.netty.util.concurrent.GenericFutureListener -import net.woggioni.rbcs.common.ByteBufInputStream -import net.woggioni.rbcs.common.ByteBufOutputStream -import net.woggioni.rbcs.common.RBCS.digest import net.woggioni.rbcs.common.HostAndPort import net.woggioni.rbcs.common.contextLogger +import net.woggioni.rbcs.common.debug import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration -import net.woggioni.rbcs.server.memcache.MemcacheException -import net.woggioni.jwo.JWO import java.net.InetSocketAddress -import java.nio.channels.Channels -import java.nio.channels.ReadableByteChannel -import java.security.MessageDigest -import java.time.Duration -import java.time.Instant import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentHashMap -import java.util.zip.Deflater -import java.util.zip.DeflaterOutputStream -import java.util.zip.GZIPInputStream -import java.util.zip.GZIPOutputStream -import java.util.zip.InflaterInputStream +import java.util.concurrent.atomic.AtomicLong import io.netty.util.concurrent.Future as NettyFuture @@ -61,6 +47,8 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl group = NioEventLoopGroup() } + private val counter = AtomicLong(0) + private fun newConnectionPool(server: MemcacheCacheConfiguration.Server): FixedChannelPool { val bootstrap = Bootstrap().apply { group(group) @@ -76,18 +64,15 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl override fun channelCreated(ch: Channel) { val pipeline: ChannelPipeline = ch.pipeline() pipeline.addLast(BinaryMemcacheClientCodec()) - pipeline.addLast(BinaryMemcacheObjectAggregator(cfg.maxSize)) + pipeline.addLast(LoggingHandler()) } } return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections) } - - private fun sendRequest(request: FullBinaryMemcacheRequest): CompletableFuture { - + fun sendRequest(key: ByteBuf, responseHandle: MemcacheResponseHandle): CompletableFuture { val server = cfg.servers.let { servers -> if (servers.size > 1) { - val key = request.key().duplicate() var checksum = 0 while (key.readableBytes() > 4) { val byte = key.readInt() @@ -103,7 +88,7 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl } } - val response = CompletableFuture() + val response = CompletableFuture() // Custom handler for processing responses val pool = connectionPool.computeIfAbsent(server.endpoint) { newConnectionPool(server) @@ -113,31 +98,73 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl if (channelFuture.isSuccess) { val channel = channelFuture.now val pipeline = channel.pipeline() - channel.pipeline() - .addLast("client-handler", object : SimpleChannelInboundHandler() { - override fun channelRead0( - ctx: ChannelHandlerContext, - msg: FullBinaryMemcacheResponse - ) { - pipeline.removeLast() - pool.release(channel) - msg.touch("The method's caller must remember to release this") - response.complete(msg.retain()) - } + val handler = object : SimpleChannelInboundHandler() { + override fun channelRead0( + ctx: ChannelHandlerContext, + msg: MemcacheObject + ) { + when (msg) { + is BinaryMemcacheResponse -> responseHandle.handleEvent( + StreamingResponseEvent.ResponseReceived( + msg + ) + ) - override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - val ex = when (cause) { - is DecoderException -> cause.cause!! - else -> cause + is LastMemcacheContent -> { + responseHandle.handleEvent( + StreamingResponseEvent.LastContentReceived( + msg + ) + ) + pipeline.removeLast() + pool.release(channel) } - ctx.close() - pipeline.removeLast() - pool.release(channel) - response.completeExceptionally(ex) + + is MemcacheContent -> responseHandle.handleEvent( + StreamingResponseEvent.ContentReceived( + msg + ) + ) } - }) - request.touch() - channel.writeAndFlush(request) + } + + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + responseHandle.handleEvent(StreamingResponseEvent.ExceptionCaught(cause)) + ctx.close() + pipeline.removeLast() + pool.release(channel) + } + } + channel.pipeline() + .addLast("client-handler", handler) + response.complete(object : MemcacheRequestHandle { + override fun handleEvent(evt: StreamingRequestEvent) { + when (evt) { + is StreamingRequestEvent.SendRequest -> { + channel.writeAndFlush(evt.request) + } + + is StreamingRequestEvent.SendLastChunk -> { + channel.writeAndFlush(DefaultLastMemcacheContent(evt.chunk)) + val value = counter.incrementAndGet() + log.debug { + "Finished request counter: $value" + } + } + + is StreamingRequestEvent.SendChunk -> { + channel.writeAndFlush(DefaultMemcacheContent(evt.chunk)) + } + + is StreamingRequestEvent.ExceptionCaught -> { + responseHandle.handleEvent(StreamingResponseEvent.ExceptionCaught(evt.exception)) + channel.close() + pipeline.removeLast() + pool.release(channel) + } + } + } + }) } else { response.completeExceptionally(channelFuture.cause()) } @@ -146,107 +173,6 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl return response } - private fun encodeExpiry(expiry: Duration): Int { - val expirySeconds = expiry.toSeconds() - return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds } - ?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt() - } - - fun get(key: String): CompletableFuture { - val request = (cfg.digestAlgorithm - ?.let(MessageDigest::getInstance) - ?.let { md -> - digest(key.toByteArray(), md) - } ?: key.toByteArray(Charsets.UTF_8)).let { digest -> - DefaultFullBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), null).apply { - setOpcode(BinaryMemcacheOpcodes.GET) - } - } - return sendRequest(request).thenApply { response -> - try { - when (val status = response.status()) { - BinaryMemcacheResponseStatus.SUCCESS -> { - val compressionMode = cfg.compressionMode - val content = response.content().retain() - content.touch() - if (compressionMode != null) { - when (compressionMode) { - MemcacheCacheConfiguration.CompressionMode.GZIP -> { - GZIPInputStream(ByteBufInputStream(content)) - } - - MemcacheCacheConfiguration.CompressionMode.DEFLATE -> { - InflaterInputStream(ByteBufInputStream(content)) - } - } - } else { - ByteBufInputStream(content) - }.let(Channels::newChannel) - } - - BinaryMemcacheResponseStatus.KEY_ENOENT -> { - null - } - - else -> throw MemcacheException(status) - } - } finally { - response.release() - } - } - } - - fun put(key: String, content: ByteBuf, expiry: Duration, cas: Long? = null): CompletableFuture { - val request = (cfg.digestAlgorithm - ?.let(MessageDigest::getInstance) - ?.let { md -> - digest(key.toByteArray(), md) - } ?: key.toByteArray(Charsets.UTF_8)).let { digest -> - val extras = Unpooled.buffer(8, 8) - extras.writeInt(0) - extras.writeInt(encodeExpiry(expiry)) - val compressionMode = cfg.compressionMode - content.retain() - val payload = if (compressionMode != null) { - val inputStream = ByteBufInputStream(content) - val buf = content.alloc().buffer() - buf.retain() - val outputStream = when (compressionMode) { - MemcacheCacheConfiguration.CompressionMode.GZIP -> { - GZIPOutputStream(ByteBufOutputStream(buf)) - } - - MemcacheCacheConfiguration.CompressionMode.DEFLATE -> { - DeflaterOutputStream(ByteBufOutputStream(buf), Deflater(Deflater.DEFAULT_COMPRESSION, false)) - } - } - inputStream.use { i -> - outputStream.use { o -> - JWO.copy(i, o) - } - } - buf - } else { - content - } - DefaultFullBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), extras, payload).apply { - setOpcode(BinaryMemcacheOpcodes.SET) - cas?.let(this::setCas) - } - } - return sendRequest(request).thenApply { response -> - try { - when (val status = response.status()) { - BinaryMemcacheResponseStatus.SUCCESS -> null - else -> throw MemcacheException(status) - } - } finally { - response.release() - } - } - } - - fun shutDown(): NettyFuture<*> { return group.shutdownGracefully() } diff --git a/rbcs-server-memcache/src/main/resources/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd b/rbcs-server-memcache/src/main/resources/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd index ae6b557..acb1db4 100644 --- a/rbcs-server-memcache/src/main/resources/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd +++ b/rbcs-server-memcache/src/main/resources/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd @@ -20,7 +20,8 @@ - + + @@ -30,7 +31,6 @@ - diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt index 76af690..4971620 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/RemoteBuildCacheServer.kt @@ -16,7 +16,6 @@ import io.netty.handler.codec.compression.CompressionOptions import io.netty.handler.codec.http.DefaultHttpContent import io.netty.handler.codec.http.HttpContentCompressor import io.netty.handler.codec.http.HttpHeaderNames -import io.netty.handler.codec.http.HttpObjectAggregator import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpServerCodec import io.netty.handler.ssl.ClientAuth @@ -249,13 +248,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { private val cache = cfg.cache.materialize() - private val serverHandler = let { - val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/")) - ServerHandler(cache, prefix) - } - private val exceptionHandler = ExceptionHandler() - private val throttlingHandler = ThrottlingHandler(cfg) private val authenticator = when (val auth = cfg.authentication) { is Configuration.BasicAuthentication -> NettyHttpBasicAuthenticator(cfg.users, RoleAuthorizer()) @@ -368,11 +361,15 @@ class RemoteBuildCacheServer(private val cfg: Configuration) { pipeline.addLast(HttpServerCodec()) pipeline.addLast(HttpChunkContentCompressor(1024)) pipeline.addLast(ChunkedWriteHandler()) - pipeline.addLast(HttpObjectAggregator(cfg.connection.maxRequestSize)) authenticator?.let { pipeline.addLast(it) } - pipeline.addLast(throttlingHandler) + pipeline.addLast(ThrottlingHandler(cfg)) + + val serverHandler = let { + val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/")) + ServerHandler(cache, prefix) + } pipeline.addLast(eventExecutorGroup, serverHandler) pipeline.addLast(exceptionHandler) } diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/auth/Authenticator.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/auth/Authenticator.kt index 163fd37..f72a697 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/auth/Authenticator.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/auth/Authenticator.kt @@ -6,6 +6,7 @@ import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.handler.codec.http.DefaultFullHttpResponse import io.netty.handler.codec.http.FullHttpResponse +import io.netty.handler.codec.http.HttpContent import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpResponseStatus @@ -57,6 +58,8 @@ abstract class AbstractNettyHttpAuthenticator(private val authorizer: Authorizer } else { authorizationFailure(ctx, msg) } + } else if(msg is HttpContent) { + ctx.fireChannelRead(msg) } } diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCache.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCache.kt index e6a98fc..6ea08cf 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCache.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCache.kt @@ -1,12 +1,16 @@ package net.woggioni.rbcs.server.cache -import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator import net.woggioni.jwo.JWO import net.woggioni.rbcs.api.Cache -import net.woggioni.rbcs.common.ByteBufInputStream +import net.woggioni.rbcs.api.RequestHandle +import net.woggioni.rbcs.api.ResponseHandle +import net.woggioni.rbcs.api.event.RequestStreamingEvent +import net.woggioni.rbcs.api.event.ResponseStreamingEvent +import net.woggioni.rbcs.common.ByteBufOutputStream import net.woggioni.rbcs.common.RBCS.digestString import net.woggioni.rbcs.common.contextLogger -import java.nio.channels.Channels +import net.woggioni.rbcs.common.extractChunk import java.nio.channels.FileChannel import java.nio.file.Files import java.nio.file.Path @@ -19,7 +23,6 @@ import java.time.Instant import java.util.concurrent.CompletableFuture import java.util.zip.Deflater import java.util.zip.DeflaterOutputStream -import java.util.zip.Inflater import java.util.zip.InflaterInputStream class FileSystemCache( @@ -27,7 +30,8 @@ class FileSystemCache( val maxAge: Duration, val digestAlgorithm: String?, val compressionEnabled: Boolean, - val compressionLevel: Int + val compressionLevel: Int, + val chunkSize: Int ) : Cache { private companion object { @@ -44,61 +48,111 @@ class FileSystemCache( private var nextGc = Instant.now() - override fun get(key: String) = (digestAlgorithm - ?.let(MessageDigest::getInstance) - ?.let { md -> - digestString(key.toByteArray(), md) - } ?: key).let { digest -> - root.resolve(digest).takeIf(Files::exists) - ?.let { file -> - file.takeIf(Files::exists)?.let { file -> - if (compressionEnabled) { - val inflater = Inflater() - Channels.newChannel( - InflaterInputStream( - Channels.newInputStream( - FileChannel.open( - file, - StandardOpenOption.READ - ) - ), inflater - ) - ) - } else { - FileChannel.open(file, StandardOpenOption.READ) - } - } - }.let { - CompletableFuture.completedFuture(it) - } - } - - override fun put(key: String, content: ByteBuf): CompletableFuture { + override fun get(key: String, responseHandle: ResponseHandle, alloc: ByteBufAllocator) { (digestAlgorithm ?.let(MessageDigest::getInstance) ?.let { md -> digestString(key.toByteArray(), md) } ?: key).let { digest -> - val file = root.resolve(digest) - val tmpFile = Files.createTempFile(root, null, ".tmp") - try { - Files.newOutputStream(tmpFile).let { + root.resolve(digest).takeIf(Files::exists) + ?.let { file -> + file.takeIf(Files::exists)?.let { file -> + responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED) + if (compressionEnabled) { + val compositeBuffer = alloc.compositeBuffer() + ByteBufOutputStream(compositeBuffer).use { outputStream -> + InflaterInputStream(Files.newInputStream(file)).use { inputStream -> + val ioBuffer = alloc.buffer(chunkSize) + try { + while (true) { + val read = ioBuffer.writeBytes(inputStream, chunkSize) + val last = read < 0 + if (read > 0) { + ioBuffer.readBytes(outputStream, read) + } + if (last) { + compositeBuffer.retain() + outputStream.close() + } + if (compositeBuffer.readableBytes() >= chunkSize || last) { + val chunk = extractChunk(compositeBuffer, alloc) + val evt = if (last) { + ResponseStreamingEvent.LastChunkReceived(chunk) + } else { + ResponseStreamingEvent.ChunkReceived(chunk) + } + responseHandle.handleEvent(evt) + } + if (last) break + } + } finally { + ioBuffer.release() + } + } + } + } else { + responseHandle.handleEvent( + ResponseStreamingEvent.FileReceived( + FileChannel.open(file, StandardOpenOption.READ) + ) + ) + } + } + } ?: responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND) + } + } + + override fun put( + key: String, + responseHandle: ResponseHandle, + alloc: ByteBufAllocator + ): CompletableFuture { + try { + (digestAlgorithm + ?.let(MessageDigest::getInstance) + ?.let { md -> + digestString(key.toByteArray(), md) + } ?: key).let { digest -> + val file = root.resolve(digest) + val tmpFile = Files.createTempFile(root, null, ".tmp") + val stream = Files.newOutputStream(tmpFile).let { if (compressionEnabled) { val deflater = Deflater(compressionLevel) DeflaterOutputStream(it, deflater) } else { it } - }.use { - JWO.copy(ByteBufInputStream(content), it) } - Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE) - } catch (t: Throwable) { - Files.delete(tmpFile) - throw t + return CompletableFuture.completedFuture(object : RequestHandle { + override fun handleEvent(evt: RequestStreamingEvent) { + try { + when (evt) { + is RequestStreamingEvent.LastChunkReceived -> { + evt.chunk.readBytes(stream, evt.chunk.readableBytes()) + stream.close() + Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE) + responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED) + } + + is RequestStreamingEvent.ChunkReceived -> { + evt.chunk.readBytes(stream, evt.chunk.readableBytes()) + } + + is RequestStreamingEvent.ExceptionCaught -> { + Files.delete(tmpFile) + stream.close() + } + } + } catch (ex: Throwable) { + responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex)) + } + } + }) } + } catch (ex: Throwable) { + responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex)) + return CompletableFuture.failedFuture(ex) } - return CompletableFuture.completedFuture(null) } private val garbageCollector = Thread.ofVirtual().name("file-system-cache-gc").start { @@ -119,8 +173,8 @@ class FileSystemCache( /** * Returns the creation timestamp of the oldest cache entry (if any) */ - private fun actualGc(now: Instant) : Instant? { - var result :Instant? = null + private fun actualGc(now: Instant): Instant? { + var result: Instant? = null Files.list(root) .filter { path -> JWO.splitExtension(path) @@ -132,7 +186,7 @@ class FileSystemCache( val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java) .creationTime() .toInstant() - if(result == null || creationTimeStamp < result) { + if (result == null || creationTimeStamp < result) { result = creationTimeStamp } now > creationTimeStamp.plus(maxAge) diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheConfiguration.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheConfiguration.kt index 287c0b6..7a029af 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheConfiguration.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheConfiguration.kt @@ -12,13 +12,15 @@ data class FileSystemCacheConfiguration( val digestAlgorithm : String?, val compressionEnabled: Boolean, val compressionLevel: Int, + val chunkSize: Int, ) : Configuration.Cache { override fun materialize() = FileSystemCache( root ?: Application.builder("rbcs").build().computeCacheDirectory(), maxAge, digestAlgorithm, compressionEnabled, - compressionLevel + compressionLevel, + chunkSize, ) override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheProvider.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheProvider.kt index 84286f8..c1e524f 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheProvider.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/FileSystemCacheProvider.kt @@ -31,13 +31,17 @@ class FileSystemCacheProvider : CacheProvider { ?.let(String::toInt) ?: Deflater.DEFAULT_COMPRESSION val digestAlgorithm = el.renderAttribute("digest") ?: "MD5" + val chunkSize = el.renderAttribute("chunk-size") + ?.let(Integer::decode) + ?: 0x4000 return FileSystemCacheConfiguration( path, maxAge, digestAlgorithm, enableCompression, - compressionLevel + compressionLevel, + chunkSize ) } @@ -57,6 +61,7 @@ class FileSystemCacheProvider : CacheProvider { }?.let { attr("compression-level", it.toString()) } + attr("chunk-size", chunkSize.toString()) } result } diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt index 0eb56b2..d1b66e1 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt @@ -1,31 +1,36 @@ package net.woggioni.rbcs.server.cache import io.netty.buffer.ByteBuf -import net.woggioni.jwo.JWO +import io.netty.buffer.ByteBufAllocator import net.woggioni.rbcs.api.Cache -import net.woggioni.rbcs.common.ByteBufInputStream +import net.woggioni.rbcs.api.RequestHandle +import net.woggioni.rbcs.api.ResponseHandle +import net.woggioni.rbcs.api.event.RequestStreamingEvent +import net.woggioni.rbcs.api.event.ResponseStreamingEvent import net.woggioni.rbcs.common.ByteBufOutputStream import net.woggioni.rbcs.common.RBCS.digestString import net.woggioni.rbcs.common.contextLogger -import java.nio.channels.Channels +import net.woggioni.rbcs.common.extractChunk import java.security.MessageDigest import java.time.Duration import java.time.Instant import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.PriorityBlockingQueue +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong import java.util.zip.Deflater import java.util.zip.DeflaterOutputStream import java.util.zip.Inflater -import java.util.zip.InflaterInputStream +import java.util.zip.InflaterOutputStream class InMemoryCache( - val maxAge: Duration, - val maxSize: Long, - val digestAlgorithm: String?, - val compressionEnabled: Boolean, - val compressionLevel: Int + private val maxAge: Duration, + private val maxSize: Long, + private val digestAlgorithm: String?, + private val compressionEnabled: Boolean, + private val compressionLevel: Int, + private val chunkSize : Int ) : Cache { companion object { @@ -35,8 +40,9 @@ class InMemoryCache( private val size = AtomicLong() private val map = ConcurrentHashMap() - - private class RemovalQueueElement(val key: String, val value : ByteBuf, val expiry : Instant) : Comparable { + + private class RemovalQueueElement(val key: String, val value: ByteBuf, val expiry: Instant) : + Comparable { override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry) } @@ -46,19 +52,17 @@ class InMemoryCache( private var running = true private val garbageCollector = Thread.ofVirtual().name("in-memory-cache-gc").start { - while(running) { - val el = removalQueue.take() + while (running) { + val el = removalQueue.poll(1, TimeUnit.SECONDS) ?: continue val buf = el.value val now = Instant.now() - if(now > el.expiry) { + if (now > el.expiry) { val removed = map.remove(el.key, buf) - if(removed) { + if (removed) { updateSizeAfterRemoval(buf) //Decrease the reference count for map buf.release() } - //Decrease the reference count for removalQueue - buf.release() } else { removalQueue.put(el) Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))) @@ -66,14 +70,12 @@ class InMemoryCache( } } - private fun removeEldest() : Long { - while(true) { + private fun removeEldest(): Long { + while (true) { val el = removalQueue.take() val buf = el.value val removed = map.remove(el.key, buf) - //Decrease the reference count for removalQueue - buf.release() - if(removed) { + if (removed) { val newSize = updateSizeAfterRemoval(buf) //Decrease the reference count for map buf.release() @@ -82,8 +84,8 @@ class InMemoryCache( } } - private fun updateSizeAfterRemoval(removed: ByteBuf) : Long { - return size.updateAndGet { currentSize : Long -> + private fun updateSizeAfterRemoval(removed: ByteBuf): Long { + return size.updateAndGet { currentSize: Long -> currentSize - removed.readableBytes() } } @@ -93,58 +95,114 @@ class InMemoryCache( garbageCollector.join() } - override fun get(key: String) = - (digestAlgorithm - ?.let(MessageDigest::getInstance) - ?.let { md -> - digestString(key.toByteArray(), md) - } ?: key - ).let { digest -> - map[digest] - ?.let { value -> - val copy = value.retainedDuplicate() - copy.touch("This has to be released by the caller of the cache") - if (compressionEnabled) { - val inflater = Inflater() - Channels.newChannel(InflaterInputStream(ByteBufInputStream(copy), inflater)) - } else { - Channels.newChannel(ByteBufInputStream(copy)) - } + override fun get(key: String, responseHandle: ResponseHandle, alloc: ByteBufAllocator) { + try { + (digestAlgorithm + ?.let(MessageDigest::getInstance) + ?.let { md -> + digestString(key.toByteArray(), md) + } ?: key + ).let { digest -> + map[digest] + ?.let { value -> + val copy = value.retainedDuplicate() + responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED) + val output = alloc.compositeBuffer() + if (compressionEnabled) { + try { + val stream = ByteBufOutputStream(output).let { + val inflater = Inflater() + InflaterOutputStream(it, inflater) + } + stream.use { os -> + var readable = copy.readableBytes() + while (true) { + copy.readBytes(os, chunkSize.coerceAtMost(readable)) + readable = copy.readableBytes() + val last = readable == 0 + if (last) stream.flush() + if (output.readableBytes() >= chunkSize || last) { + val chunk = extractChunk(output, alloc) + val evt = if (last) { + ResponseStreamingEvent.LastChunkReceived(chunk) + } else { + ResponseStreamingEvent.ChunkReceived(chunk) + } + responseHandle.handleEvent(evt) + } + if (last) break + } + } + } finally { + copy.release() + } + } else { + responseHandle.handleEvent( + ResponseStreamingEvent.LastChunkReceived(copy) + ) + } + } ?: responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND) + } + } catch (ex: Throwable) { + responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex)) + } + } + + override fun put( + key: String, + responseHandle: ResponseHandle, + alloc: ByteBufAllocator + ): CompletableFuture { + return CompletableFuture.completedFuture(object : RequestHandle { + val buf = alloc.heapBuffer() + val stream = ByteBufOutputStream(buf).let { + if (compressionEnabled) { + val deflater = Deflater(compressionLevel) + DeflaterOutputStream(it, deflater) + } else { + it } - }.let { - CompletableFuture.completedFuture(it) } - override fun put(key: String, content: ByteBuf) = - (digestAlgorithm - ?.let(MessageDigest::getInstance) - ?.let { md -> - digestString(key.toByteArray(), md) - } ?: key).let { digest -> - content.retain() - val value = if (compressionEnabled) { - val deflater = Deflater(compressionLevel) - val buf = content.alloc().buffer() - buf.retain() - DeflaterOutputStream(ByteBufOutputStream(buf), deflater).use { outputStream -> - ByteBufInputStream(content).use { inputStream -> - JWO.copy(inputStream, outputStream) + override fun handleEvent(evt: RequestStreamingEvent) { + when (evt) { + is RequestStreamingEvent.ChunkReceived -> { + evt.chunk.readBytes(stream, evt.chunk.readableBytes()) + if (evt is RequestStreamingEvent.LastChunkReceived) { + (digestAlgorithm + ?.let(MessageDigest::getInstance) + ?.let { md -> + digestString(key.toByteArray(), md) + } ?: key + ).let { digest -> + val oldSize = map.put(digest, buf.retain())?.let { old -> + val result = old.readableBytes() + old.release() + result + } ?: 0 + val delta = buf.readableBytes() - oldSize + var newSize = size.updateAndGet { currentSize : Long -> + currentSize + delta + } + removalQueue.put(RemovalQueueElement(digest, buf, Instant.now().plus(maxAge))) + while(newSize > maxSize) { + newSize = removeEldest() + } + stream.close() + responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED) + } + } + } + + is RequestStreamingEvent.ExceptionCaught -> { + stream.close() + } + + else -> { + } } - buf - } else { - content } - val old = map.put(digest, value) - val delta = value.readableBytes() - (old?.readableBytes() ?: 0) - var newSize = size.updateAndGet { currentSize : Long -> - currentSize + delta - } - removalQueue.put(RemovalQueueElement(digest, value.retain(), Instant.now().plus(maxAge))) - while(newSize > maxSize) { - newSize = removeEldest() - } - }.let { - CompletableFuture.completedFuture(null) - } + }) + } } \ No newline at end of file diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheConfiguration.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheConfiguration.kt index 9a8071b..4793988 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheConfiguration.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheConfiguration.kt @@ -10,13 +10,15 @@ data class InMemoryCacheConfiguration( val digestAlgorithm : String?, val compressionEnabled: Boolean, val compressionLevel: Int, + val chunkSize : Int ) : Configuration.Cache { override fun materialize() = InMemoryCache( maxAge, maxSize, digestAlgorithm, compressionEnabled, - compressionLevel + compressionLevel, + chunkSize ) override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheProvider.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheProvider.kt index 43112aa..c316a4f 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheProvider.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheProvider.kt @@ -31,13 +31,16 @@ class InMemoryCacheProvider : CacheProvider { ?.let(String::toInt) ?: Deflater.DEFAULT_COMPRESSION val digestAlgorithm = el.renderAttribute("digest") ?: "MD5" - + val chunkSize = el.renderAttribute("chunk-size") + ?.let(Integer::decode) + ?: 0x4000 return InMemoryCacheConfiguration( maxAge, maxSize, digestAlgorithm, enableCompression, - compressionLevel + compressionLevel, + chunkSize ) } @@ -57,6 +60,7 @@ class InMemoryCacheProvider : CacheProvider { }?.let { attr("compression-level", it.toString()) } + attr("chunk-size", chunkSize.toString()) } result } diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/configuration/Parser.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/configuration/Parser.kt index 04ae765..9d44d8c 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/configuration/Parser.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/configuration/Parser.kt @@ -124,7 +124,7 @@ object Parser { val writeIdleTimeout = child.renderAttribute("write-idle-timeout") ?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS) val maxRequestSize = child.renderAttribute("max-request-size") - ?.let(String::toInt) ?: 67108864 + ?.let(Integer::decode) ?: 0x4000000 connection = Configuration.Connection( readTimeout, writeTimeout, diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ServerHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ServerHandler.kt index 97a3aa3..e1add8c 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ServerHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/handler/ServerHandler.kt @@ -2,34 +2,66 @@ package net.woggioni.rbcs.server.handler import io.netty.buffer.Unpooled import io.netty.channel.ChannelFutureListener -import io.netty.channel.ChannelHandler import io.netty.channel.ChannelHandlerContext import io.netty.channel.DefaultFileRegion import io.netty.channel.SimpleChannelInboundHandler import io.netty.handler.codec.http.DefaultFullHttpResponse +import io.netty.handler.codec.http.DefaultHttpContent import io.netty.handler.codec.http.DefaultHttpResponse -import io.netty.handler.codec.http.FullHttpRequest +import io.netty.handler.codec.http.DefaultLastHttpContent +import io.netty.handler.codec.http.HttpContent import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaderValues import io.netty.handler.codec.http.HttpMethod +import io.netty.handler.codec.http.HttpObject +import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpUtil import io.netty.handler.codec.http.LastHttpContent -import io.netty.handler.stream.ChunkedNioStream import net.woggioni.rbcs.api.Cache +import net.woggioni.rbcs.api.RequestHandle +import net.woggioni.rbcs.api.ResponseHandle +import net.woggioni.rbcs.api.event.RequestStreamingEvent +import net.woggioni.rbcs.api.event.ResponseStreamingEvent import net.woggioni.rbcs.common.contextLogger +import net.woggioni.rbcs.common.debug import net.woggioni.rbcs.server.debug import net.woggioni.rbcs.server.warn -import java.nio.channels.FileChannel import java.nio.file.Path +import java.util.concurrent.CompletableFuture -@ChannelHandler.Sharable class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : - SimpleChannelInboundHandler() { + SimpleChannelInboundHandler() { private val log = contextLogger() - override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) { + override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) { + when(msg) { + is HttpRequest -> handleRequest(ctx, msg) + is HttpContent -> handleContent(msg) + } + } + + private var requestHandle : CompletableFuture = CompletableFuture.completedFuture(null) + + private fun handleContent(content : HttpContent) { + content.retain() + requestHandle.thenAccept { handle -> + handle?.let { + val evt = if(content is LastHttpContent) { + RequestStreamingEvent.LastChunkReceived(content.content()) + + } else { + RequestStreamingEvent.ChunkReceived(content.content()) + } + it.handleEvent(evt) + content.release() + } ?: content.release() + } + } + + + private fun handleRequest(ctx : ChannelHandlerContext, msg : HttpRequest) { val keepAlive: Boolean = HttpUtil.isKeepAlive(msg) val method = msg.method() if (method === HttpMethod.GET) { @@ -42,54 +74,55 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : return } if (serverPrefix == prefix) { - cache.get(key).thenApply { channel -> - if(channel != null) { - log.debug(ctx) { - "Cache hit for key '$key'" - } - val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) - response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM - if (!keepAlive) { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) - response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY) - } else { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) + val responseHandle = ResponseHandle { evt -> + when (evt) { + is ResponseStreamingEvent.ResponseReceived -> { + val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) + response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM + if (!keepAlive) { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) + } else { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) + } response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED) + ctx.writeAndFlush(response) } - ctx.write(response) - when (channel) { - is FileChannel -> { - val content = DefaultFileRegion(channel, 0, channel.size()) - if (keepAlive) { - ctx.write(content) - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate()) - } else { - ctx.writeAndFlush(content) - .addListener(ChannelFutureListener.CLOSE) - } - } - else -> { - val content = ChunkedNioStream(channel) - if (keepAlive) { - ctx.write(content).addListener { - content.close() - } - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate()) - } else { - ctx.writeAndFlush(content) - .addListener(ChannelFutureListener.CLOSE) - } + + is ResponseStreamingEvent.LastChunkReceived -> { + val channelFuture = ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk)) + if (!keepAlive) { + channelFuture + .addListener(ChannelFutureListener.CLOSE) } } - } else { - log.debug(ctx) { - "Cache miss for key '$key'" + + is ResponseStreamingEvent.ChunkReceived -> { + ctx.writeAndFlush(DefaultHttpContent(evt.chunk)) + } + + is ResponseStreamingEvent.ExceptionCaught -> { + ctx.fireExceptionCaught(evt.exception) + } + + is ResponseStreamingEvent.NotFound -> { + val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND) + response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 + ctx.writeAndFlush(response) + } + + is ResponseStreamingEvent.FileReceived -> { + val content = DefaultFileRegion(evt.file, 0, evt.file.size()) + if (keepAlive) { + ctx.write(content) + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate()) + } else { + ctx.writeAndFlush(content) + .addListener(ChannelFutureListener.CLOSE) + } } - val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND) - response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 - ctx.writeAndFlush(response) } - }.whenComplete { _, ex -> ex?.let(ctx::fireExceptionCaught) } + } + cache.get(key, responseHandle, ctx.alloc()) } else { log.warn(ctx) { "Got request for unhandled path '${msg.uri()}'" @@ -107,15 +140,32 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : log.debug(ctx) { "Added value for key '$key' to build cache" } - cache.put(key, msg.content()).thenRun { - val response = DefaultFullHttpResponse( - msg.protocolVersion(), HttpResponseStatus.CREATED, - Unpooled.copiedBuffer(key.toByteArray()) - ) - response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() - ctx.writeAndFlush(response) - }.whenComplete { _, ex -> + val responseHandle = ResponseHandle { evt -> + when (evt) { + is ResponseStreamingEvent.ResponseReceived -> { + val response = DefaultFullHttpResponse( + msg.protocolVersion(), HttpResponseStatus.CREATED, + Unpooled.copiedBuffer(key.toByteArray()) + ) + response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() + ctx.writeAndFlush(response) + this.requestHandle = CompletableFuture.completedFuture(null) + } + is ResponseStreamingEvent.ChunkReceived -> { + evt.chunk.release() + } + is ResponseStreamingEvent.ExceptionCaught -> { + ctx.fireExceptionCaught(evt.exception) + } + else -> {} + } + } + + this.requestHandle = cache.put(key, responseHandle, ctx.alloc()).exceptionally { ex -> ctx.fireExceptionCaught(ex) + null + }.also { + log.debug { "Replacing request handle with $it"} } } else { log.warn(ctx) { @@ -125,9 +175,12 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" ctx.writeAndFlush(response) } - } else if(method == HttpMethod.TRACE) { + } else if (method == HttpMethod.TRACE) { val replayedRequestHead = ctx.alloc().buffer() - replayedRequestHead.writeCharSequence("TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", Charsets.US_ASCII) + replayedRequestHead.writeCharSequence( + "TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", + Charsets.US_ASCII + ) msg.headers().forEach { (key, value) -> replayedRequestHead.apply { writeCharSequence(key, Charsets.US_ASCII) @@ -137,16 +190,24 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : } } replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII) - val requestBody = msg.content() - requestBody.retain() - val responseBody = ctx.alloc().compositeBuffer(2).apply { - addComponents(true, replayedRequestHead) - addComponents(true, requestBody) + this.requestHandle = CompletableFuture.completedFuture(RequestHandle { evt -> + when(evt) { + is RequestStreamingEvent.LastChunkReceived -> { + ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk.retain())) + this.requestHandle = CompletableFuture.completedFuture(null) + } + is RequestStreamingEvent.ChunkReceived -> ctx.writeAndFlush(DefaultHttpContent(evt.chunk.retain())) + is RequestStreamingEvent.ExceptionCaught -> ctx.fireExceptionCaught(evt.exception) + else -> { + + } + } + }).also { + log.debug { "Replacing request handle with $it"} } - val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK, responseBody) + val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) response.headers().apply { set(HttpHeaderNames.CONTENT_TYPE, "message/http") - set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes()) } ctx.writeAndFlush(response) } else { @@ -158,4 +219,11 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : ctx.writeAndFlush(response) } } + + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + requestHandle.thenAccept { handle -> + handle?.handleEvent(RequestStreamingEvent.ExceptionCaught(cause)) + } + super.exceptionCaught(ctx, cause) + } } \ No newline at end of file diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/throttling/ThrottlingHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/throttling/ThrottlingHandler.kt index 8420a49..029b409 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/throttling/ThrottlingHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/throttling/ThrottlingHandler.kt @@ -1,10 +1,11 @@ package net.woggioni.rbcs.server.throttling -import io.netty.channel.ChannelHandler.Sharable import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.handler.codec.http.DefaultFullHttpResponse +import io.netty.handler.codec.http.HttpContent import io.netty.handler.codec.http.HttpHeaderNames +import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpVersion import net.woggioni.rbcs.api.Configuration @@ -18,7 +19,6 @@ import java.time.temporal.ChronoUnit import java.util.concurrent.TimeUnit -@Sharable class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() { private companion object { @@ -30,6 +30,8 @@ class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() { private val connectionConfiguration = cfg.connection + private var queuedContent : MutableList? = null + /** * If the suggested waiting time from the bucket is lower than this * amount, then the server will simply wait by itself before sending a response @@ -41,25 +43,34 @@ class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() { connectionConfiguration.writeIdleTimeout ).dividedBy(2) + + override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { - val buckets = mutableListOf() - val user = ctx.channel().attr(RemoteBuildCacheServer.userAttribute).get() - if (user != null) { - bucketManager.getBucketByUser(user)?.let(buckets::addAll) - } - val groups = ctx.channel().attr(RemoteBuildCacheServer.groupAttribute).get() ?: emptySet() - if (groups.isNotEmpty()) { - groups.forEach { group -> - bucketManager.getBucketByGroup(group)?.let(buckets::add) + if(msg is HttpRequest) { + val buckets = mutableListOf() + val user = ctx.channel().attr(RemoteBuildCacheServer.userAttribute).get() + if (user != null) { + bucketManager.getBucketByUser(user)?.let(buckets::addAll) } - } - if (user == null && groups.isEmpty()) { - bucketManager.getBucketByAddress(ctx.channel().remoteAddress() as InetSocketAddress)?.let(buckets::add) - } - if (buckets.isEmpty()) { - return super.channelRead(ctx, msg) + val groups = ctx.channel().attr(RemoteBuildCacheServer.groupAttribute).get() ?: emptySet() + if (groups.isNotEmpty()) { + groups.forEach { group -> + bucketManager.getBucketByGroup(group)?.let(buckets::add) + } + } + if (user == null && groups.isEmpty()) { + bucketManager.getBucketByAddress(ctx.channel().remoteAddress() as InetSocketAddress)?.let(buckets::add) + } + if (buckets.isEmpty()) { + super.channelRead(ctx, msg) + } else { + handleBuckets(buckets, ctx, msg, true) + } + ctx.channel().id() + } else if(msg is HttpContent) { + queuedContent?.add(msg) ?: super.channelRead(ctx, msg) } else { - handleBuckets(buckets, ctx, msg, true) + super.channelRead(ctx, msg) } } @@ -73,9 +84,16 @@ class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() { } if (nextAttempt < 0) { super.channelRead(ctx, msg) + queuedContent?.let { + for(content in it) { + super.channelRead(ctx, content) + } + queuedContent = null + } } else { val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS) if (delayResponse && waitDuration < waitThreshold) { + this.queuedContent = mutableListOf() ctx.executor().schedule({ handleBuckets(buckets, ctx, msg, false) }, waitDuration.toMillis(), TimeUnit.MILLISECONDS) diff --git a/rbcs-server/src/main/resources/net/woggioni/rbcs/server/schema/rbcs.xsd b/rbcs-server/src/main/resources/net/woggioni/rbcs/server/schema/rbcs.xsd index 5ede3b1..caeac74 100644 --- a/rbcs-server/src/main/resources/net/woggioni/rbcs/server/schema/rbcs.xsd +++ b/rbcs-server/src/main/resources/net/woggioni/rbcs/server/schema/rbcs.xsd @@ -39,7 +39,7 @@ - + @@ -52,10 +52,11 @@ - + + @@ -68,6 +69,7 @@ + @@ -220,5 +222,10 @@ + + + + + diff --git a/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/AbstractBasicAuthServerTest.kt b/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/AbstractBasicAuthServerTest.kt index 318f9e4..2d947c3 100644 --- a/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/AbstractBasicAuthServerTest.kt +++ b/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/AbstractBasicAuthServerTest.kt @@ -47,11 +47,13 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() { ), users.asSequence().map { it.name to it}.toMap(), sequenceOf(writersGroup, readersGroup).map { it.name to it}.toMap(), - FileSystemCacheConfiguration(this.cacheDir, + FileSystemCacheConfiguration( + this.cacheDir, maxAge = Duration.ofSeconds(3600 * 24), digestAlgorithm = "MD5", compressionLevel = Deflater.DEFAULT_COMPRESSION, - compressionEnabled = false + compressionEnabled = false, + chunkSize = 0x1000 ), Configuration.BasicAuthentication(), null, diff --git a/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/AbstractTlsServerTest.kt b/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/AbstractTlsServerTest.kt index db365fe..ac50be4 100644 --- a/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/AbstractTlsServerTest.kt +++ b/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/AbstractTlsServerTest.kt @@ -156,7 +156,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() { maxAge = Duration.ofSeconds(3600 * 24), compressionEnabled = true, compressionLevel = Deflater.DEFAULT_COMPRESSION, - digestAlgorithm = "MD5" + digestAlgorithm = "MD5", + chunkSize = 0x1000 ), // InMemoryCacheConfiguration( // maxAge = Duration.ofSeconds(3600 * 24), diff --git a/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/BasicAuthServerTest.kt b/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/BasicAuthServerTest.kt index c5b8349..b07fbe5 100644 --- a/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/BasicAuthServerTest.kt +++ b/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/BasicAuthServerTest.kt @@ -86,7 +86,7 @@ class BasicAuthServerTest : AbstractBasicAuthServerTest() { @Test @Order(4) fun putAsAWriterUser() { - val client: HttpClient = HttpClient.newHttpClient() + val client: HttpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build() val (key, value) = keyValuePair val user = cfg.users.values.find { diff --git a/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/NoAuthServerTest.kt b/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/NoAuthServerTest.kt index b0d5c46..f2d5f5d 100644 --- a/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/NoAuthServerTest.kt +++ b/rbcs-server/src/test/kotlin/net/woggioni/rbcs/server/test/NoAuthServerTest.kt @@ -52,7 +52,8 @@ class NoAuthServerTest : AbstractServerTest() { compressionEnabled = true, digestAlgorithm = "MD5", compressionLevel = Deflater.DEFAULT_COMPRESSION, - maxSize = 0x1000000 + maxSize = 0x1000000, + chunkSize = 0x1000 ), null, null, @@ -80,7 +81,7 @@ class NoAuthServerTest : AbstractServerTest() { @Test @Order(1) fun putWithNoAuthorizationHeader() { - val client: HttpClient = HttpClient.newHttpClient() + val client: HttpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build() val (key, value) = keyValuePair val requestBuilder = newRequestBuilder(key) diff --git a/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-default.xml b/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-default.xml index 6f4a6c7..5b8c866 100644 --- a/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-default.xml +++ b/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-default.xml @@ -11,7 +11,7 @@ idle-timeout="PT30M" max-request-size="101325"/> - + diff --git a/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-memcached-tls.xml b/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-memcached-tls.xml index 05a96e2..2972235 100644 --- a/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-memcached-tls.xml +++ b/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-memcached-tls.xml @@ -13,7 +13,7 @@ read-timeout="PT5M" write-timeout="PT5M"/> - + diff --git a/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-memcached.xml b/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-memcached.xml index 9c1e0fb..0b6fb2e 100644 --- a/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-memcached.xml +++ b/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-memcached.xml @@ -12,7 +12,7 @@ idle-timeout="PT30M" max-request-size="101325"/> - + diff --git a/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-tls.xml b/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-tls.xml index 284c0dd..b774ced 100644 --- a/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-tls.xml +++ b/rbcs-server/src/test/resources/net/woggioni/rbcs/server/test/valid/rbcs-tls.xml @@ -11,7 +11,7 @@ idle-timeout="PT30M" max-request-size="4096"/> - + diff --git a/settings.gradle b/settings.gradle index 761ac6e..e44344d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -29,6 +29,6 @@ include 'rbcs-api' include 'rbcs-common' include 'rbcs-server-memcache' include 'rbcs-cli' -include 'docker' include 'rbcs-client' include 'rbcs-server' +include 'docker'