Compare commits

...

3 Commits

Author SHA1 Message Date
aeae98d9eb resolved race condition hendling pipelined requests
All checks were successful
CI / build (push) Successful in 2m2s
2025-06-17 23:06:04 +08:00
6cba4d24bb resolved race condition in the client for response lifetime
All checks were successful
CI / build (push) Successful in 2m10s
improved memory usage of the in-memory cache backend
2025-06-17 21:40:48 +08:00
52a1b4c200 moved builds to woryzen 2025-06-13 20:52:27 +08:00
7 changed files with 76 additions and 41 deletions

View File

@@ -5,7 +5,7 @@ on:
- 'dev'
jobs:
build:
runs-on: hostinger
runs-on: woryzen
steps:
- name: Checkout sources
uses: actions/checkout@v4

View File

@@ -5,7 +5,7 @@ on:
- '*'
jobs:
build:
runs-on: hostinger
runs-on: woryzen
steps:
- name: Checkout sources
uses: actions/checkout@v4

View File

@@ -254,19 +254,25 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
fun get(key: String): CompletableFuture<ByteArray?> {
return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
}.thenApply {
val status = it.status()
if (it.status() == HttpResponseStatus.NOT_FOUND) {
}.thenApply { response ->
val status = response.status()
if (response.status() == HttpResponseStatus.NOT_FOUND) {
response.release()
null
} else if (it.status() != HttpResponseStatus.OK) {
} else if (response.status() != HttpResponseStatus.OK) {
response.release()
throw HttpException(status)
} else {
it.content()
response.content().also {
it.retain()
response.release()
}
}
}.thenApply { maybeByteBuf ->
maybeByteBuf?.let {
val result = ByteArray(it.readableBytes())
it.getBytes(0, result)
maybeByteBuf?.let { buf ->
val result = ByteArray(buf.readableBytes())
buf.getBytes(0, result)
buf.release()
result
}
}
@@ -318,7 +324,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
response: FullHttpResponse
) {
pipeline.remove(this)
responseFuture.complete(response)
responseFuture.complete(response.retainedDuplicate())
if (!profile.connection.requestPipelining) {
pool.release(channel)
}

View File

@@ -345,7 +345,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
maxChunkSize = cfg.connection.chunkSize
}
pipeline.addLast(HttpServerCodec(httpDecoderConfig))
pipeline.addLast(ReadTriggerDuplexHandler.NAME, ReadTriggerDuplexHandler)
pipeline.addLast(ReadTriggerDuplexHandler.NAME, ReadTriggerDuplexHandler())
pipeline.addLast(MaxRequestSizeHandler.NAME, MaxRequestSizeHandler(cfg.connection.maxRequestSize))
pipeline.addLast(HttpChunkContentCompressor(1024))
pipeline.addLast(ChunkedWriteHandler())

View File

@@ -1,6 +1,5 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import java.time.Duration
import java.time.Instant
import java.util.PriorityQueue
@@ -22,7 +21,7 @@ private class CacheKey(private val value: ByteArray) {
class CacheEntry(
val metadata: CacheValueMetadata,
val content: ByteBuf
val content: ByteArray
)
class InMemoryCache(
@@ -66,8 +65,6 @@ class InMemoryCache(
val removed = map.remove(el.key, value)
if (removed) {
updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
}
} else {
removalQueue.offer(el)
@@ -75,9 +72,6 @@ class InMemoryCache(
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
}
}
map.forEach {
it.value.content.release()
}
map.clear()
}
complete(null)
@@ -95,15 +89,13 @@ class InMemoryCache(
val removed = map.remove(el.key, value)
if (removed) {
val newSize = updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
return newSize
}
}
}
private fun updateSizeAfterRemoval(removed: ByteBuf): Long {
mapSize -= removed.readableBytes()
private fun updateSizeAfterRemoval(removed: ByteArray): Long {
mapSize -= removed.size
return mapSize
}
@@ -117,7 +109,7 @@ class InMemoryCache(
fun get(key: ByteArray) = lock.readLock().withLock {
map[CacheKey(key)]?.run {
CacheEntry(metadata, content.retainedDuplicate())
CacheEntry(metadata, content)
}
}
@@ -127,12 +119,8 @@ class InMemoryCache(
) {
val cacheKey = CacheKey(key)
lock.writeLock().withLock {
val oldSize = map.put(cacheKey, value)?.let { old ->
val result = old.content.readableBytes()
old.content.release()
result
} ?: 0
val delta = value.content.readableBytes() - oldSize
val oldSize = map.put(cacheKey, value)?.content?.size ?: 0
val delta = value.content.size - oldSize
mapSize += delta
removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
while (mapSize > maxSize) {

View File

@@ -2,6 +2,8 @@ package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream
@@ -111,18 +113,23 @@ class InMemoryCacheHandler(
handleCacheContent(ctx, msg)
when (val req = inProgressRequest) {
is InProgressGetRequest -> {
// this.inProgressRequest = null
cache.get(processCacheKey(req.request.key, null, digestAlgorithm))?.let { value ->
sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata))
if (compressionEnabled) {
val buf = ctx.alloc().heapBuffer()
InflaterOutputStream(ByteBufOutputStream(buf)).use {
value.content.readBytes(it, value.content.readableBytes())
value.content.release()
it.write(value.content)
buf.retain()
}
sendMessage(ctx, LastCacheContent(buf))
} else {
sendMessage(ctx, LastCacheContent(value.content))
val buf = ctx.alloc().heapBuffer()
ByteBufOutputStream(buf).use {
it.write(value.content)
buf.retain()
}
sendMessage(ctx, LastCacheContent(buf))
}
} ?: sendMessage(ctx, CacheValueNotFoundResponse(req.request.key))
}
@@ -132,8 +139,11 @@ class InMemoryCacheHandler(
val buf = req.buf
buf.retain()
req.close()
val bytes = ByteArray(buf.readableBytes()).also(buf::readBytes)
buf.release()
val cacheKey = processCacheKey(req.request.key, null, digestAlgorithm)
cache.put(cacheKey, CacheEntry(req.request.metadata, buf))
cache.put(cacheKey, CacheEntry(req.request.metadata, bytes))
sendMessageAndFlush(ctx, CachePutResponse(req.request.key))
}
}

View File

@@ -1,23 +1,43 @@
package net.woggioni.rbcs.server.handler
import io.netty.buffer.ByteBufHolder
import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise
import io.netty.handler.codec.http.LastHttpContent
import net.woggioni.rbcs.common.createLogger
@Sharable
object ReadTriggerDuplexHandler : ChannelDuplexHandler() {
val NAME = ReadTriggerDuplexHandler::class.java.name
class ReadTriggerDuplexHandler : ChannelDuplexHandler() {
companion object {
val NAME = ReadTriggerDuplexHandler::class.java.name
private val log = createLogger<ReadTriggerDuplexHandler>()
}
private var inFlight = 0
private val messageBuffer = ArrayDeque<Any>()
override fun handlerAdded(ctx: ChannelHandlerContext) {
ctx.read()
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
super.channelRead(ctx, msg)
if(msg !is LastHttpContent) {
if(inFlight > 0) {
messageBuffer.addLast(msg)
} else {
super.channelRead(ctx, msg)
if(msg !is LastHttpContent) {
invokeRead(ctx)
} else {
inFlight += 1
}
}
}
private fun invokeRead(ctx : ChannelHandlerContext) {
if(messageBuffer.isEmpty()) {
ctx.read()
} else {
this.channelRead(ctx, messageBuffer.removeFirst())
}
}
@@ -28,7 +48,18 @@ object ReadTriggerDuplexHandler : ChannelDuplexHandler() {
) {
super.write(ctx, msg, promise)
if(msg is LastHttpContent) {
ctx.read()
inFlight -= 1
invokeRead(ctx)
}
}
override fun channelInactive(ctx: ChannelHandlerContext) {
while(messageBuffer.isNotEmpty()) {
val msg = messageBuffer.removeFirst()
if(msg is ByteBufHolder) {
msg.release()
}
}
super.channelInactive(ctx)
}
}