Compare commits

...

7 Commits

Author SHA1 Message Date
ce8e93f9d5 updated to netty 4.2.6 and Gradle 9.1.0
All checks were successful
CI / build (push) Successful in 5m51s
2025-09-30 21:40:03 +08:00
94021d94c3 updated Netty to 4.2.4
All checks were successful
CI / build (push) Successful in 3m2s
2025-08-15 10:44:38 +08:00
b3c6f29c0f updated library dependencies
All checks were successful
CI / build (push) Successful in 3m44s
2025-07-29 13:15:42 +08:00
ce7e5bb4a0 added documentation 2025-06-18 09:59:48 +08:00
aeae98d9eb resolved race condition hendling pipelined requests
All checks were successful
CI / build (push) Successful in 2m2s
2025-06-17 23:06:04 +08:00
6cba4d24bb resolved race condition in the client for response lifetime
All checks were successful
CI / build (push) Successful in 2m10s
improved memory usage of the in-memory cache backend
2025-06-17 21:40:48 +08:00
52a1b4c200 moved builds to woryzen 2025-06-13 20:52:27 +08:00
15 changed files with 107 additions and 50 deletions

View File

@@ -5,7 +5,7 @@ on:
- 'dev' - 'dev'
jobs: jobs:
build: build:
runs-on: hostinger runs-on: woryzen
steps: steps:
- name: Checkout sources - name: Checkout sources
uses: actions/checkout@v4 uses: actions/checkout@v4

View File

@@ -46,6 +46,7 @@ allprojects { subproject ->
testImplementation catalog.junit.jupiter.api testImplementation catalog.junit.jupiter.api
testImplementation catalog.junit.jupiter.params testImplementation catalog.junit.jupiter.params
testRuntimeOnly catalog.junit.jupiter.engine testRuntimeOnly catalog.junit.jupiter.engine
testRuntimeOnly catalog.junit.platform.launcher
} }
test { test {

View File

@@ -2,9 +2,9 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true org.gradle.parallel=true
org.gradle.caching=true org.gradle.caching=true
rbcs.version = 0.3.1 rbcs.version = 0.3.4
lys.version = 2025.06.10 lys.version = 2025.09.30
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
docker.registry.url=gitea.woggioni.net docker.registry.url=gitea.woggioni.net

Binary file not shown.

View File

@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.2-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-9.1.0-bin.zip
networkTimeout=10000 networkTimeout=10000
validateDistributionUrl=true validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME

6
gradlew vendored
View File

@@ -114,7 +114,7 @@ case "$( uname )" in #(
NONSTOP* ) nonstop=true ;; NONSTOP* ) nonstop=true ;;
esac esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar CLASSPATH="\\\"\\\""
# Determine the Java command to use to start the JVM. # Determine the Java command to use to start the JVM.
@@ -205,7 +205,7 @@ fi
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Collect all arguments for the java command: # Collect all arguments for the java command:
# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, # * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# and any embedded shellness will be escaped. # and any embedded shellness will be escaped.
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be # * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
# treated as '${Hostname}' itself on the command line. # treated as '${Hostname}' itself on the command line.
@@ -213,7 +213,7 @@ DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
set -- \ set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \ "-Dorg.gradle.appname=$APP_BASE_NAME" \
-classpath "$CLASSPATH" \ -classpath "$CLASSPATH" \
org.gradle.wrapper.GradleWrapperMain \ -jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \
"$@" "$@"
# Stop when "xargs" is not available. # Stop when "xargs" is not available.

4
gradlew.bat vendored
View File

@@ -70,11 +70,11 @@ goto fail
:execute :execute
@rem Setup the command line @rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar set CLASSPATH=
@rem Execute Gradle @rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* "%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %*
:end :end
@rem End local scope for the variables with windows NT shell @rem End local scope for the variables with windows NT shell

View File

