diff --git a/gbcs-common/src/main/kotlin/net/woggioni/gbcs/common/ByteBufOutputStream.kt b/gbcs-common/src/main/kotlin/net/woggioni/gbcs/common/ByteBufOutputStream.kt new file mode 100644 index 0000000..3e4d026 --- /dev/null +++ b/gbcs-common/src/main/kotlin/net/woggioni/gbcs/common/ByteBufOutputStream.kt @@ -0,0 +1,19 @@ +package net.woggioni.gbcs.common + +import io.netty.buffer.ByteBuf +import java.io.InputStream +import java.io.OutputStream + +class ByteBufOutputStream(private val buf : ByteBuf) : OutputStream() { + override fun write(b: Int) { + buf.writeByte(b) + } + + override fun write(b: ByteArray, off: Int, len: Int) { + buf.writeBytes(b, off, len) + } + + override fun close() { + buf.release() + } +} \ No newline at end of file diff --git a/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/client/MemcacheClient.kt b/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/client/MemcacheClient.kt index 0a9ac29..62cbfad 100644 --- a/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/client/MemcacheClient.kt +++ b/gbcs-server-memcache/src/main/kotlin/net/woggioni/gbcs/server/memcache/client/MemcacheClient.kt @@ -24,6 +24,7 @@ import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheRequest import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse import io.netty.util.concurrent.GenericFutureListener import net.woggioni.gbcs.common.ByteBufInputStream +import net.woggioni.gbcs.common.ByteBufOutputStream import net.woggioni.gbcs.common.GBCS.digest import net.woggioni.gbcs.common.HostAndPort import net.woggioni.gbcs.common.contextLogger @@ -136,6 +137,7 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl response.completeExceptionally(ex) } }) + request.touch() channel.writeAndFlush(request) } else { response.completeExceptionally(channelFuture.cause()) @@ -162,29 +164,35 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl } } return sendRequest(request).thenApply { response -> - when (val status = response.status()) { - BinaryMemcacheResponseStatus.SUCCESS -> { - val compressionMode = cfg.compressionMode - val content = response.content().retain() - response.release() - if (compressionMode != null) { - when (compressionMode) { - MemcacheCacheConfiguration.CompressionMode.GZIP -> { - GZIPInputStream(ByteBufInputStream(content)) - } + try { + when (val status = response.status()) { + BinaryMemcacheResponseStatus.SUCCESS -> { + val compressionMode = cfg.compressionMode + val content = response.content().retain() + content.touch() + if (compressionMode != null) { + when (compressionMode) { + MemcacheCacheConfiguration.CompressionMode.GZIP -> { + GZIPInputStream(ByteBufInputStream(content)) + } - MemcacheCacheConfiguration.CompressionMode.DEFLATE -> { - InflaterInputStream(ByteBufInputStream(content)) + MemcacheCacheConfiguration.CompressionMode.DEFLATE -> { + InflaterInputStream(ByteBufInputStream(content)) + } } - } - } else { - ByteBufInputStream(content) - }.let(Channels::newChannel) + } else { + ByteBufInputStream(content) + }.let(Channels::newChannel) + } + + BinaryMemcacheResponseStatus.KEY_ENOENT -> { + null + } + + else -> throw MemcacheException(status) } - BinaryMemcacheResponseStatus.KEY_ENOENT -> { - null - } - else -> throw MemcacheException(status) + } finally { + response.release() } } } @@ -199,16 +207,18 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl extras.writeInt(0) extras.writeInt(encodeExpiry(expiry)) val compressionMode = cfg.compressionMode + content.retain() val payload = if (compressionMode != null) { - val inputStream = ByteBufInputStream(Unpooled.wrappedBuffer(content)) - val baos = ByteArrayOutputStream() + val inputStream = ByteBufInputStream(content) + val buf = content.alloc().buffer() + buf.retain() val outputStream = when (compressionMode) { MemcacheCacheConfiguration.CompressionMode.GZIP -> { - GZIPOutputStream(baos) + GZIPOutputStream(ByteBufOutputStream(buf)) } MemcacheCacheConfiguration.CompressionMode.DEFLATE -> { - DeflaterOutputStream(baos, Deflater(Deflater.DEFAULT_COMPRESSION, false)) + DeflaterOutputStream(ByteBufOutputStream(buf), Deflater(Deflater.DEFAULT_COMPRESSION, false)) } } inputStream.use { i -> @@ -216,7 +226,7 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl JWO.copy(i, o) } } - Unpooled.wrappedBuffer(baos.toByteArray()) + buf } else { content } diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCache.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCache.kt index 90271fa..1490684 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCache.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCache.kt @@ -1,12 +1,12 @@ package net.woggioni.gbcs.server.cache import io.netty.buffer.ByteBuf -import io.netty.buffer.Unpooled import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.common.ByteBufInputStream +import net.woggioni.gbcs.common.ByteBufOutputStream import net.woggioni.gbcs.common.GBCS.digestString +import net.woggioni.gbcs.common.contextLogger import net.woggioni.jwo.JWO -import java.io.ByteArrayOutputStream import java.nio.channels.Channels import java.security.MessageDigest import java.time.Duration @@ -14,6 +14,7 @@ import java.time.Instant import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.PriorityBlockingQueue +import java.util.concurrent.atomic.AtomicLong import java.util.zip.Deflater import java.util.zip.DeflaterOutputStream import java.util.zip.Inflater @@ -21,11 +22,18 @@ import java.util.zip.InflaterInputStream class InMemoryCache( val maxAge: Duration, + val maxSize: Long, val digestAlgorithm: String?, val compressionEnabled: Boolean, val compressionLevel: Int ) : Cache { + companion object { + @JvmStatic + private val log = contextLogger() + } + + private val size = AtomicLong() private val map = ConcurrentHashMap() private class RemovalQueueElement(val key: String, val value : ByteBuf, val expiry : Instant) : Comparable { @@ -38,9 +46,17 @@ class InMemoryCache( private val garbageCollector = Thread { while(true) { val el = removalQueue.take() + val buf = el.value val now = Instant.now() if(now > el.expiry) { - map.remove(el.key, el.value) + val removed = map.remove(el.key, buf) + if(removed) { + updateSizeAfterRemoval(buf) + //Decrease the reference count for map + buf.release() + } + //Decrease the reference count for removalQueue + buf.release() } else { removalQueue.put(el) Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))) @@ -50,6 +66,28 @@ class InMemoryCache( start() } + private fun removeEldest() : Long { + while(true) { + val el = removalQueue.take() + val buf = el.value + val removed = map.remove(el.key, buf) + //Decrease the reference count for removalQueue + buf.release() + if(removed) { + val newSize = updateSizeAfterRemoval(buf) + //Decrease the reference count for map + buf.release() + return newSize + } + } + } + + private fun updateSizeAfterRemoval(removed: ByteBuf) : Long { + return size.updateAndGet { currentSize : Long -> + currentSize - removed.readableBytes() + } + } + override fun close() { running = false garbageCollector.join() @@ -64,11 +102,13 @@ class InMemoryCache( ).let { digest -> map[digest] ?.let { value -> + val copy = value.retainedDuplicate() + copy.touch("This has to be released by the caller of the cache") if (compressionEnabled) { val inflater = Inflater() - Channels.newChannel(InflaterInputStream(ByteBufInputStream(value), inflater)) + Channels.newChannel(InflaterInputStream(ByteBufInputStream(copy), inflater)) } else { - Channels.newChannel(ByteBufInputStream(value)) + Channels.newChannel(ByteBufInputStream(copy)) } } }.let { @@ -81,18 +121,29 @@ class InMemoryCache( ?.let { md -> digestString(key.toByteArray(), md) } ?: key).let { digest -> + content.retain() val value = if (compressionEnabled) { val deflater = Deflater(compressionLevel) - val baos = ByteArrayOutputStream() - DeflaterOutputStream(baos, deflater).use { stream -> - JWO.copy(ByteBufInputStream(content), stream) + val buf = content.alloc().buffer() + buf.retain() + DeflaterOutputStream(ByteBufOutputStream(buf), deflater).use { outputStream -> + ByteBufInputStream(content).use { inputStream -> + JWO.copy(inputStream, outputStream) + } } - Unpooled.wrappedBuffer(baos.toByteArray()) + buf } else { content } - map[digest] = value - removalQueue.put(RemovalQueueElement(digest, value, Instant.now().plus(maxAge))) + val old = map.put(digest, value) + val delta = value.readableBytes() - (old?.readableBytes() ?: 0) + var newSize = size.updateAndGet { currentSize : Long -> + currentSize + delta + } + removalQueue.put(RemovalQueueElement(digest, value.retain(), Instant.now().plus(maxAge))) + while(newSize > maxSize) { + newSize = removeEldest() + } }.let { CompletableFuture.completedFuture(null) } diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCacheConfiguration.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCacheConfiguration.kt index 92a31be..b638206 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCacheConfiguration.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCacheConfiguration.kt @@ -6,12 +6,14 @@ import java.time.Duration data class InMemoryCacheConfiguration( val maxAge: Duration, + val maxSize: Long, val digestAlgorithm : String?, val compressionEnabled: Boolean, val compressionLevel: Int, ) : Configuration.Cache { override fun materialize() = InMemoryCache( maxAge, + maxSize, digestAlgorithm, compressionEnabled, compressionLevel diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCacheProvider.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCacheProvider.kt index 7bebd42..c54f82c 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCacheProvider.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/cache/InMemoryCacheProvider.kt @@ -21,6 +21,9 @@ class InMemoryCacheProvider : CacheProvider { val maxAge = el.renderAttribute("max-age") ?.let(Duration::parse) ?: Duration.ofDays(1) + val maxSize = el.renderAttribute("max-size") + ?.let(java.lang.Long::decode) + ?: 0x1000000 val enableCompression = el.renderAttribute("enable-compression") ?.let(String::toBoolean) ?: true @@ -31,6 +34,7 @@ class InMemoryCacheProvider : CacheProvider { return InMemoryCacheConfiguration( maxAge, + maxSize, digestAlgorithm, enableCompression, compressionLevel @@ -43,6 +47,7 @@ class InMemoryCacheProvider : CacheProvider { val prefix = doc.lookupPrefix(GBCS.GBCS_NAMESPACE_URI) attr("xs:type", "${prefix}:inMemoryCacheType", GBCS.XML_SCHEMA_NAMESPACE_URI) attr("max-age", maxAge.toString()) + attr("max-size", maxSize.toString()) digestAlgorithm?.let { digestAlgorithm -> attr("digest", digestAlgorithm) } diff --git a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt index 9f61095..41c001c 100644 --- a/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt +++ b/gbcs-server/src/main/kotlin/net/woggioni/gbcs/server/handler/ServerHandler.kt @@ -107,7 +107,7 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) : log.debug(ctx) { "Added value for key '$key' to build cache" } - cache.put(key, msg.content().retain()).thenRun { + cache.put(key, msg.content()).thenRun { val response = DefaultFullHttpResponse( msg.protocolVersion(), HttpResponseStatus.CREATED, Unpooled.copiedBuffer(key.toByteArray()) diff --git a/gbcs-server/src/main/resources/net/woggioni/gbcs/server/schema/gbcs.xsd b/gbcs-server/src/main/resources/net/woggioni/gbcs/server/schema/gbcs.xsd index 881bf35..a436257 100644 --- a/gbcs-server/src/main/resources/net/woggioni/gbcs/server/schema/gbcs.xsd +++ b/gbcs-server/src/main/resources/net/woggioni/gbcs/server/schema/gbcs.xsd @@ -52,6 +52,7 @@ +