made chunk size a global shared parameter between the server and the cache backends

This commit is contained in:
2025-03-06 22:08:19 +08:00
parent 8b639fc0b3
commit fc298de548
23 changed files with 59 additions and 68 deletions

View File

@@ -24,6 +24,7 @@ Configures connection handling parameters.
- `read-idle-timeout` (optional, default: PT60S): Connection timeout when no reads
- `write-idle-timeout` (optional, default: PT60S): Connection timeout when no writes
- `max-request-size` (optional, default: 0x4000000): Maximum allowed request body size
- `chunk-size` (default: 0x10000): Maximum socket write size
#### `<event-executor>`
Configures event execution settings.
@@ -44,7 +45,6 @@ A simple storage backend that uses an hash map to store data in memory
- `digest` (default: MD5): Key hashing algorithm
- `enable-compression` (default: true): Enable deflate compression
- `compression-level` (default: -1): Compression level (-1 to 9)
- `chunk-size` (default: 0x10000): Maximum socket write size
##### FileSystem Cache
@@ -56,7 +56,6 @@ A storage backend that stores data in a folder on the disk
- `digest` (default: MD5): Key hashing algorithm
- `enable-compression` (default: true): Enable deflate compression
- `compression-level` (default: -1): Compression level
- `chunk-size` (default: 0x10000): Maximum in-memory cache value size
#### `<authorization>`
Configures user and group-based access control.
@@ -134,8 +133,7 @@ Configures TLS encryption.
idle-timeout="PT10S"
read-idle-timeout="PT20S"
write-idle-timeout="PT20S"
read-timeout="PT5S"
write-timeout="PT5S"/>
chunk-size="0x1000"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" enable-compression="false" max-size="0x10000000" />
@@ -147,7 +145,7 @@ Configures TLS encryption.
<!-- uncomment this to use memcache as the storage backend, also make sure you have
the memcache plugin installed in the `plugins` directory if you are using running
the jar version of RBCS
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" chunk-size="0x1000" digest="MD5">
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" digest="MD5">
<server host="127.0.0.1" port="11211" max-connections="256"/>
</cache>
-->

View File