@@ -254,19 +254,25 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
fun get(key: String): CompletableFuture<ByteArray?> { fun get(key: String): CompletableFuture<ByteArray?> {
return executeWithRetry { return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null) sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
}.thenApply { }.thenApply { response ->
val status = it.status() val status = response.status()
if (it.status() == HttpResponseStatus.NOT_FOUND) { if (response.status() == HttpResponseStatus.NOT_FOUND) {
response.release()
null null
} else if (it.status() != HttpResponseStatus.OK) { } else if (response.status() != HttpResponseStatus.OK) {
response.release()
throw HttpException(status) throw HttpException(status)
} else { } else {
it.content() response.content().also {
it.retain()
response.release()
}
} }
}.thenApply { maybeByteBuf -> }.thenApply { maybeByteBuf ->
maybeByteBuf?.let { maybeByteBuf?.let { buf ->
val result = ByteArray(it.readableBytes()) val result = ByteArray(buf.readableBytes())
it.getBytes(0, result) buf.getBytes(0, result)
buf.release()
result result
} }
} }
@@ -318,7 +324,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
response: FullHttpResponse response: FullHttpResponse
) { ) {
pipeline.remove(this) pipeline.remove(this)
responseFuture.complete(response) responseFuture.complete(response.retainedDuplicate())
if (!profile.connection.requestPipelining) { if (!profile.connection.requestPipelining) {
pool.release(channel) pool.release(channel)
} }

View File

@@ -12,6 +12,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.ArgumentsProvider import org.junit.jupiter.params.provider.ArgumentsProvider
import org.junit.jupiter.params.provider.ArgumentsSource import org.junit.jupiter.params.provider.ArgumentsSource
import org.junit.jupiter.params.support.ParameterDeclarations
class RetryTest { class RetryTest {
@@ -23,7 +24,10 @@ class RetryTest {
) )
class TestArguments : ArgumentsProvider { class TestArguments : ArgumentsProvider {
override fun provideArguments(context: ExtensionContext): Stream<out Arguments> { override fun provideArguments(
parameters: ParameterDeclarations,
context: ExtensionContext
): Stream<out Arguments> {
return Stream.of( return Stream.of(
TestArgs( TestArgs(
seed = 101325, seed = 101325,

View File

@@ -19,6 +19,8 @@ to `memcacheCacheType`.
The plugins currently supports the following configuration attributes: The plugins currently supports the following configuration attributes:
- `max-age`: the amount of time cache entries will be retained on memcache - `max-age`: the amount of time cache entries will be retained on memcache
- `key-prefix`: a string that will be prepended to all the keys inserted in memcache,
useful in case the caching backend is shared with other applications
- `digest`: digest algorithm to use on the key before submission - `digest`: digest algorithm to use on the key before submission
to memcache (optional, no digest is applied if omitted) to memcache (optional, no digest is applied if omitted)
- `compression`: compression algorithm to apply to cache values before, - `compression`: compression algorithm to apply to cache values before,
@@ -35,6 +37,7 @@ The plugins currently supports the following configuration attributes:
... ...
<cache xs:type="rbcs-memcache:memcacheCacheType" <cache xs:type="rbcs-memcache:memcacheCacheType"
max-age="P7D" max-age="P7D"
key-prefix="rbcs-"
digest="SHA-256" digest="SHA-256"
compression-mode="deflate" compression-mode="deflate"
compression-level="6"> compression-level="6">

View File

@@ -345,7 +345,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
maxChunkSize = cfg.connection.chunkSize maxChunkSize = cfg.connection.chunkSize
} }
pipeline.addLast(HttpServerCodec(httpDecoderConfig)) pipeline.addLast(HttpServerCodec(httpDecoderConfig))
pipeline.addLast(ReadTriggerDuplexHandler.NAME, ReadTriggerDuplexHandler) pipeline.addLast(ReadTriggerDuplexHandler.NAME, ReadTriggerDuplexHandler())
pipeline.addLast(MaxRequestSizeHandler.NAME, MaxRequestSizeHandler(cfg.connection.maxRequestSize)) pipeline.addLast(MaxRequestSizeHandler.NAME, MaxRequestSizeHandler(cfg.connection.maxRequestSize))
pipeline.addLast(HttpChunkContentCompressor(1024)) pipeline.addLast(HttpChunkContentCompressor(1024))
pipeline.addLast(ChunkedWriteHandler()) pipeline.addLast(ChunkedWriteHandler())

View File

@@ -1,6 +1,5 @@
package net.woggioni.rbcs.server.cache package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.PriorityQueue import java.util.PriorityQueue
@@ -22,7 +21,7 @@ private class CacheKey(private val value: ByteArray) {
class CacheEntry( class CacheEntry(
val metadata: CacheValueMetadata, val metadata: CacheValueMetadata,
val content: ByteBuf val content: ByteArray
) )
class InMemoryCache( class InMemoryCache(
@@ -66,8 +65,6 @@ class InMemoryCache(
val removed = map.remove(el.key, value) val removed = map.remove(el.key, value)
if (removed) { if (removed) {
updateSizeAfterRemoval(value.content) updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
} }
} else { } else {
removalQueue.offer(el) removalQueue.offer(el)
@@ -75,9 +72,6 @@ class InMemoryCache(
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS) cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
} }
} }
map.forEach {
it.value.content.release()
}
map.clear() map.clear()
} }
complete(null) complete(null)
@@ -95,15 +89,13 @@ class InMemoryCache(
val removed = map.remove(el.key, value) val removed = map.remove(el.key, value)
if (removed) { if (removed) {
val newSize = updateSizeAfterRemoval(value.content) val newSize = updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
return newSize return newSize
} }
} }
} }
private fun updateSizeAfterRemoval(removed: ByteBuf): Long { private fun updateSizeAfterRemoval(removed: ByteArray): Long {
mapSize -= removed.readableBytes() mapSize -= removed.size
return mapSize return mapSize
} }
@@ -117,7 +109,7 @@ class InMemoryCache(
fun get(key: ByteArray) = lock.readLock().withLock { fun get(key: ByteArray) = lock.readLock().withLock {
map[CacheKey(key)]?.run { map[CacheKey(key)]?.run {
CacheEntry(metadata, content.retainedDuplicate()) CacheEntry(metadata, content)
} }
} }
@@ -127,12 +119,8 @@ class InMemoryCache(
) { ) {
val cacheKey = CacheKey(key) val cacheKey = CacheKey(key)
lock.writeLock().withLock { lock.writeLock().withLock {
val oldSize = map.put(cacheKey, value)?.let { old -> val oldSize = map.put(cacheKey, value)?.content?.size ?: 0
val result = old.content.readableBytes() val delta = value.content.size - oldSize
old.content.release()
result
} ?: 0
val delta = value.content.readableBytes() - oldSize
mapSize += delta mapSize += delta
removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge))) removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
while (mapSize > maxSize) { while (mapSize > maxSize) {
@@ -140,4 +128,4 @@ class InMemoryCache(
} }
} }
} }
} }

