first commit with streaming support (buggy and unreliable)

This commit is contained in:
2025-02-13 23:02:08 +08:00
parent 7eca8a270d
commit 0463038aaa
37 changed files with 917 additions and 411 deletions

View File

@@ -4,4 +4,5 @@ module net.woggioni.rbcs.api {
requires io.netty.buffer;
exports net.woggioni.rbcs.api;
exports net.woggioni.rbcs.api.exception;
exports net.woggioni.rbcs.api.event;
}

View File

@@ -1,14 +1,17 @@
package net.woggioni.rbcs.api;
import io.netty.buffer.ByteBuf;
import net.woggioni.rbcs.api.exception.ContentTooLargeException;
import io.netty.buffer.ByteBufAllocator;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.CompletableFuture;
public interface Cache extends AutoCloseable {
CompletableFuture<ReadableByteChannel> get(String key);
CompletableFuture<Void> put(String key, ByteBuf content) throws ContentTooLargeException;
default void get(String key, ResponseHandle responseHandle, ByteBufAllocator alloc) {
throw new UnsupportedOperationException();
}
default CompletableFuture<RequestHandle> put(String key, ResponseHandle responseHandle, ByteBufAllocator alloc) {
throw new UnsupportedOperationException();
}
}

View File

@@ -0,0 +1,8 @@
package net.woggioni.rbcs.api;
import net.woggioni.rbcs.api.event.RequestStreamingEvent;
@FunctionalInterface
public interface RequestHandle {
void handleEvent(RequestStreamingEvent evt);
}

View File

@@ -0,0 +1,8 @@
package net.woggioni.rbcs.api;
import net.woggioni.rbcs.api.event.ResponseStreamingEvent;
@FunctionalInterface
public interface ResponseHandle {
void handleEvent(ResponseStreamingEvent evt);
}

View File

@@ -0,0 +1,26 @@
package net.woggioni.rbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
public sealed interface RequestStreamingEvent {
@Getter
@RequiredArgsConstructor
non-sealed class ChunkReceived implements RequestStreamingEvent {
private final ByteBuf chunk;
}
final class LastChunkReceived extends ChunkReceived {
public LastChunkReceived(ByteBuf chunk) {
super(chunk);
}
}
@Getter
@RequiredArgsConstructor
final class ExceptionCaught implements RequestStreamingEvent {
private final Throwable exception;
}
}

View File

@@ -0,0 +1,42 @@
package net.woggioni.rbcs.api.event;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.nio.channels.FileChannel;
public sealed interface ResponseStreamingEvent {
final class ResponseReceived implements ResponseStreamingEvent {
}
@Getter
@RequiredArgsConstructor
non-sealed class ChunkReceived implements ResponseStreamingEvent {
private final ByteBuf chunk;
}
@Getter
@RequiredArgsConstructor
non-sealed class FileReceived implements ResponseStreamingEvent {
private final FileChannel file;
}
final class LastChunkReceived extends ChunkReceived {
public LastChunkReceived(ByteBuf chunk) {
super(chunk);
}
}
@Getter
@RequiredArgsConstructor
final class ExceptionCaught implements ResponseStreamingEvent {
private final Throwable exception;
}
final class NotFound implements ResponseStreamingEvent { }
NotFound NOT_FOUND = new NotFound();
ResponseReceived RESPONSE_RECEIVED = new ResponseReceived();
}

View File

