resolved race condition in the client for response lifetime
All checks were successful
CI / build (push) Successful in 2m10s
All checks were successful
CI / build (push) Successful in 2m10s
improved memory usage of the in-memory cache backend
This commit is contained in:
@@ -254,19 +254,25 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
|
|||||||
fun get(key: String): CompletableFuture<ByteArray?> {
|
fun get(key: String): CompletableFuture<ByteArray?> {
|
||||||
return executeWithRetry {
|
return executeWithRetry {
|
||||||
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
|
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
|
||||||
}.thenApply {
|
}.thenApply { response ->
|
||||||
val status = it.status()
|
val status = response.status()
|
||||||
if (it.status() == HttpResponseStatus.NOT_FOUND) {
|
if (response.status() == HttpResponseStatus.NOT_FOUND) {
|
||||||
|
response.release()
|
||||||
null
|
null
|
||||||
} else if (it.status() != HttpResponseStatus.OK) {
|
} else if (response.status() != HttpResponseStatus.OK) {
|
||||||
|
response.release()
|
||||||
throw HttpException(status)
|
throw HttpException(status)
|
||||||
} else {
|
} else {
|
||||||
it.content()
|
response.content().also {
|
||||||
|
it.retain()
|
||||||
|
response.release()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}.thenApply { maybeByteBuf ->
|
}.thenApply { maybeByteBuf ->
|
||||||
maybeByteBuf?.let {
|
maybeByteBuf?.let { buf ->
|
||||||
val result = ByteArray(it.readableBytes())
|
val result = ByteArray(buf.readableBytes())
|
||||||
it.getBytes(0, result)
|
buf.getBytes(0, result)
|
||||||
|
buf.release()
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -318,7 +324,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
|
|||||||
response: FullHttpResponse
|
response: FullHttpResponse
|
||||||
) {
|
) {
|
||||||
pipeline.remove(this)
|
pipeline.remove(this)
|
||||||
responseFuture.complete(response)
|
responseFuture.complete(response.retainedDuplicate())
|
||||||
if (!profile.connection.requestPipelining) {
|
if (!profile.connection.requestPipelining) {
|
||||||
pool.release(channel)
|
pool.release(channel)
|
||||||
}
|
}
|
||||||
|
@@ -1,6 +1,5 @@
|
|||||||
package net.woggioni.rbcs.server.cache
|
package net.woggioni.rbcs.server.cache
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf
|
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.PriorityQueue
|
import java.util.PriorityQueue
|
||||||
@@ -22,7 +21,7 @@ private class CacheKey(private val value: ByteArray) {
|
|||||||
|
|
||||||
class CacheEntry(
|
class CacheEntry(
|
||||||
val metadata: CacheValueMetadata,
|
val metadata: CacheValueMetadata,
|
||||||
val content: ByteBuf
|
val content: ByteArray
|
||||||
)
|
)
|
||||||
|
|
||||||
class InMemoryCache(
|
class InMemoryCache(
|
||||||
@@ -66,8 +65,6 @@ class InMemoryCache(
|
|||||||
val removed = map.remove(el.key, value)
|
val removed = map.remove(el.key, value)
|
||||||
if (removed) {
|
if (removed) {
|
||||||
updateSizeAfterRemoval(value.content)
|
updateSizeAfterRemoval(value.content)
|
||||||
//Decrease the reference count for map
|
|
||||||
value.content.release()
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
removalQueue.offer(el)
|
removalQueue.offer(el)
|
||||||
@@ -75,9 +72,6 @@ class InMemoryCache(
|
|||||||
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
|
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
map.forEach {
|
|
||||||
it.value.content.release()
|
|
||||||
}
|
|
||||||
map.clear()
|
map.clear()
|
||||||
}
|
}
|
||||||
complete(null)
|
complete(null)
|
||||||
@@ -95,15 +89,13 @@ class InMemoryCache(
|
|||||||
val removed = map.remove(el.key, value)
|
val removed = map.remove(el.key, value)
|
||||||
if (removed) {
|
if (removed) {
|
||||||
val newSize = updateSizeAfterRemoval(value.content)
|
val newSize = updateSizeAfterRemoval(value.content)
|
||||||
//Decrease the reference count for map
|
|
||||||
value.content.release()
|
|
||||||
return newSize
|
return newSize
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun updateSizeAfterRemoval(removed: ByteBuf): Long {
|
private fun updateSizeAfterRemoval(removed: ByteArray): Long {
|
||||||
mapSize -= removed.readableBytes()
|
mapSize -= removed.size
|
||||||
return mapSize
|
return mapSize
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -117,7 +109,7 @@ class InMemoryCache(
|
|||||||
|
|
||||||
fun get(key: ByteArray) = lock.readLock().withLock {
|
fun get(key: ByteArray) = lock.readLock().withLock {
|
||||||
map[CacheKey(key)]?.run {
|
map[CacheKey(key)]?.run {
|
||||||
CacheEntry(metadata, content.retainedDuplicate())
|
CacheEntry(metadata, content)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,12 +119,8 @@ class InMemoryCache(
|
|||||||
) {
|
) {
|
||||||
val cacheKey = CacheKey(key)
|
val cacheKey = CacheKey(key)
|
||||||
lock.writeLock().withLock {
|
lock.writeLock().withLock {
|
||||||
val oldSize = map.put(cacheKey, value)?.let { old ->
|
val oldSize = map.put(cacheKey, value)?.content?.size ?: 0
|
||||||
val result = old.content.readableBytes()
|
val delta = value.content.size - oldSize
|
||||||
old.content.release()
|
|
||||||
result
|
|
||||||
} ?: 0
|
|
||||||
val delta = value.content.readableBytes() - oldSize
|
|
||||||
mapSize += delta
|
mapSize += delta
|
||||||
removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
|
removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
|
||||||
while (mapSize > maxSize) {
|
while (mapSize > maxSize) {
|
||||||
|
@@ -2,6 +2,8 @@ package net.woggioni.rbcs.server.cache
|
|||||||
|
|
||||||
import io.netty.buffer.ByteBuf
|
import io.netty.buffer.ByteBuf
|
||||||
import io.netty.channel.ChannelHandlerContext
|
import io.netty.channel.ChannelHandlerContext
|
||||||
|
import java.io.ByteArrayInputStream
|
||||||
|
import java.io.ByteArrayOutputStream
|
||||||
import java.util.zip.Deflater
|
import java.util.zip.Deflater
|
||||||
import java.util.zip.DeflaterOutputStream
|
import java.util.zip.DeflaterOutputStream
|
||||||
import java.util.zip.InflaterOutputStream
|
import java.util.zip.InflaterOutputStream
|
||||||
@@ -111,18 +113,23 @@ class InMemoryCacheHandler(
|
|||||||
handleCacheContent(ctx, msg)
|
handleCacheContent(ctx, msg)
|
||||||
when (val req = inProgressRequest) {
|
when (val req = inProgressRequest) {
|
||||||
is InProgressGetRequest -> {
|
is InProgressGetRequest -> {
|
||||||
|
// this.inProgressRequest = null
|
||||||
cache.get(processCacheKey(req.request.key, null, digestAlgorithm))?.let { value ->
|
cache.get(processCacheKey(req.request.key, null, digestAlgorithm))?.let { value ->
|
||||||
sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata))
|
sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata))
|
||||||
if (compressionEnabled) {
|
if (compressionEnabled) {
|
||||||
val buf = ctx.alloc().heapBuffer()
|
val buf = ctx.alloc().heapBuffer()
|
||||||
InflaterOutputStream(ByteBufOutputStream(buf)).use {
|
InflaterOutputStream(ByteBufOutputStream(buf)).use {
|
||||||
value.content.readBytes(it, value.content.readableBytes())
|
it.write(value.content)
|
||||||
value.content.release()
|
|
||||||
buf.retain()
|
buf.retain()
|
||||||
}
|
}
|
||||||
sendMessage(ctx, LastCacheContent(buf))
|
sendMessage(ctx, LastCacheContent(buf))
|
||||||
} else {
|
} else {
|
||||||
sendMessage(ctx, LastCacheContent(value.content))
|
val buf = ctx.alloc().heapBuffer()
|
||||||
|
ByteBufOutputStream(buf).use {
|
||||||
|
it.write(value.content)
|
||||||
|
buf.retain()
|
||||||
|
}
|
||||||
|
sendMessage(ctx, LastCacheContent(buf))
|
||||||
}
|
}
|
||||||
} ?: sendMessage(ctx, CacheValueNotFoundResponse(req.request.key))
|
} ?: sendMessage(ctx, CacheValueNotFoundResponse(req.request.key))
|
||||||
}
|
}
|
||||||
@@ -132,8 +139,11 @@ class InMemoryCacheHandler(
|
|||||||
val buf = req.buf
|
val buf = req.buf
|
||||||
buf.retain()
|
buf.retain()
|
||||||
req.close()
|
req.close()
|
||||||
|
|
||||||
|
val bytes = ByteArray(buf.readableBytes()).also(buf::readBytes)
|
||||||
|
buf.release()
|
||||||
val cacheKey = processCacheKey(req.request.key, null, digestAlgorithm)
|
val cacheKey = processCacheKey(req.request.key, null, digestAlgorithm)
|
||||||
cache.put(cacheKey, CacheEntry(req.request.metadata, buf))
|
cache.put(cacheKey, CacheEntry(req.request.metadata, bytes))
|
||||||
sendMessageAndFlush(ctx, CachePutResponse(req.request.key))
|
sendMessageAndFlush(ctx, CachePutResponse(req.request.key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user