View File

@@ -2,6 +2,8 @@ package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.util.zip.Deflater import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream import java.util.zip.InflaterOutputStream
@@ -111,18 +113,23 @@ class InMemoryCacheHandler(
handleCacheContent(ctx, msg) handleCacheContent(ctx, msg)
when (val req = inProgressRequest) { when (val req = inProgressRequest) {
is InProgressGetRequest -> { is InProgressGetRequest -> {
// this.inProgressRequest = null
cache.get(processCacheKey(req.request.key, null, digestAlgorithm))?.let { value -> cache.get(processCacheKey(req.request.key, null, digestAlgorithm))?.let { value ->
sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata)) sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata))
if (compressionEnabled) { if (compressionEnabled) {
val buf = ctx.alloc().heapBuffer() val buf = ctx.alloc().heapBuffer()
InflaterOutputStream(ByteBufOutputStream(buf)).use { InflaterOutputStream(ByteBufOutputStream(buf)).use {
value.content.readBytes(it, value.content.readableBytes()) it.write(value.content)
value.content.release()
buf.retain() buf.retain()
} }
sendMessage(ctx, LastCacheContent(buf)) sendMessage(ctx, LastCacheContent(buf))
} else { } else {
sendMessage(ctx, LastCacheContent(value.content)) val buf = ctx.alloc().heapBuffer()
ByteBufOutputStream(buf).use {
it.write(value.content)
buf.retain()
}
sendMessage(ctx, LastCacheContent(buf))
} }
} ?: sendMessage(ctx, CacheValueNotFoundResponse(req.request.key)) } ?: sendMessage(ctx, CacheValueNotFoundResponse(req.request.key))
} }
@@ -132,8 +139,11 @@ class InMemoryCacheHandler(
val buf = req.buf val buf = req.buf
buf.retain() buf.retain()
req.close() req.close()
val bytes = ByteArray(buf.readableBytes()).also(buf::readBytes)
buf.release()
val cacheKey = processCacheKey(req.request.key, null, digestAlgorithm) val cacheKey = processCacheKey(req.request.key, null, digestAlgorithm)
cache.put(cacheKey, CacheEntry(req.request.metadata, buf)) cache.put(cacheKey, CacheEntry(req.request.metadata, bytes))
sendMessageAndFlush(ctx, CachePutResponse(req.request.key)) sendMessageAndFlush(ctx, CachePutResponse(req.request.key))
} }
} }