@@ -6,6 +6,8 @@ import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.error
import net.woggioni.rbcs.common.info
import net.woggioni.jwo.JWO
import net.woggioni.jwo.LongMath
import net.woggioni.rbcs.common.debug
import picocli.CommandLine
import java.security.SecureRandom
import java.time.Duration
@@ -46,6 +48,7 @@ class BenchmarkCommand : RbcsCommand() {
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
val progressThreshold = LongMath.ceilDiv(numberOfEntries.toLong(), 20)
RemoteBuildCacheClient(profile).use { client ->
val entryGenerator = sequence {
@@ -79,7 +82,12 @@ class BenchmarkCommand : RbcsCommand() {
completionQueue.put(result)
}
semaphore.release()
completionCounter.incrementAndGet()
val completed = completionCounter.incrementAndGet()
if(completed.mod(progressThreshold) == 0L) {
log.debug {
"Inserted $completed / $numberOfEntries"
}
}
}
} else {
Thread.sleep(0)
@@ -121,7 +129,12 @@ class BenchmarkCommand : RbcsCommand() {
}
}
future.whenComplete { _, _ ->
completionCounter.incrementAndGet()
val completed = completionCounter.incrementAndGet()
if(completed.mod(progressThreshold) == 0L) {
log.debug {
"Retrieved $completed / ${entries.size}"
}
}
semaphore.release()
}
} else {

View File

@@ -0,0 +1,15 @@
package net.woggioni.rbcs.common
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf
fun extractChunk(buf: CompositeByteBuf, alloc: ByteBufAllocator): ByteBuf {
val chunk = alloc.compositeBuffer()
for (component in buf.decompose(0, buf.readableBytes())) {
chunk.addComponent(true, component.retain())
}
buf.removeComponents(0, buf.numComponents())
buf.clear()
return chunk
}

View File

@@ -34,6 +34,7 @@ dependencies {
implementation catalog.jwo
implementation catalog.slf4j.api
implementation catalog.netty.common
implementation catalog.netty.handler
implementation catalog.netty.codec.memcache
bundle catalog.netty.codec.memcache

View File

@@ -11,6 +11,7 @@ module net.woggioni.rbcs.server.memcache {
requires io.netty.codec.memcache;
requires io.netty.common;
requires io.netty.buffer;
requires io.netty.handler;
requires org.slf4j;
provides CacheProvider with net.woggioni.rbcs.server.memcache.MemcacheCacheProvider;

View File

@@ -1,20 +1,232 @@
package net.woggioni.rbcs.server.memcache
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.Unpooled
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.api.RequestHandle
import net.woggioni.rbcs.api.ResponseHandle
import net.woggioni.rbcs.api.event.RequestStreamingEvent
import net.woggioni.rbcs.api.event.ResponseStreamingEvent
import net.woggioni.rbcs.api.exception.ContentTooLargeException
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digest
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.extractChunk
import net.woggioni.rbcs.server.memcache.client.MemcacheClient
import java.nio.channels.ReadableByteChannel
import net.woggioni.rbcs.server.memcache.client.MemcacheResponseHandle
import net.woggioni.rbcs.server.memcache.client.StreamingRequestEvent
import net.woggioni.rbcs.server.memcache.client.StreamingResponseEvent
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterOutputStream
class MemcacheCache(private val cfg : MemcacheCacheConfiguration) : Cache {
private val memcacheClient = MemcacheClient(cfg)
class MemcacheCache(private val cfg: MemcacheCacheConfiguration) : Cache {
override fun get(key: String): CompletableFuture<ReadableByteChannel?> {
return memcacheClient.get(key)
companion object {
@JvmStatic
private val log = contextLogger()
}
override fun put(key: String, content: ByteBuf): CompletableFuture<Void> {
return memcacheClient.put(key, content, cfg.maxAge)
private val memcacheClient = MemcacheClient(cfg)
override fun get(key: String, responseHandle: ResponseHandle, alloc: ByteBufAllocator) {
val compressionMode = cfg.compressionMode
val buf = alloc.compositeBuffer()
val stream = ByteBufOutputStream(buf).let { outputStream ->
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
InflaterOutputStream(
outputStream,
Inflater()
)
}
}
} else {
outputStream
}
}
val memcacheResponseHandle = object : MemcacheResponseHandle {
override fun handleEvent(evt: StreamingResponseEvent) {
when (evt) {
is StreamingResponseEvent.ResponseReceived -> {
if (evt.response.status() == BinaryMemcacheResponseStatus.SUCCESS) {
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
} else if (evt.response.status() == BinaryMemcacheResponseStatus.KEY_ENOENT) {
responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND)
} else {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(MemcacheException(evt.response.status())))
}
}
is StreamingResponseEvent.LastContentReceived -> {
evt.content.content().let { content ->
content.readBytes(stream, content.readableBytes())
}
buf.retain()
stream.close()
val chunk = extractChunk(buf, alloc)
buf.release()
responseHandle.handleEvent(
ResponseStreamingEvent.LastChunkReceived(
chunk
)
)
}
is StreamingResponseEvent.ContentReceived -> {
evt.content.content().let { content ->
content.readBytes(stream, content.readableBytes())
}
if (buf.readableBytes() >= cfg.chunkSize) {
val chunk = extractChunk(buf, alloc)
responseHandle.handleEvent(ResponseStreamingEvent.ChunkReceived(chunk))
}
}
is StreamingResponseEvent.ExceptionCaught -> {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(evt.exception))
}
}
}
}
memcacheClient.sendRequest(Unpooled.wrappedBuffer(key.toByteArray()), memcacheResponseHandle)
.thenApply { memcacheRequestHandle ->
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)
).let { digest ->
DefaultBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest)).apply {
setOpcode(BinaryMemcacheOpcodes.GET)
}
}
memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendRequest(request))
}.exceptionally { ex ->
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex))
}
}
private fun encodeExpiry(expiry: Duration): Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
override fun put(
key: String,
responseHandle: ResponseHandle,
alloc: ByteBufAllocator
): CompletableFuture<RequestHandle> {
val memcacheResponseHandle = object : MemcacheResponseHandle {
override fun handleEvent(evt: StreamingResponseEvent) {
when (evt) {
is StreamingResponseEvent.ResponseReceived -> {
when (evt.response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> {
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND)
}
BinaryMemcacheResponseStatus.E2BIG -> {
responseHandle.handleEvent(
ResponseStreamingEvent.ExceptionCaught(
ContentTooLargeException("Request payload is too big", null)
)
)
}
else -> {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(MemcacheException(evt.response.status())))
}
}
}
is StreamingResponseEvent.LastContentReceived -> {
responseHandle.handleEvent(
ResponseStreamingEvent.LastChunkReceived(
evt.content.content().retain()
)
)
}
is StreamingResponseEvent.ContentReceived -> {
responseHandle.handleEvent(ResponseStreamingEvent.ChunkReceived(evt.content.content().retain()))
}
is StreamingResponseEvent.ExceptionCaught -> {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(evt.exception))
}
}
}
}
val result: CompletableFuture<RequestHandle> =
memcacheClient.sendRequest(Unpooled.wrappedBuffer(key.toByteArray()), memcacheResponseHandle)
.thenApply { memcacheRequestHandle ->
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
val extras = Unpooled.buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(cfg.maxAge))
DefaultBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), extras).apply {
setOpcode(BinaryMemcacheOpcodes.SET)
}
}
// memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendRequest(request))
val compressionMode = cfg.compressionMode
val buf = alloc.heapBuffer()
val stream = ByteBufOutputStream(buf).let { outputStream ->
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
DeflaterOutputStream(
outputStream,
Deflater(Deflater.DEFAULT_COMPRESSION, false)
)
}
}
} else {
outputStream
}
}
RequestHandle { evt ->
when (evt) {
is RequestStreamingEvent.LastChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
buf.retain()
stream.close()
request.setTotalBodyLength(buf.readableBytes() + request.keyLength() + request.extrasLength())
memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendRequest(request))
memcacheRequestHandle.handleEvent(StreamingRequestEvent.SendLastChunk(buf))
}
is RequestStreamingEvent.ChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
}
is RequestStreamingEvent.ExceptionCaught -> {
stream.close()
}
}
}
}
return result
}
override fun close() {

View File

@@ -10,14 +10,10 @@ data class MemcacheCacheConfiguration(
val maxSize: Int = 0x100000,
val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = null,
val chunkSize : Int
) : Configuration.Cache {
enum class CompressionMode {
/**
* Gzip mode
*/
GZIP,
/**
* Deflate mode
*/

View File

@@ -29,12 +29,14 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
?.let(Duration::parse)
?: Duration.ofDays(1)
val maxSize = el.renderAttribute("max-size")
?.let(String::toInt)
?.let(Integer::decode)
?: 0x100000
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x4000
val compressionMode = el.renderAttribute("compression-mode")
?.let {
when (it) {
"gzip" -> MemcacheCacheConfiguration.CompressionMode.GZIP
"deflate" -> MemcacheCacheConfiguration.CompressionMode.DEFLATE
else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE
}
@@ -63,6 +65,7 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
maxSize,
digestAlgorithm,
compressionMode,
chunkSize
)
}
@@ -70,7 +73,6 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
val result = doc.createElement("cache")
Xml.of(doc, result) {
attr("xmlns:${xmlNamespacePrefix}", xmlNamespace, namespaceURI = "http://www.w3.org/2000/xmlns/")
attr("xs:type", "${xmlNamespacePrefix}:$xmlType", RBCS.XML_SCHEMA_NAMESPACE_URI)
for (server in servers) {
node("server") {
@@ -84,13 +86,13 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
}
attr("max-age", maxAge.toString())
attr("max-size", maxSize.toString())
attr("chunk-size", chunkSize.toString())
digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm)
}
compressionMode?.let { compressionMode ->
attr(
"compression-mode", when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> "gzip"
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> "deflate"
}
)

View File

@@ -0,0 +1,30 @@
package net.woggioni.rbcs.server.memcache.client
import io.netty.buffer.ByteBuf
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
sealed interface StreamingRequestEvent {
class SendRequest(val request : BinaryMemcacheRequest) : StreamingRequestEvent
open class SendChunk(val chunk : ByteBuf) : StreamingRequestEvent
class SendLastChunk(chunk : ByteBuf) : SendChunk(chunk)
class ExceptionCaught(val exception : Throwable) : StreamingRequestEvent
}
sealed interface StreamingResponseEvent {
class ResponseReceived(val response : BinaryMemcacheResponse) : StreamingResponseEvent
open class ContentReceived(val content : MemcacheContent) : StreamingResponseEvent
class LastContentReceived(val lastContent : LastMemcacheContent) : ContentReceived(lastContent)
class ExceptionCaught(val exception : Throwable) : StreamingResponseEvent
}
interface MemcacheRequestHandle {
fun handleEvent(evt : StreamingRequestEvent)
}
interface MemcacheResponseHandle {
fun handleEvent(evt : StreamingResponseEvent)
}

View File

@@ -3,7 +3,6 @@ package net.woggioni.rbcs.server.memcache.client
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOption
@@ -14,36 +13,23 @@ import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent
import io.netty.handler.codec.memcache.LastMemcacheContent
import io.netty.handler.codec.memcache.MemcacheContent
import io.netty.handler.codec.memcache.MemcacheObject
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheObjectAggregator
import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultFullBinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.logging.LoggingHandler
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digest
import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration
import net.woggioni.rbcs.server.memcache.MemcacheException
import net.woggioni.jwo.JWO
import java.net.InetSocketAddress
import java.nio.channels.Channels
import java.nio.channels.ReadableByteChannel
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import java.util.zip.InflaterInputStream
import java.util.concurrent.atomic.AtomicLong
import io.netty.util.concurrent.Future as NettyFuture
@@ -61,6 +47,8 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
group = NioEventLoopGroup()
}
private val counter = AtomicLong(0)
private fun newConnectionPool(server: MemcacheCacheConfiguration.Server): FixedChannelPool {
val bootstrap = Bootstrap().apply {
group(group)
@@ -76,18 +64,15 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
override fun channelCreated(ch: Channel) {
val pipeline: ChannelPipeline = ch.pipeline()
pipeline.addLast(BinaryMemcacheClientCodec())
pipeline.addLast(BinaryMemcacheObjectAggregator(cfg.maxSize))
pipeline.addLast(LoggingHandler())
}
}
return FixedChannelPool(bootstrap, channelPoolHandler, server.maxConnections)
}
private fun sendRequest(request: FullBinaryMemcacheRequest): CompletableFuture<FullBinaryMemcacheResponse> {
fun sendRequest(key: ByteBuf, responseHandle: MemcacheResponseHandle): CompletableFuture<MemcacheRequestHandle> {
val server = cfg.servers.let { servers ->
if (servers.size > 1) {
val key = request.key().duplicate()
var checksum = 0
while (key.readableBytes() > 4) {
val byte = key.readInt()
@@ -103,7 +88,7 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
}
}
val response = CompletableFuture<FullBinaryMemcacheResponse>()
val response = CompletableFuture<MemcacheRequestHandle>()
// Custom handler for processing responses
val pool = connectionPool.computeIfAbsent(server.endpoint) {
newConnectionPool(server)
@@ -113,31 +98,73 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline()
.addLast("client-handler", object : SimpleChannelInboundHandler<FullBinaryMemcacheResponse>() {
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: FullBinaryMemcacheResponse
) {
pipeline.removeLast()
pool.release(channel)
msg.touch("The method's caller must remember to release this")
response.complete(msg.retain())
}
val handler = object : SimpleChannelInboundHandler<MemcacheObject>() {
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: MemcacheObject
) {
when (msg) {
is BinaryMemcacheResponse -> responseHandle.handleEvent(
StreamingResponseEvent.ResponseReceived(
msg
)
)
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val ex = when (cause) {
is DecoderException -> cause.cause!!
else -> cause
is LastMemcacheContent -> {
responseHandle.handleEvent(
StreamingResponseEvent.LastContentReceived(
msg
)
)
pipeline.removeLast()
pool.release(channel)
}
ctx.close()
pipeline.removeLast()
pool.release(channel)
response.completeExceptionally(ex)
is MemcacheContent -> responseHandle.handleEvent(
StreamingResponseEvent.ContentReceived(
msg
)
)
}
})
request.touch()
channel.writeAndFlush(request)
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
responseHandle.handleEvent(StreamingResponseEvent.ExceptionCaught(cause))
ctx.close()
pipeline.removeLast()
pool.release(channel)
}
}
channel.pipeline()
.addLast("client-handler", handler)
response.complete(object : MemcacheRequestHandle {
override fun handleEvent(evt: StreamingRequestEvent) {
when (evt) {
is StreamingRequestEvent.SendRequest -> {
channel.writeAndFlush(evt.request)
}
is StreamingRequestEvent.SendLastChunk -> {
channel.writeAndFlush(DefaultLastMemcacheContent(evt.chunk))
val value = counter.incrementAndGet()
log.debug {
"Finished request counter: $value"
}
}
is StreamingRequestEvent.SendChunk -> {
channel.writeAndFlush(DefaultMemcacheContent(evt.chunk))
}
is StreamingRequestEvent.ExceptionCaught -> {
responseHandle.handleEvent(StreamingResponseEvent.ExceptionCaught(evt.exception))
channel.close()
pipeline.removeLast()
pool.release(channel)
}
}
}
})
} else {
response.completeExceptionally(channelFuture.cause())
}
@@ -146,107 +173,6 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
return response
}
private fun encodeExpiry(expiry: Duration): Int {
val expirySeconds = expiry.toSeconds()
return expirySeconds.toInt().takeIf { it.toLong() == expirySeconds }
?: Instant.ofEpochSecond(expirySeconds).epochSecond.toInt()
}
fun get(key: String): CompletableFuture<ReadableByteChannel?> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
DefaultFullBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), null).apply {
setOpcode(BinaryMemcacheOpcodes.GET)
}
}
return sendRequest(request).thenApply { response ->
try {
when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> {
val compressionMode = cfg.compressionMode
val content = response.content().retain()
content.touch()
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> {
GZIPInputStream(ByteBufInputStream(content))
}
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
InflaterInputStream(ByteBufInputStream(content))
}
}
} else {
ByteBufInputStream(content)
}.let(Channels::newChannel)
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
null
}
else -> throw MemcacheException(status)
}
} finally {
response.release()
}
}
}
fun put(key: String, content: ByteBuf, expiry: Duration, cas: Long? = null): CompletableFuture<Void> {
val request = (cfg.digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digest(key.toByteArray(), md)
} ?: key.toByteArray(Charsets.UTF_8)).let { digest ->
val extras = Unpooled.buffer(8, 8)
extras.writeInt(0)
extras.writeInt(encodeExpiry(expiry))
val compressionMode = cfg.compressionMode
content.retain()
val payload = if (compressionMode != null) {
val inputStream = ByteBufInputStream(content)
val buf = content.alloc().buffer()
buf.retain()
val outputStream = when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> {
GZIPOutputStream(ByteBufOutputStream(buf))
}
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
DeflaterOutputStream(ByteBufOutputStream(buf), Deflater(Deflater.DEFAULT_COMPRESSION, false))
}
}
inputStream.use { i ->
outputStream.use { o ->
JWO.copy(i, o)
}
}
buf
} else {
content
}
DefaultFullBinaryMemcacheRequest(Unpooled.wrappedBuffer(digest), extras, payload).apply {
setOpcode(BinaryMemcacheOpcodes.SET)
cas?.let(this::setCas)
}
}
return sendRequest(request).thenApply { response ->
try {
when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> null
else -> throw MemcacheException(status)
}
} finally {
response.release()
}
}
}
fun shutDown(): NettyFuture<*> {
return group.shutdownGracefully()
}