@@ -8,6 +8,7 @@ import io.netty.channel.socket.SocketChannel;
public interface CacheHandlerFactory extends AsyncCloseable {
ChannelHandler newHandler(
Configuration configuration,
EventLoopGroup eventLoopGroup,
ChannelFactory<SocketChannel> socketChannelFactory,
ChannelFactory<DatagramChannel> datagramChannelFactory

View File

@@ -39,6 +39,7 @@ public class Configuration {
Duration readIdleTimeout;
Duration writeIdleTimeout;
int maxRequestSize;
int chunkSize;
}
@Value

View File

@@ -22,7 +22,7 @@ The plugins currently supports the following configuration attributes:
- `digest`: digest algorithm to use on the key before submission
to memcache (optional, no digest is applied if omitted)
- `compression`: compression algorithm to apply to cache values before,
currently only `deflate` is supported (optionla, if omitted compression is disabled)
currently only `deflate` is supported (optional, if omitted compression is disabled)
- `compression-level`: compression level to use, deflate supports compression levels from 1 to 9,
where 1 is for fast compression at the expense of speed (optional, 6 is used if omitted)
```xml
@@ -37,8 +37,7 @@ The plugins currently supports the following configuration attributes:
max-age="P7D"
digest="SHA-256"
compression-mode="deflate"
compression-level="6"
chunk-size="0x10000">
compression-level="6">
<server host="127.0.0.1" port="11211" max-connections="256"/>
<server host="127.0.0.1" port="11212" max-connections="256"/>
</cache>

View File

@@ -23,7 +23,6 @@ data class MemcacheCacheConfiguration(
val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = null,
val compressionLevel: Int,
val chunkSize: Int
) : Configuration.Cache {
companion object {
@@ -48,14 +47,15 @@ data class MemcacheCacheConfiguration(
private val connectionPoolMap = ConcurrentHashMap<HostAndPort, FixedChannelPool>()
override fun newHandler(
cfg : Configuration,
eventLoop: EventLoopGroup,
socketChannelFactory: ChannelFactory<SocketChannel>,
datagramChannelFactory: ChannelFactory<DatagramChannel>
datagramChannelFactory: ChannelFactory<DatagramChannel>,
): ChannelHandler {
return MemcacheCacheHandler(
MemcacheClient(
this@MemcacheCacheConfiguration.servers,
chunkSize,
cfg.connection.chunkSize,
eventLoop,
socketChannelFactory,
connectionPoolMap
@@ -63,7 +63,7 @@ data class MemcacheCacheConfiguration(
digestAlgorithm,
compressionMode != null,
compressionLevel,
chunkSize,
cfg.connection.chunkSize,
maxAge
)
}

View File

@@ -28,9 +28,6 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse)
?: Duration.ofDays(1)
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x10000
val compressionLevel = el.renderAttribute("compression-level")
?.let(Integer::decode)
?: -1
@@ -63,8 +60,7 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
maxAge,
digestAlgorithm,
compressionMode,
compressionLevel,
chunkSize
compressionLevel
)
}
@@ -84,7 +80,6 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
}
}
attr("max-age", maxAge.toString())
attr("chunk-size", chunkSize.toString())
digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm)
}

View File

@@ -24,6 +24,7 @@ module net.woggioni.rbcs.server {
opens net.woggioni.rbcs.server;
opens net.woggioni.rbcs.server.schema;
uses CacheProvider;
provides CacheProvider with FileSystemCacheProvider, InMemoryCacheProvider;
}

View File

@@ -21,6 +21,7 @@ import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.compression.CompressionOptions
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.HttpContentCompressor
import io.netty.handler.codec.http.HttpDecoderConfig
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpServerCodec
@@ -340,7 +341,10 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
sslContext?.newHandler(ch.alloc())?.also {
pipeline.addLast(SSL_HANDLER_NAME, it)
}
pipeline.addLast(HttpServerCodec())
val httpDecoderConfig = HttpDecoderConfig().apply {
maxChunkSize = cfg.connection.chunkSize
}
pipeline.addLast(HttpServerCodec(httpDecoderConfig))
pipeline.addLast(MaxRequestSizeHandler.NAME, MaxRequestSizeHandler(cfg.connection.maxRequestSize))
pipeline.addLast(HttpChunkContentCompressor(1024))
pipeline.addLast(ChunkedWriteHandler())
@@ -355,7 +359,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
}
pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler)
pipeline.addLast(cacheHandlerFactory.newHandler(ch.eventLoop(), channelFactory, datagramChannelFactory))
pipeline.addLast(cacheHandlerFactory.newHandler(cfg, ch.eventLoop(), channelFactory, datagramChannelFactory))
pipeline.addLast(TraceHandler)
pipeline.addLast(ExceptionHandler)
}

View File

@@ -17,7 +17,6 @@ data class FileSystemCacheConfiguration(
val digestAlgorithm : String?,
val compressionEnabled: Boolean,
val compressionLevel: Int,
val chunkSize: Int,
) : Configuration.Cache {
override fun materialize() = object : CacheHandlerFactory {
@@ -26,10 +25,11 @@ data class FileSystemCacheConfiguration(
override fun asyncClose() = cache.asyncClose()
override fun newHandler(
cfg : Configuration,
eventLoop: EventLoopGroup,
socketChannelFactory: ChannelFactory<SocketChannel>,
datagramChannelFactory: ChannelFactory<DatagramChannel>
) = FileSystemCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel, chunkSize)
) = FileSystemCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel, cfg.connection.chunkSize)
}
override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI

View File

@@ -31,9 +31,6 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
?.let(String::toInt)
?: Deflater.DEFAULT_COMPRESSION
val digestAlgorithm = el.renderAttribute("digest")
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x10000
return FileSystemCacheConfiguration(
path,
@@ -41,7 +38,6 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
digestAlgorithm,
enableCompression,
compressionLevel,
chunkSize
)
}
@@ -63,7 +59,6 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
}?.let {
attr("compression-level", it.toString())
}
attr("chunk-size", chunkSize.toString())
}
result
}

View File

@@ -16,7 +16,6 @@ data class InMemoryCacheConfiguration(
val digestAlgorithm : String?,
val compressionEnabled: Boolean,
val compressionLevel: Int,
val chunkSize : Int
) : Configuration.Cache {
override fun materialize() = object : CacheHandlerFactory {
private val cache = InMemoryCache(maxAge, maxSize)
@@ -24,6 +23,7 @@ data class InMemoryCacheConfiguration(
override fun asyncClose() = cache.asyncClose()
override fun newHandler(
cfg : Configuration,
eventLoop: EventLoopGroup,
socketChannelFactory: ChannelFactory<SocketChannel>,
datagramChannelFactory: ChannelFactory<DatagramChannel>

View File

@@ -31,16 +31,12 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
?.let(String::toInt)
?: Deflater.DEFAULT_COMPRESSION
val digestAlgorithm = el.renderAttribute("digest")
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x10000
return InMemoryCacheConfiguration(
maxAge,
maxSize,
digestAlgorithm,
enableCompression,
compressionLevel,
chunkSize
)
}
@@ -60,7 +56,6 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
}?.let {
attr("compression-level", it.toString())
}
attr("chunk-size", chunkSize.toString())
}
result
}

View File

@@ -27,10 +27,11 @@ object Parser {
val root = document.documentElement
val anonymousUser = User("", null, emptySet(), null)
var connection: Configuration.Connection = Configuration.Connection(
Duration.of(30, ChronoUnit.SECONDS),
Duration.of(60, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS),
67108864
Duration.of(60, ChronoUnit.SECONDS),
0x4000000,
0x10000
)
var eventExecutor: Configuration.EventExecutor = Configuration.EventExecutor(true)
var cache: Cache? = null
@@ -119,11 +120,14 @@ object Parser {
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
val maxRequestSize = child.renderAttribute("max-request-size")
?.let(Integer::decode) ?: 0x4000000
val chunkSize = child.renderAttribute("chunk-size")
?.let(Integer::decode) ?: 0x10000
connection = Configuration.Connection(
idleTimeout,
readIdleTimeout,
writeIdleTimeout,
maxRequestSize
maxRequestSize,
chunkSize
)
}

View File

@@ -40,6 +40,7 @@ object Serializer {
attr("read-idle-timeout", connection.readIdleTimeout.toString())
attr("write-idle-timeout", connection.writeIdleTimeout.toString())
attr("max-request-size", connection.maxRequestSize.toString())
attr("chunk-size", connection.chunkSize.toString())
}
}
node("event-executor") {

View File

@@ -22,7 +22,7 @@ object CacheContentHandler : SimpleChannelInboundHandler<HttpContent>() {
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) {
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
super.exceptionCaught(ctx, cause)
}
}

View File

@@ -115,6 +115,14 @@
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000">
<xs:annotation>
<xs:documentation>
Maximum byte size of socket write calls
(reduce it to reduce memory consumption, increase it for increased throughput)
</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
<xs:complexType name="eventExecutorType">
@@ -175,13 +183,6 @@
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000">
<xs:annotation>
<xs:documentation>
Maximum byte size of socket write calls
</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:extension>
</xs:complexContent>
</xs:complexType>
@@ -231,14 +232,6 @@
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000">
<xs:annotation>
<xs:documentation>
Maximum byte size of a cache value that will be stored in memory
(reduce it to reduce memory consumption, increase it for increased throughput)
</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:extension>
</xs:complexContent>
</xs:complexType>

View File

@@ -41,7 +41,8 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
Duration.of(60, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS),
0x1000
0x1000,
0x10000
),
users.asSequence().map { it.name to it}.toMap(),
sequenceOf(writersGroup, readersGroup).map { it.name to it}.toMap(),
@@ -50,8 +51,7 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
maxAge = Duration.ofSeconds(3600 * 24),
digestAlgorithm = "MD5",
compressionLevel = Deflater.DEFAULT_COMPRESSION,
compressionEnabled = false,
chunkSize = 0x1000
compressionEnabled = false
),
Configuration.BasicAuthentication(),
null,

View File

@@ -147,7 +147,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
Duration.of(60, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS),
0x1000
0x1000,
0x10000
),
users.asSequence().map { it.name to it }.toMap(),
sequenceOf(writersGroup, readersGroup).map { it.name to it }.toMap(),
@@ -156,7 +157,6 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
compressionEnabled = false,
compressionLevel = Deflater.DEFAULT_COMPRESSION,
digestAlgorithm = "MD5",
chunkSize = 0x1000
),
// InMemoryCacheConfiguration(
// maxAge = Duration.ofSeconds(3600 * 24),

View File

@@ -41,7 +41,8 @@ class NoAuthServerTest : AbstractServerTest() {
Duration.of(60, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS),
0x1000
0x1000,
0x10000
),
emptyMap(),
emptyMap(),
@@ -51,7 +52,6 @@ class NoAuthServerTest : AbstractServerTest() {
digestAlgorithm = "MD5",
compressionLevel = Deflater.DEFAULT_COMPRESSION,
maxSize = 0x1000000,
chunkSize = 0x1000
),
null,
null,

View File

@@ -7,9 +7,10 @@
read-idle-timeout="PT10M"
write-idle-timeout="PT11M"
idle-timeout="PT30M"
max-request-size="101325"/>
max-request-size="101325"
chunk-size="0xa910"/>
<event-executor use-virtual-threads="false"/>
<cache xs:type="rbcs:fileSystemCacheType" path="/tmp/rbcs" max-age="P7D" chunk-size="0xa910"/>
<cache xs:type="rbcs:fileSystemCacheType" path="/tmp/rbcs" max-age="P7D"/>
<authentication>
<none/>
</authentication>

View File

@@ -9,9 +9,10 @@
max-request-size="67108864"
idle-timeout="PT30S"
read-idle-timeout="PT60S"
write-idle-timeout="PT60S"/>
write-idle-timeout="PT60S"
chunk-size="123"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" chunk-size="123">
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D">
<server host="memcached" port="11211"/>
</cache>
<authorization>

View File

@@ -8,9 +8,10 @@
read-idle-timeout="PT10M"
write-idle-timeout="PT11M"
idle-timeout="PT30M"
max-request-size="101325"/>
max-request-size="101325"
chunk-size="456"/>
<event-executor use-virtual-threads="false"/>
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" digest="SHA-256" chunk-size="456" compression-mode="deflate" compression-level="7">
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" digest="SHA-256" compression-mode="deflate" compression-level="7">
<server host="127.0.0.1" port="11211" max-connections="10" connection-timeout="PT20S"/>
</cache>
<authentication>

View File

@@ -7,9 +7,10 @@
read-idle-timeout="PT10M"
write-idle-timeout="PT11M"
idle-timeout="PT30M"
max-request-size="4096"/>
max-request-size="4096"
chunk-size="0xa91f"/>
<event-executor use-virtual-threads="false"/>
<cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" chunk-size="0xa91f"/>
<cache xs:type="rbcs:inMemoryCacheType" max-age="P7D"/>
<authorization>
<users>
<user name="user1" password="password1">