View File

@@ -1,23 +1,43 @@
package net.woggioni.rbcs.server.handler package net.woggioni.rbcs.server.handler
import io.netty.buffer.ByteBufHolder
import io.netty.channel.ChannelDuplexHandler import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise import io.netty.channel.ChannelPromise
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import net.woggioni.rbcs.common.createLogger
@Sharable class ReadTriggerDuplexHandler : ChannelDuplexHandler() {
object ReadTriggerDuplexHandler : ChannelDuplexHandler() { companion object {
val NAME = ReadTriggerDuplexHandler::class.java.name val NAME = ReadTriggerDuplexHandler::class.java.name
private val log = createLogger<ReadTriggerDuplexHandler>()
}
private var inFlight = 0
private val messageBuffer = ArrayDeque<Any>()
override fun handlerAdded(ctx: ChannelHandlerContext) { override fun handlerAdded(ctx: ChannelHandlerContext) {
ctx.read() ctx.read()
} }
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
super.channelRead(ctx, msg) if(inFlight > 0) {
if(msg !is LastHttpContent) { messageBuffer.addLast(msg)
} else {
super.channelRead(ctx, msg)
if(msg !is LastHttpContent) {
invokeRead(ctx)
} else {
inFlight += 1
}
}
}
private fun invokeRead(ctx : ChannelHandlerContext) {
if(messageBuffer.isEmpty()) {
ctx.read() ctx.read()
} else {
this.channelRead(ctx, messageBuffer.removeFirst())
} }
} }
@@ -28,7 +48,18 @@ object ReadTriggerDuplexHandler : ChannelDuplexHandler() {
) { ) {
super.write(ctx, msg, promise) super.write(ctx, msg, promise)
if(msg is LastHttpContent) { if(msg is LastHttpContent) {
ctx.read() inFlight -= 1
invokeRead(ctx)
} }
} }
override fun channelInactive(ctx: ChannelHandlerContext) {
while(messageBuffer.isNotEmpty()) {
val msg = messageBuffer.removeFirst()
if(msg is ByteBufHolder) {
msg.release()
}
}
super.channelInactive(ctx)
}
} }

View File

@@ -1,5 +1,16 @@
pluginManagement { pluginManagement {
repositories { repositories {
// mavenLocal {
// content {
// includeGroup 'net.woggioni.gradle'
// includeGroup 'net.woggioni.gradle.jpms-check'
// includeGroup 'net.woggioni.gradle.lombok'
// includeGroup 'net.woggioni.gradle.jdeps'
// includeGroup 'net.woggioni.gradle.sambal'
// includeGroup 'net.woggioni.gradle.graalvm.jlink'
// includeGroup 'net.woggioni.gradle.graalvm.native-image'
// }
// }
maven { maven {
url = getProperty('gitea.maven.url') url = getProperty('gitea.maven.url')
} }
@@ -19,6 +30,8 @@ dependencyResolutionManagement {
versionCatalogs { versionCatalogs {
catalog { catalog {
from group: 'com.lys', name: 'lys-catalog', version: getProperty('lys.version') from group: 'com.lys', name: 'lys-catalog', version: getProperty('lys.version')
// version('my-gradle-plugins', '2025.04.16')
// version('junit', '5.12.0')
} }
} }
} }
@@ -33,3 +46,4 @@ include 'rbcs-client'
include 'rbcs-server' include 'rbcs-server'
include 'rbcs-servlet' include 'rbcs-servlet'
include 'docker' include 'docker'
//include 'bug'