View File

@@ -20,7 +20,8 @@
<xs:element name="server" type="rbcs-memcache:memcacheServerType"/>
</xs:sequence>
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="max-size" type="xs:unsignedInt" default="1048576"/>
<xs:attribute name="max-size" type="rbcs:byteSize" default="1048576"/>
<xs:attribute name="chunk-size" type="rbcs:byteSize" default="0x4000"/>
<xs:attribute name="digest" type="xs:token" />
<xs:attribute name="compression-mode" type="rbcs-memcache:compressionType"/>
</xs:extension>
@@ -30,7 +31,6 @@
<xs:simpleType name="compressionType">
<xs:restriction base="xs:token">
<xs:enumeration value="deflate"/>
<xs:enumeration value="gzip"/>
</xs:restriction>
</xs:simpleType>

View File

@@ -16,7 +16,6 @@ import io.netty.handler.codec.compression.CompressionOptions
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.HttpContentCompressor
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.ssl.ClientAuth
@@ -249,13 +248,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
private val cache = cfg.cache.materialize()
private val serverHandler = let {
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
ServerHandler(cache, prefix)
}
private val exceptionHandler = ExceptionHandler()
private val throttlingHandler = ThrottlingHandler(cfg)
private val authenticator = when (val auth = cfg.authentication) {
is Configuration.BasicAuthentication -> NettyHttpBasicAuthenticator(cfg.users, RoleAuthorizer())
@@ -368,11 +361,15 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
pipeline.addLast(HttpServerCodec())
pipeline.addLast(HttpChunkContentCompressor(1024))
pipeline.addLast(ChunkedWriteHandler())
pipeline.addLast(HttpObjectAggregator(cfg.connection.maxRequestSize))
authenticator?.let {
pipeline.addLast(it)
}
pipeline.addLast(throttlingHandler)
pipeline.addLast(ThrottlingHandler(cfg))
val serverHandler = let {
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
ServerHandler(cache, prefix)
}
pipeline.addLast(eventExecutorGroup, serverHandler)
pipeline.addLast(exceptionHandler)
}

