From 6cba4d24bb57e180bef10cab6387cc631dc4d3cc Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Tue, 17 Jun 2025 21:40:48 +0800 Subject: [PATCH] resolved race condition in the client for response lifetime improved memory usage of the in-memory cache backend --- .../rbcs/client/RemoteBuildCacheClient.kt | 24 ++++++++++++------- .../rbcs/server/cache/InMemoryCache.kt | 24 +++++-------------- .../rbcs/server/cache/InMemoryCacheHandler.kt | 18 ++++++++++---- 3 files changed, 35 insertions(+), 31 deletions(-) diff --git a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/RemoteBuildCacheClient.kt b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/RemoteBuildCacheClient.kt index 7516fb1..cac8d8a 100644 --- a/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/RemoteBuildCacheClient.kt +++ b/rbcs-client/src/main/kotlin/net/woggioni/rbcs/client/RemoteBuildCacheClient.kt @@ -254,19 +254,25 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC fun get(key: String): CompletableFuture { return executeWithRetry { sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null) - }.thenApply { - val status = it.status() - if (it.status() == HttpResponseStatus.NOT_FOUND) { + }.thenApply { response -> + val status = response.status() + if (response.status() == HttpResponseStatus.NOT_FOUND) { + response.release() null - } else if (it.status() != HttpResponseStatus.OK) { + } else if (response.status() != HttpResponseStatus.OK) { + response.release() throw HttpException(status) } else { - it.content() + response.content().also { + it.retain() + response.release() + } } }.thenApply { maybeByteBuf -> - maybeByteBuf?.let { - val result = ByteArray(it.readableBytes()) - it.getBytes(0, result) + maybeByteBuf?.let { buf -> + val result = ByteArray(buf.readableBytes()) + buf.getBytes(0, result) + buf.release() result } } @@ -318,7 +324,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC response: FullHttpResponse ) { pipeline.remove(this) - responseFuture.complete(response) + responseFuture.complete(response.retainedDuplicate()) if (!profile.connection.requestPipelining) { pool.release(channel) } diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt index dc8055a..2b67750 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt @@ -1,6 +1,5 @@ package net.woggioni.rbcs.server.cache -import io.netty.buffer.ByteBuf import java.time.Duration import java.time.Instant import java.util.PriorityQueue @@ -22,7 +21,7 @@ private class CacheKey(private val value: ByteArray) { class CacheEntry( val metadata: CacheValueMetadata, - val content: ByteBuf + val content: ByteArray ) class InMemoryCache( @@ -66,8 +65,6 @@ class InMemoryCache( val removed = map.remove(el.key, value) if (removed) { updateSizeAfterRemoval(value.content) - //Decrease the reference count for map - value.content.release() } } else { removalQueue.offer(el) @@ -75,9 +72,6 @@ class InMemoryCache( cond.await(interval.toMillis(), TimeUnit.MILLISECONDS) } } - map.forEach { - it.value.content.release() - } map.clear() } complete(null) @@ -95,15 +89,13 @@ class InMemoryCache( val removed = map.remove(el.key, value) if (removed) { val newSize = updateSizeAfterRemoval(value.content) - //Decrease the reference count for map - value.content.release() return newSize } } } - private fun updateSizeAfterRemoval(removed: ByteBuf): Long { - mapSize -= removed.readableBytes() + private fun updateSizeAfterRemoval(removed: ByteArray): Long { + mapSize -= removed.size return mapSize } @@ -117,7 +109,7 @@ class InMemoryCache( fun get(key: ByteArray) = lock.readLock().withLock { map[CacheKey(key)]?.run { - CacheEntry(metadata, content.retainedDuplicate()) + CacheEntry(metadata, content) } } @@ -127,12 +119,8 @@ class InMemoryCache( ) { val cacheKey = CacheKey(key) lock.writeLock().withLock { - val oldSize = map.put(cacheKey, value)?.let { old -> - val result = old.content.readableBytes() - old.content.release() - result - } ?: 0 - val delta = value.content.readableBytes() - oldSize + val oldSize = map.put(cacheKey, value)?.content?.size ?: 0 + val delta = value.content.size - oldSize mapSize += delta removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge))) while (mapSize > maxSize) { diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt index f9e1df9..09c622f 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt @@ -2,6 +2,8 @@ package net.woggioni.rbcs.server.cache import io.netty.buffer.ByteBuf import io.netty.channel.ChannelHandlerContext +import java.io.ByteArrayInputStream +import java.io.ByteArrayOutputStream import java.util.zip.Deflater import java.util.zip.DeflaterOutputStream import java.util.zip.InflaterOutputStream @@ -111,18 +113,23 @@ class InMemoryCacheHandler( handleCacheContent(ctx, msg) when (val req = inProgressRequest) { is InProgressGetRequest -> { +// this.inProgressRequest = null cache.get(processCacheKey(req.request.key, null, digestAlgorithm))?.let { value -> sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata)) if (compressionEnabled) { val buf = ctx.alloc().heapBuffer() InflaterOutputStream(ByteBufOutputStream(buf)).use { - value.content.readBytes(it, value.content.readableBytes()) - value.content.release() + it.write(value.content) buf.retain() } sendMessage(ctx, LastCacheContent(buf)) } else { - sendMessage(ctx, LastCacheContent(value.content)) + val buf = ctx.alloc().heapBuffer() + ByteBufOutputStream(buf).use { + it.write(value.content) + buf.retain() + } + sendMessage(ctx, LastCacheContent(buf)) } } ?: sendMessage(ctx, CacheValueNotFoundResponse(req.request.key)) } @@ -132,8 +139,11 @@ class InMemoryCacheHandler( val buf = req.buf buf.retain() req.close() + + val bytes = ByteArray(buf.readableBytes()).also(buf::readBytes) + buf.release() val cacheKey = processCacheKey(req.request.key, null, digestAlgorithm) - cache.put(cacheKey, CacheEntry(req.request.metadata, buf)) + cache.put(cacheKey, CacheEntry(req.request.metadata, bytes)) sendMessageAndFlush(ctx, CachePutResponse(req.request.key)) } }