View File

@@ -6,6 +6,7 @@ import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus
@@ -57,6 +58,8 @@ abstract class AbstractNettyHttpAuthenticator(private val authorizer: Authorizer
} else {
authorizationFailure(ctx, msg)
}
} else if(msg is HttpContent) {
ctx.fireChannelRead(msg)
}
}

View File

@@ -1,12 +1,16 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import net.woggioni.jwo.JWO
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.api.RequestHandle
import net.woggioni.rbcs.api.ResponseHandle
import net.woggioni.rbcs.api.event.RequestStreamingEvent
import net.woggioni.rbcs.api.event.ResponseStreamingEvent
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digestString
import net.woggioni.rbcs.common.contextLogger
import java.nio.channels.Channels
import net.woggioni.rbcs.common.extractChunk
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
@@ -19,7 +23,6 @@ import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterInputStream
class FileSystemCache(
@@ -27,7 +30,8 @@ class FileSystemCache(
val maxAge: Duration,
val digestAlgorithm: String?,
val compressionEnabled: Boolean,
val compressionLevel: Int
val compressionLevel: Int,
val chunkSize: Int
) : Cache {
private companion object {
@@ -44,61 +48,111 @@ class FileSystemCache(
private var nextGc = Instant.now()
override fun get(key: String) = (digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
root.resolve(digest).takeIf(Files::exists)
?.let { file ->
file.takeIf(Files::exists)?.let { file ->
if (compressionEnabled) {
val inflater = Inflater()
Channels.newChannel(
InflaterInputStream(
Channels.newInputStream(
FileChannel.open(
file,
StandardOpenOption.READ
)
), inflater
)
)
} else {
FileChannel.open(file, StandardOpenOption.READ)
}
}
}.let {
CompletableFuture.completedFuture(it)
}
}
override fun put(key: String, content: ByteBuf): CompletableFuture<Void> {
override fun get(key: String, responseHandle: ResponseHandle, alloc: ByteBufAllocator) {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
val file = root.resolve(digest)
val tmpFile = Files.createTempFile(root, null, ".tmp")
try {
Files.newOutputStream(tmpFile).let {
root.resolve(digest).takeIf(Files::exists)
?.let { file ->
file.takeIf(Files::exists)?.let { file ->
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
if (compressionEnabled) {
val compositeBuffer = alloc.compositeBuffer()
ByteBufOutputStream(compositeBuffer).use { outputStream ->
InflaterInputStream(Files.newInputStream(file)).use { inputStream ->
val ioBuffer = alloc.buffer(chunkSize)
try {
while (true) {
val read = ioBuffer.writeBytes(inputStream, chunkSize)
val last = read < 0
if (read > 0) {
ioBuffer.readBytes(outputStream, read)
}
if (last) {
compositeBuffer.retain()
outputStream.close()
}
if (compositeBuffer.readableBytes() >= chunkSize || last) {
val chunk = extractChunk(compositeBuffer, alloc)
val evt = if (last) {
ResponseStreamingEvent.LastChunkReceived(chunk)
} else {
ResponseStreamingEvent.ChunkReceived(chunk)
}
responseHandle.handleEvent(evt)
}
if (last) break
}
} finally {
ioBuffer.release()
}
}
}
} else {
responseHandle.handleEvent(
ResponseStreamingEvent.FileReceived(
FileChannel.open(file, StandardOpenOption.READ)
)
)
}
}
} ?: responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND)
}
}
override fun put(
key: String,
responseHandle: ResponseHandle,
alloc: ByteBufAllocator
): CompletableFuture<RequestHandle> {
try {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
val file = root.resolve(digest)
val tmpFile = Files.createTempFile(root, null, ".tmp")
val stream = Files.newOutputStream(tmpFile).let {
if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
DeflaterOutputStream(it, deflater)
} else {
it
}
}.use {
JWO.copy(ByteBufInputStream(content), it)
}
Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE)
} catch (t: Throwable) {
Files.delete(tmpFile)
throw t
return CompletableFuture.completedFuture(object : RequestHandle {
override fun handleEvent(evt: RequestStreamingEvent) {
try {
when (evt) {
is RequestStreamingEvent.LastChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
stream.close()
Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE)
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
}
is RequestStreamingEvent.ChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
}
is RequestStreamingEvent.ExceptionCaught -> {
Files.delete(tmpFile)
stream.close()
}
}
} catch (ex: Throwable) {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex))
}
}
})
}
} catch (ex: Throwable) {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex))
return CompletableFuture.failedFuture(ex)
}
return CompletableFuture.completedFuture(null)
}
private val garbageCollector = Thread.ofVirtual().name("file-system-cache-gc").start {
@@ -119,8 +173,8 @@ class FileSystemCache(
/**
* Returns the creation timestamp of the oldest cache entry (if any)
*/
private fun actualGc(now: Instant) : Instant? {
var result :Instant? = null
private fun actualGc(now: Instant): Instant? {
var result: Instant? = null
Files.list(root)
.filter { path ->
JWO.splitExtension(path)
@@ -132,7 +186,7 @@ class FileSystemCache(
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
.creationTime()
.toInstant()
if(result == null || creationTimeStamp < result) {
if (result == null || creationTimeStamp < result) {
result = creationTimeStamp
}
now > creationTimeStamp.plus(maxAge)

View File

@@ -12,13 +12,15 @@ data class FileSystemCacheConfiguration(
val digestAlgorithm : String?,
val compressionEnabled: Boolean,
val compressionLevel: Int,
val chunkSize: Int,
) : Configuration.Cache {
override fun materialize() = FileSystemCache(
root ?: Application.builder("rbcs").build().computeCacheDirectory(),
maxAge,
digestAlgorithm,
compressionEnabled,
compressionLevel
compressionLevel,
chunkSize,
)
override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI

View File

@@ -31,13 +31,17 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
?.let(String::toInt)
?: Deflater.DEFAULT_COMPRESSION
val digestAlgorithm = el.renderAttribute("digest") ?: "MD5"
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x4000
return FileSystemCacheConfiguration(
path,
maxAge,
digestAlgorithm,
enableCompression,
compressionLevel
compressionLevel,
chunkSize
)
}
@@ -57,6 +61,7 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
}?.let {
attr("compression-level", it.toString())
}
attr("chunk-size", chunkSize.toString())
}
result
}

View File

@@ -1,31 +1,36 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.jwo.JWO
import io.netty.buffer.ByteBufAllocator
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.api.RequestHandle
import net.woggioni.rbcs.api.ResponseHandle
import net.woggioni.rbcs.api.event.RequestStreamingEvent
import net.woggioni.rbcs.api.event.ResponseStreamingEvent
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digestString
import net.woggioni.rbcs.common.contextLogger
import java.nio.channels.Channels
import net.woggioni.rbcs.common.extractChunk
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterInputStream
import java.util.zip.InflaterOutputStream
class InMemoryCache(
val maxAge: Duration,
val maxSize: Long,
val digestAlgorithm: String?,
val compressionEnabled: Boolean,
val compressionLevel: Int
private val maxAge: Duration,
private val maxSize: Long,
private val digestAlgorithm: String?,
private val compressionEnabled: Boolean,
private val compressionLevel: Int,
private val chunkSize : Int
) : Cache {
companion object {
@@ -35,8 +40,9 @@ class InMemoryCache(
private val size = AtomicLong()
private val map = ConcurrentHashMap<String, ByteBuf>()
private class RemovalQueueElement(val key: String, val value : ByteBuf, val expiry : Instant) : Comparable<RemovalQueueElement> {
private class RemovalQueueElement(val key: String, val value: ByteBuf, val expiry: Instant) :
Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
}
@@ -46,19 +52,17 @@ class InMemoryCache(
private var running = true
private val garbageCollector = Thread.ofVirtual().name("in-memory-cache-gc").start {
while(running) {
val el = removalQueue.take()
while (running) {
val el = removalQueue.poll(1, TimeUnit.SECONDS) ?: continue
val buf = el.value
val now = Instant.now()
if(now > el.expiry) {
if (now > el.expiry) {
val removed = map.remove(el.key, buf)
if(removed) {
if (removed) {
updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
}
//Decrease the reference count for removalQueue
buf.release()
} else {
removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
@@ -66,14 +70,12 @@ class InMemoryCache(
}
}
private fun removeEldest() : Long {
while(true) {
private fun removeEldest(): Long {
while (true) {
val el = removalQueue.take()
val buf = el.value
val removed = map.remove(el.key, buf)
//Decrease the reference count for removalQueue
buf.release()
if(removed) {
if (removed) {
val newSize = updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
@@ -82,8 +84,8 @@ class InMemoryCache(
}
}
private fun updateSizeAfterRemoval(removed: ByteBuf) : Long {
return size.updateAndGet { currentSize : Long ->
private fun updateSizeAfterRemoval(removed: ByteBuf): Long {
return size.updateAndGet { currentSize: Long ->
currentSize - removed.readableBytes()
}
}
@@ -93,58 +95,114 @@ class InMemoryCache(
garbageCollector.join()
}
override fun get(key: String) =
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key
).let { digest ->
map[digest]
?.let { value ->
val copy = value.retainedDuplicate()
copy.touch("This has to be released by the caller of the cache")
if (compressionEnabled) {
val inflater = Inflater()
Channels.newChannel(InflaterInputStream(ByteBufInputStream(copy), inflater))
} else {
Channels.newChannel(ByteBufInputStream(copy))
}
override fun get(key: String, responseHandle: ResponseHandle, alloc: ByteBufAllocator) {
try {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key
).let { digest ->
map[digest]
?.let { value ->
val copy = value.retainedDuplicate()
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
val output = alloc.compositeBuffer()
if (compressionEnabled) {
try {
val stream = ByteBufOutputStream(output).let {
val inflater = Inflater()
InflaterOutputStream(it, inflater)
}
stream.use { os ->
var readable = copy.readableBytes()
while (true) {
copy.readBytes(os, chunkSize.coerceAtMost(readable))
readable = copy.readableBytes()
val last = readable == 0
if (last) stream.flush()
if (output.readableBytes() >= chunkSize || last) {
val chunk = extractChunk(output, alloc)
val evt = if (last) {
ResponseStreamingEvent.LastChunkReceived(chunk)
} else {
ResponseStreamingEvent.ChunkReceived(chunk)
}
responseHandle.handleEvent(evt)
}
if (last) break
}
}
} finally {
copy.release()
}
} else {
responseHandle.handleEvent(
ResponseStreamingEvent.LastChunkReceived(copy)
)
}
} ?: responseHandle.handleEvent(ResponseStreamingEvent.NOT_FOUND)
}
} catch (ex: Throwable) {
responseHandle.handleEvent(ResponseStreamingEvent.ExceptionCaught(ex))
}
}
override fun put(
key: String,
responseHandle: ResponseHandle,
alloc: ByteBufAllocator
): CompletableFuture<RequestHandle> {
return CompletableFuture.completedFuture(object : RequestHandle {
val buf = alloc.heapBuffer()
val stream = ByteBufOutputStream(buf).let {
if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
DeflaterOutputStream(it, deflater)
} else {
it
}
}.let {
CompletableFuture.completedFuture(it)
}
override fun put(key: String, content: ByteBuf) =
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
content.retain()
val value = if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
val buf = content.alloc().buffer()
buf.retain()
DeflaterOutputStream(ByteBufOutputStream(buf), deflater).use { outputStream ->
ByteBufInputStream(content).use { inputStream ->
JWO.copy(inputStream, outputStream)
override fun handleEvent(evt: RequestStreamingEvent) {
when (evt) {
is RequestStreamingEvent.ChunkReceived -> {
evt.chunk.readBytes(stream, evt.chunk.readableBytes())
if (evt is RequestStreamingEvent.LastChunkReceived) {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key
).let { digest ->
val oldSize = map.put(digest, buf.retain())?.let { old ->
val result = old.readableBytes()
old.release()
result
} ?: 0
val delta = buf.readableBytes() - oldSize
var newSize = size.updateAndGet { currentSize : Long ->
currentSize + delta
}
removalQueue.put(RemovalQueueElement(digest, buf, Instant.now().plus(maxAge)))
while(newSize > maxSize) {
newSize = removeEldest()
}
stream.close()
responseHandle.handleEvent(ResponseStreamingEvent.RESPONSE_RECEIVED)
}
}
}
is RequestStreamingEvent.ExceptionCaught -> {
stream.close()
}
else -> {
}
}
buf
} else {
content
}
val old = map.put(digest, value)
val delta = value.readableBytes() - (old?.readableBytes() ?: 0)
var newSize = size.updateAndGet { currentSize : Long ->
currentSize + delta
}
removalQueue.put(RemovalQueueElement(digest, value.retain(), Instant.now().plus(maxAge)))
while(newSize > maxSize) {
newSize = removeEldest()
}
}.let {
CompletableFuture.completedFuture<Void>(null)
}
})
}
}

View File

@@ -10,13 +10,15 @@ data class InMemoryCacheConfiguration(
val digestAlgorithm : String?,
val compressionEnabled: Boolean,
val compressionLevel: Int,
val chunkSize : Int
) : Configuration.Cache {
override fun materialize() = InMemoryCache(
maxAge,
maxSize,
digestAlgorithm,
compressionEnabled,
compressionLevel
compressionLevel,
chunkSize
)
override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI

View File

@@ -31,13 +31,16 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
?.let(String::toInt)
?: Deflater.DEFAULT_COMPRESSION
val digestAlgorithm = el.renderAttribute("digest") ?: "MD5"
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x4000
return InMemoryCacheConfiguration(
maxAge,
maxSize,
digestAlgorithm,
enableCompression,
compressionLevel
compressionLevel,
chunkSize
)
}
@@ -57,6 +60,7 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
}?.let {
attr("compression-level", it.toString())
}
attr("chunk-size", chunkSize.toString())
}
result
}

View File

@@ -124,7 +124,7 @@ object Parser {
val writeIdleTimeout = child.renderAttribute("write-idle-timeout")
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
val maxRequestSize = child.renderAttribute("max-request-size")
?.let(String::toInt) ?: 67108864
?.let(Integer::decode) ?: 0x4000000
connection = Configuration.Connection(
readTimeout,
writeTimeout,

View File

@@ -2,34 +2,66 @@ package net.woggioni.rbcs.server.handler
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.DefaultFileRegion
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.DefaultLastHttpContent
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpObject
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioStream
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.api.RequestHandle
import net.woggioni.rbcs.api.ResponseHandle
import net.woggioni.rbcs.api.event.RequestStreamingEvent
import net.woggioni.rbcs.api.event.ResponseStreamingEvent
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.server.debug
import net.woggioni.rbcs.server.warn
import java.nio.channels.FileChannel
import java.nio.file.Path
import java.util.concurrent.CompletableFuture
@ChannelHandler.Sharable
class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
SimpleChannelInboundHandler<FullHttpRequest>() {
SimpleChannelInboundHandler<HttpObject>() {
private val log = contextLogger()
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) {
when(msg) {
is HttpRequest -> handleRequest(ctx, msg)
is HttpContent -> handleContent(msg)
}
}
private var requestHandle : CompletableFuture<RequestHandle?> = CompletableFuture.completedFuture(null)
private fun handleContent(content : HttpContent) {
content.retain()
requestHandle.thenAccept { handle ->
handle?.let {
val evt = if(content is LastHttpContent) {
RequestStreamingEvent.LastChunkReceived(content.content())
} else {
RequestStreamingEvent.ChunkReceived(content.content())
}
it.handleEvent(evt)
content.release()
} ?: content.release()
}
}
private fun handleRequest(ctx : ChannelHandlerContext, msg : HttpRequest) {
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
val method = msg.method()
if (method === HttpMethod.GET) {
@@ -42,54 +74,55 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
return
}
if (serverPrefix == prefix) {
cache.get(key).thenApply { channel ->
if(channel != null) {
log.debug(ctx) {
"Cache hit for key '$key'"
}
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
val responseHandle = ResponseHandle { evt ->
when (evt) {
is ResponseStreamingEvent.ResponseReceived -> {
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
}
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
ctx.writeAndFlush(response)
}
ctx.write(response)
when (channel) {
is FileChannel -> {
val content = DefaultFileRegion(channel, 0, channel.size())
if (keepAlive) {
ctx.write(content)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
}
else -> {
val content = ChunkedNioStream(channel)
if (keepAlive) {
ctx.write(content).addListener {
content.close()
}
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
is ResponseStreamingEvent.LastChunkReceived -> {
val channelFuture = ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk))
if (!keepAlive) {
channelFuture
.addListener(ChannelFutureListener.CLOSE)
}
}
} else {
log.debug(ctx) {
"Cache miss for key '$key'"
is ResponseStreamingEvent.ChunkReceived -> {
ctx.writeAndFlush(DefaultHttpContent(evt.chunk))
}
is ResponseStreamingEvent.ExceptionCaught -> {
ctx.fireExceptionCaught(evt.exception)
}
is ResponseStreamingEvent.NotFound -> {
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
is ResponseStreamingEvent.FileReceived -> {
val content = DefaultFileRegion(evt.file, 0, evt.file.size())
if (keepAlive) {
ctx.write(content)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
}.whenComplete { _, ex -> ex?.let(ctx::fireExceptionCaught) }
}
cache.get(key, responseHandle, ctx.alloc())
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
@@ -107,15 +140,32 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
log.debug(ctx) {
"Added value for key '$key' to build cache"
}
cache.put(key, msg.content()).thenRun {
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
}.whenComplete { _, ex ->
val responseHandle = ResponseHandle { evt ->
when (evt) {
is ResponseStreamingEvent.ResponseReceived -> {
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
this.requestHandle = CompletableFuture.completedFuture(null)
}
is ResponseStreamingEvent.ChunkReceived -> {
evt.chunk.release()
}
is ResponseStreamingEvent.ExceptionCaught -> {
ctx.fireExceptionCaught(evt.exception)
}
else -> {}
}
}
this.requestHandle = cache.put(key, responseHandle, ctx.alloc()).exceptionally { ex ->
ctx.fireExceptionCaught(ex)
null
}.also {
log.debug { "Replacing request handle with $it"}
}
} else {
log.warn(ctx) {
@@ -125,9 +175,12 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
} else if(method == HttpMethod.TRACE) {
} else if (method == HttpMethod.TRACE) {
val replayedRequestHead = ctx.alloc().buffer()
replayedRequestHead.writeCharSequence("TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", Charsets.US_ASCII)
replayedRequestHead.writeCharSequence(
"TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n",
Charsets.US_ASCII
)
msg.headers().forEach { (key, value) ->
replayedRequestHead.apply {
writeCharSequence(key, Charsets.US_ASCII)
@@ -137,16 +190,24 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
}
}
replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII)
val requestBody = msg.content()
requestBody.retain()
val responseBody = ctx.alloc().compositeBuffer(2).apply {
addComponents(true, replayedRequestHead)
addComponents(true, requestBody)
this.requestHandle = CompletableFuture.completedFuture(RequestHandle { evt ->
when(evt) {
is RequestStreamingEvent.LastChunkReceived -> {
ctx.writeAndFlush(DefaultLastHttpContent(evt.chunk.retain()))
this.requestHandle = CompletableFuture.completedFuture(null)
}
is RequestStreamingEvent.ChunkReceived -> ctx.writeAndFlush(DefaultHttpContent(evt.chunk.retain()))
is RequestStreamingEvent.ExceptionCaught -> ctx.fireExceptionCaught(evt.exception)
else -> {
}
}
}).also {
log.debug { "Replacing request handle with $it"}
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK, responseBody)
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, "message/http")
set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes())
}
ctx.writeAndFlush(response)
} else {
@@ -158,4 +219,11 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
ctx.writeAndFlush(response)
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
requestHandle.thenAccept { handle ->
handle?.handleEvent(RequestStreamingEvent.ExceptionCaught(cause))
}
super.exceptionCaught(ctx, cause)
}
}

View File

@@ -1,10 +1,11 @@
package net.woggioni.rbcs.server.throttling
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion
import net.woggioni.rbcs.api.Configuration
@@ -18,7 +19,6 @@ import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
@Sharable
class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() {
private companion object {
@@ -30,6 +30,8 @@ class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() {
private val connectionConfiguration = cfg.connection
private var queuedContent : MutableList<HttpContent>? = null
/**
* If the suggested waiting time from the bucket is lower than this
* amount, then the server will simply wait by itself before sending a response
@@ -41,25 +43,34 @@ class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() {
connectionConfiguration.writeIdleTimeout
).dividedBy(2)
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
val buckets = mutableListOf<Bucket>()
val user = ctx.channel().attr(RemoteBuildCacheServer.userAttribute).get()
if (user != null) {
bucketManager.getBucketByUser(user)?.let(buckets::addAll)
}
val groups = ctx.channel().attr(RemoteBuildCacheServer.groupAttribute).get() ?: emptySet()
if (groups.isNotEmpty()) {
groups.forEach { group ->
bucketManager.getBucketByGroup(group)?.let(buckets::add)
if(msg is HttpRequest) {
val buckets = mutableListOf<Bucket>()
val user = ctx.channel().attr(RemoteBuildCacheServer.userAttribute).get()
if (user != null) {
bucketManager.getBucketByUser(user)?.let(buckets::addAll)
}
}
if (user == null && groups.isEmpty()) {
bucketManager.getBucketByAddress(ctx.channel().remoteAddress() as InetSocketAddress)?.let(buckets::add)
}
if (buckets.isEmpty()) {
return super.channelRead(ctx, msg)
val groups = ctx.channel().attr(RemoteBuildCacheServer.groupAttribute).get() ?: emptySet()
if (groups.isNotEmpty()) {
groups.forEach { group ->
bucketManager.getBucketByGroup(group)?.let(buckets::add)
}
}
if (user == null && groups.isEmpty()) {
bucketManager.getBucketByAddress(ctx.channel().remoteAddress() as InetSocketAddress)?.let(buckets::add)
}
if (buckets.isEmpty()) {
super.channelRead(ctx, msg)
} else {
handleBuckets(buckets, ctx, msg, true)
}
ctx.channel().id()
} else if(msg is HttpContent) {
queuedContent?.add(msg) ?: super.channelRead(ctx, msg)
} else {
handleBuckets(buckets, ctx, msg, true)
super.channelRead(ctx, msg)
}
}
@@ -73,9 +84,16 @@ class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() {
}
if (nextAttempt < 0) {
super.channelRead(ctx, msg)
queuedContent?.let {
for(content in it) {
super.channelRead(ctx, content)
}
queuedContent = null
}
} else {
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
if (delayResponse && waitDuration < waitThreshold) {
this.queuedContent = mutableListOf()
ctx.executor().schedule({
handleBuckets(buckets, ctx, msg, false)
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)

View File

@@ -39,7 +39,7 @@
<xs:attribute name="idle-timeout" type="xs:duration" use="optional" default="PT30S"/>
<xs:attribute name="read-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
<xs:attribute name="write-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
<xs:attribute name="max-request-size" type="xs:unsignedInt" use="optional" default="67108864"/>
<xs:attribute name="max-request-size" type="rbcs:byteSize" use="optional" default="0x4000000"/>
</xs:complexType>
<xs:complexType name="eventExecutorType">
@@ -52,10 +52,11 @@
<xs:complexContent>
<xs:extension base="rbcs:cacheType">
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="max-size" type="xs:token" default="0x1000000"/>
<xs:attribute name="max-size" type="rbcs:byteSize" default="0x1000000"/>
<xs:attribute name="digest" type="xs:token" default="MD5"/>
<xs:attribute name="enable-compression" type="xs:boolean" default="true"/>
<xs:attribute name="compression-level" type="xs:byte" default="-1"/>
<xs:attribute name="chunk-size" type="rbcs:byteSize" default="0x4000"/>
</xs:extension>
</xs:complexContent>
</xs:complexType>
@@ -68,6 +69,7 @@
<xs:attribute name="digest" type="xs:token" default="MD5"/>
<xs:attribute name="enable-compression" type="xs:boolean" default="true"/>
<xs:attribute name="compression-level" type="xs:byte" default="-1"/>
<xs:attribute name="chunk-size" type="rbcs:byteSize" default="0x4000"/>
</xs:extension>
</xs:complexContent>
</xs:complexType>
@@ -220,5 +222,10 @@
<xs:attribute name="port" type="xs:unsignedShort" use="required"/>
</xs:complexType>
<xs:simpleType name="byteSize">
<xs:restriction base="xs:token">
<xs:pattern value="(0x[a-f0-9]+|[0-9]+)"/>
</xs:restriction>
</xs:simpleType>
</xs:schema>

View File

@@ -47,11 +47,13 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
),
users.asSequence().map { it.name to it}.toMap(),
sequenceOf(writersGroup, readersGroup).map { it.name to it}.toMap(),
FileSystemCacheConfiguration(this.cacheDir,
FileSystemCacheConfiguration(
this.cacheDir,
maxAge = Duration.ofSeconds(3600 * 24),
digestAlgorithm = "MD5",
compressionLevel = Deflater.DEFAULT_COMPRESSION,
compressionEnabled = false
compressionEnabled = false,
chunkSize = 0x1000
),
Configuration.BasicAuthentication(),
null,

View File

@@ -156,7 +156,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
maxAge = Duration.ofSeconds(3600 * 24),
compressionEnabled = true,
compressionLevel = Deflater.DEFAULT_COMPRESSION,
digestAlgorithm = "MD5"
digestAlgorithm = "MD5",
chunkSize = 0x1000
),
// InMemoryCacheConfiguration(
// maxAge = Duration.ofSeconds(3600 * 24),

View File

@@ -86,7 +86,7 @@ class BasicAuthServerTest : AbstractBasicAuthServerTest() {
@Test
@Order(4)
fun putAsAWriterUser() {
val client: HttpClient = HttpClient.newHttpClient()
val client: HttpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build()
val (key, value) = keyValuePair
val user = cfg.users.values.find {

View File

@@ -52,7 +52,8 @@ class NoAuthServerTest : AbstractServerTest() {
compressionEnabled = true,
digestAlgorithm = "MD5",
compressionLevel = Deflater.DEFAULT_COMPRESSION,
maxSize = 0x1000000
maxSize = 0x1000000,
chunkSize = 0x1000
),
null,
null,
@@ -80,7 +81,7 @@ class NoAuthServerTest : AbstractServerTest() {
@Test
@Order(1)
fun putWithNoAuthorizationHeader() {
val client: HttpClient = HttpClient.newHttpClient()
val client: HttpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build()
val (key, value) = keyValuePair
val requestBuilder = newRequestBuilder(key)

View File

@@ -11,7 +11,7 @@
idle-timeout="PT30M"
max-request-size="101325"/>
<event-executor use-virtual-threads="false"/>
<cache xs:type="rbcs:fileSystemCacheType" path="/tmp/rbcs" max-age="P7D"/>
<cache xs:type="rbcs:fileSystemCacheType" path="/tmp/rbcs" max-age="P7D" chunk-size="0xa910"/>
<authentication>
<none/>
</authentication>

View File

@@ -13,7 +13,7 @@
read-timeout="PT5M"
write-timeout="PT5M"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" max-size="16777216" compression-mode="deflate">
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" max-size="16777216" compression-mode="deflate" chunk-size="123">
<server host="memcached" port="11211"/>
</cache>
<authorization>

View File

@@ -12,7 +12,7 @@
idle-timeout="PT30M"
max-request-size="101325"/>
<event-executor use-virtual-threads="false"/>
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" max-size="101325" digest="SHA-256">
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" max-size="101325" digest="SHA-256" chunk-size="456">
<server host="127.0.0.1" port="11211" max-connections="10" connection-timeout="PT20S"/>
</cache>
<authentication>

View File

@@ -11,7 +11,7 @@
idle-timeout="PT30M"
max-request-size="4096"/>
<event-executor use-virtual-threads="false"/>
<cache xs:type="rbcs:inMemoryCacheType" max-age="P7D"/>
<cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" chunk-size="0xa91f"/>
<authorization>
<users>
<user name="user1" password="password1">

View File

@@ -29,6 +29,6 @@ include 'rbcs-api'
include 'rbcs-common'
include 'rbcs-server-memcache'
include 'rbcs-cli'
include 'docker'
include 'rbcs-client'
include 'rbcs-server'
include 'docker'