Compare commits

...

9 Commits

Author SHA1 Message Date
59a12d6218 added server support for request pipelining
Some checks failed
CI / build (push) Has been cancelled
2025-03-07 11:53:42 +08:00
fc298de548 made chunk size a global shared parameter between the server and the cache backends 2025-03-06 22:08:19 +08:00
8b639fc0b3 added request pipelining support to RemoteBuildCacheClient 2025-03-06 21:58:53 +08:00
5545f618f9 updated documentation 2025-03-04 10:40:05 +08:00
43c0938d9a added test case 2025-03-04 09:35:26 +08:00
17215b401a fixed shutdown issue
All checks were successful
CI / build (push) Successful in 16m6s
2025-03-03 22:02:00 +08:00
4aced1c717 moved to GraalVM CE
All checks were successful
CI / build (push) Successful in 15m34s
2025-03-03 17:53:12 +08:00
31ce34cddb simplified InMemoryCache implementation 2025-03-03 09:44:37 +08:00
d64f7f4f27 added performance benchmarks 2025-02-28 13:49:05 +08:00
36 changed files with 458 additions and 225 deletions

View File

@@ -35,6 +35,7 @@ RBCS helps teams become more productive and efficient.
- [Plugins](#plugins) - [Plugins](#plugins)
- [Client Tools](#rbcs-client) - [Client Tools](#rbcs-client)
- [Logging](#logging) - [Logging](#logging)
- [Performance](#performance)
- [FAQ](#faq) - [FAQ](#faq)
@@ -78,9 +79,13 @@ writing data to the disk, that you can use for testing
If you are on a Linux X86_64 machine you can download the native executable If you are on a Linux X86_64 machine you can download the native executable
from [here](https://gitea.woggioni.net/woggioni/-/packages/maven/net.woggioni:rbcs-cli/). from [here](https://gitea.woggioni.net/woggioni/-/packages/maven/net.woggioni:rbcs-cli/).
It behaves the same as the jar file but it doesn't require a JVM and it has faster startup times. It behaves the same as the jar file but it doesn't require a JVM and it has faster startup times.
becausue of GraalVm's [closed-world assumption](https://www.graalvm.org/latest/reference-manual/native-image/basics/#static-analysis), because of GraalVM's [closed-world assumption](https://www.graalvm.org/latest/reference-manual/native-image/basics/#static-analysis),
the native executable does not support plugins, so it comes with all plugins embedded into it. the native executable does not support plugins, so it comes with all plugins embedded into it.
> [!WARNING]
> The native executable is built with `-march=skylake`, so it may fail with SIGILL on x86-64 CPUs that do not support
> the full Skylake instruction set (as a rule of thumb, CPUs released before 2015).
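If you are unsure whether a given machine can run the native executable, a quick heuristic is to inspect the CPU feature flags exposed by the kernel before deploying it. The Kotlin sketch below is a minimal, Linux-only illustration; the flags it checks (`avx2`, `bmi2`, `fma`) are assumed stand-ins for Skylake-era features, not an exhaustive reproduction of the `-march=skylake` feature set.
```kotlin
import java.io.File

// Heuristic check of /proc/cpuinfo for a few Skylake-era instruction set flags.
// The flag list is illustrative, not exhaustive.
fun looksSkylakeCompatible(): Boolean {
    val flagsLine = File("/proc/cpuinfo").useLines { lines ->
        lines.firstOrNull { it.startsWith("flags") }
    } ?: return false
    val flags = flagsLine.substringAfter(':').trim().split(' ').toSet()
    return listOf("avx2", "bmi2", "fma").all(flags::contains)
}

fun main() {
    if (!looksSkylakeCompatible()) {
        System.err.println("This CPU may lack Skylake instructions; prefer the jar distribution instead")
    }
}
```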
## Integration with build tools ## Integration with build tools
### Use RBCS with Gradle ### Use RBCS with Gradle
@@ -347,6 +352,10 @@ can be overridden with `-Dlogback.configurationFile=path/to/custom/configuration
[Logback documentation](https://logback.qos.ch/manual/configuration.html) for more details about [Logback documentation](https://logback.qos.ch/manual/configuration.html) for more details about
how to configure Logback how to configure Logback
## Performance
You can check the performance benchmarks [here](doc/benchmarks.md).
## FAQ ## FAQ
### Why should I use a build cache? ### Why should I use a build cache?

View File

@@ -38,8 +38,7 @@ allprojects { subproject ->
withSourcesJar() withSourcesJar()
modularity.inferModulePath = true modularity.inferModulePath = true
toolchain { toolchain {
languageVersion = JavaLanguageVersion.of(23) languageVersion = JavaLanguageVersion.of(21)
vendor = JvmVendorSpec.ORACLE
} }
} }

doc/benchmarks.md (new file, 87 lines added)
View File

@@ -0,0 +1,87 @@
# RBCS performance benchmarks
All tests were executed under the following conditions:
- CPU: Intel Celeron J3455 (4 physical cores)
- memory: 8GB DDR3L 1600 MHz
- disk: SATA3 120GB SSD
- HTTP compression: disabled
- cache compression: disabled
- digest: none
- authentication: disabled
- TLS: disabled
- network RTT: 14ms
- network bandwidth: 112 MiB/s
### In-memory cache backend
| Cache backend | CPU | CPU quota | Memory quota (GB) | Request size (bytes) | Client connections | PUT (req/s) | GET (req/s) |
|----------------|---------------------|-----------|-------------------|------------------|--------------------|-------------|-------------|
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 128 | 10 | 3691 | 4037 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 128 | 100 | 6881 | 7483 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 512 | 10 | 3790 | 4069 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 512 | 100 | 6716 | 7408 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 4096 | 10 | 3399 | 1974 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 4096 | 100 | 5341 | 6402 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 65536 | 10 | 1099 | 1116 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 65536 | 100 | 1379 | 1703 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 128 | 10 | 4443 | 5170 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 128 | 100 | 12813 | 13568 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 512 | 10 | 4450 | 4383 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 512 | 100 | 12212 | 13586 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 4096 | 10 | 3441 | 3012 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 4096 | 100 | 8982 | 10452 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 65536 | 10 | 1391 | 1167 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 65536 | 100 | 1303 | 1151 |
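For reference, the fastest 65536-byte result above is already close to the measured link capacity: 1703 GET req/s × 64 KiB ≈ 106 MiB/s against the 112 MiB/s network bandwidth listed in the test conditions, so the large-payload figures are likely network-bound rather than backend-bound.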
### Filesystem cache backend
- compression: disabled
- digest: none
- authentication: disabled
- TLS: disabled
| Cache backend | CPU | CPU quota | Memory quota (GB) | Request size (bytes) | Client connections | PUT (req/s) | GET (req/s) |
|---------------|---------------------|-----------|-------------------|------------------|--------------------|-------------|-------------|
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 128 | 10 | 1208 | 2048 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 128 | 100 | 1304 | 2394 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 512 | 10 | 1408 | 2157 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 512 | 100 | 1282 | 1888 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 4096 | 10 | 1291 | 1256 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 4096 | 100 | 1170 | 1423 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 65536 | 10 | 313 | 606 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 65536 | 100 | 298 | 609 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 128 | 10 | 2195 | 3477 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 128 | 100 | 2480 | 6207 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 512 | 10 | 2164 | 3413 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 512 | 100 | 2842 | 6218 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 4096 | 10 | 1302 | 2591 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 4096 | 100 | 2270 | 3045 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 65536 | 10 | 375 | 394 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 65536 | 100 | 364 | 462 |
### Memcache cache backend
- compression: disabled
- digest: MD5
- authentication: disabled
- TLS: disabled
| Cache backend | CPU | CPU quota | Memory quota (GB) | Request size (bytes) | Client connections | PUT (req/s) | GET (req/s) |
|---------------|---------------------|-----------|-------------------|------------------|--------------------|-------------|-------------|
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 128 | 10 | 2505 | 2578 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 128 | 100 | 3582 | 3935 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 512 | 10 | 2495 | 2784 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 512 | 100 | 3565 | 3883 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 4096 | 10 | 2174 | 2505 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 4096 | 100 | 2937 | 3563 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 65536 | 10 | 648 | 1074 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 65536 | 100 | 724 | 1548 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 128 | 10 | 2362 | 2927 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 128 | 100 | 5491 | 6531 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 512 | 10 | 2125 | 2807 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 512 | 100 | 5173 | 6242 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 4096 | 10 | 1720 | 2397 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 4096 | 100 | 3871 | 5859 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 65536 | 10 | 616 | 1016 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 65536 | 100 | 820 | 1677 |

View File

@@ -24,6 +24,7 @@ Configures connection handling parameters.
- `read-idle-timeout` (optional, default: PT60S): Connection timeout when no reads - `read-idle-timeout` (optional, default: PT60S): Connection timeout when no reads
- `write-idle-timeout` (optional, default: PT60S): Connection timeout when no writes - `write-idle-timeout` (optional, default: PT60S): Connection timeout when no writes
- `max-request-size` (optional, default: 0x4000000): Maximum allowed request body size - `max-request-size` (optional, default: 0x4000000): Maximum allowed request body size
- `chunk-size` (default: 0x10000): Maximum socket write size
#### `<event-executor>` #### `<event-executor>`
Configures event execution settings. Configures event execution settings.
@@ -44,7 +45,6 @@ A simple storage backend that uses an hash map to store data in memory
- `digest` (default: MD5): Key hashing algorithm - `digest` (default: MD5): Key hashing algorithm
- `enable-compression` (default: true): Enable deflate compression - `enable-compression` (default: true): Enable deflate compression
- `compression-level` (default: -1): Compression level (-1 to 9) - `compression-level` (default: -1): Compression level (-1 to 9)
- `chunk-size` (default: 0x10000): Maximum socket write size
##### FileSystem Cache ##### FileSystem Cache
@@ -56,7 +56,6 @@ A storage backend that stores data in a folder on the disk
- `digest` (default: MD5): Key hashing algorithm - `digest` (default: MD5): Key hashing algorithm
- `enable-compression` (default: true): Enable deflate compression - `enable-compression` (default: true): Enable deflate compression
- `compression-level` (default: -1): Compression level - `compression-level` (default: -1): Compression level
- `chunk-size` (default: 0x10000): Maximum in-memory cache value size
#### `<authorization>` #### `<authorization>`
Configures user and group-based access control. Configures user and group-based access control.
@@ -134,8 +133,7 @@ Configures TLS encryption.
idle-timeout="PT10S" idle-timeout="PT10S"
read-idle-timeout="PT20S" read-idle-timeout="PT20S"
write-idle-timeout="PT20S" write-idle-timeout="PT20S"
read-timeout="PT5S" chunk-size="0x1000"/>
write-timeout="PT5S"/>
<event-executor use-virtual-threads="true"/> <event-executor use-virtual-threads="true"/>
<cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" enable-compression="false" max-size="0x10000000" /> <cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" enable-compression="false" max-size="0x10000000" />
@@ -147,7 +145,7 @@ Configures TLS encryption.
<!-- uncomment this to use memcache as the storage backend, also make sure you have <!-- uncomment this to use memcache as the storage backend, also make sure you have
the memcache plugin installed in the `plugins` directory if you are running the memcache plugin installed in the `plugins` directory if you are running
the jar version of RBCS the jar version of RBCS
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" chunk-size="0x1000" digest="MD5"> <cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" digest="MD5">
<server host="127.0.0.1" port="11211" max-connections="256"/> <server host="127.0.0.1" port="11211" max-connections="256"/>
</cache> </cache>
--> -->

View File

@@ -4,7 +4,7 @@ org.gradle.caching=true
rbcs.version = 0.2.0 rbcs.version = 0.2.0
lys.version = 2025.02.26 lys.version = 2025.03.03
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
docker.registry.url=gitea.woggioni.net docker.registry.url=gitea.woggioni.net

View File

@@ -8,6 +8,7 @@ import io.netty.channel.socket.SocketChannel;
public interface CacheHandlerFactory extends AsyncCloseable { public interface CacheHandlerFactory extends AsyncCloseable {
ChannelHandler newHandler( ChannelHandler newHandler(
Configuration configuration,
EventLoopGroup eventLoopGroup, EventLoopGroup eventLoopGroup,
ChannelFactory<SocketChannel> socketChannelFactory, ChannelFactory<SocketChannel> socketChannelFactory,
ChannelFactory<DatagramChannel> datagramChannelFactory ChannelFactory<DatagramChannel> datagramChannelFactory
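For plugin authors, the practical effect of this new parameter is that cache backends receive the server-wide `Configuration` and read shared settings such as the chunk size from it, instead of declaring their own `chunk-size` attribute. A hypothetical minimal implementation could look like the sketch below; `MyCacheHandler` and `MyCacheHandlerFactory` are illustrative placeholders, and `asyncClose()` is assumed to be the only other member inherited from `AsyncCloseable`.
```kotlin
import io.netty.channel.ChannelFactory
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.EventLoopGroup
import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration
import java.util.concurrent.CompletableFuture

// Placeholder handler: a real backend would translate cache messages here.
private class MyCacheHandler(private val chunkSize: Int) : ChannelInboundHandlerAdapter()

class MyCacheHandlerFactory : CacheHandlerFactory {
    override fun newHandler(
        configuration: Configuration,
        eventLoopGroup: EventLoopGroup,
        socketChannelFactory: ChannelFactory<SocketChannel>,
        datagramChannelFactory: ChannelFactory<DatagramChannel>
    ): ChannelHandler =
        // The chunk size now comes from the shared <connection> settings
        // instead of a per-backend chunk-size attribute.
        MyCacheHandler(configuration.connection.chunkSize)

    override fun asyncClose(): CompletableFuture<Void> = CompletableFuture.completedFuture(null)
}
```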

View File

@@ -39,6 +39,7 @@ public class Configuration {
Duration readIdleTimeout; Duration readIdleTimeout;
Duration writeIdleTimeout; Duration writeIdleTimeout;
int maxRequestSize; int maxRequestSize;
int chunkSize;
} }
@Value @Value

View File

@@ -12,6 +12,7 @@ plugins {
import net.woggioni.gradle.envelope.EnvelopePlugin import net.woggioni.gradle.envelope.EnvelopePlugin
import net.woggioni.gradle.envelope.EnvelopeJarTask import net.woggioni.gradle.envelope.EnvelopeJarTask
import net.woggioni.gradle.graalvm.NativeImageConfigurationTask import net.woggioni.gradle.graalvm.NativeImageConfigurationTask
import net.woggioni.gradle.graalvm.NativeImageTask
import net.woggioni.gradle.graalvm.NativeImagePlugin import net.woggioni.gradle.graalvm.NativeImagePlugin
import net.woggioni.gradle.graalvm.UpxTask import net.woggioni.gradle.graalvm.UpxTask
import net.woggioni.gradle.graalvm.JlinkPlugin import net.woggioni.gradle.graalvm.JlinkPlugin
@@ -90,11 +91,10 @@ Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named(EnvelopePlugin.E
} }
tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfigurationTask) { tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfigurationTask) {
javaLauncher = javaToolchains.launcherFor { toolchain {
languageVersion = JavaLanguageVersion.of(21) languageVersion = JavaLanguageVersion.of(21)
vendor = JvmVendorSpec.ORACLE vendor = JvmVendorSpec.GRAAL_VM
} }
mainClass = "net.woggioni.rbcs.cli.graal.GraalNativeImageConfiguration" mainClass = "net.woggioni.rbcs.cli.graal.GraalNativeImageConfiguration"
classpath = project.files( classpath = project.files(
configurations.configureNativeImageRuntimeClasspath, configurations.configureNativeImageRuntimeClasspath,
@@ -108,6 +108,10 @@ tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfi
} }
nativeImage { nativeImage {
toolchain {
languageVersion = JavaLanguageVersion.of(23)
vendor = JvmVendorSpec.GRAAL_VM
}
mainClass = mainClassName mainClass = mainClassName
// mainModule = mainModuleName // mainModule = mainModuleName
useMusl = true useMusl = true

View File

@@ -4,9 +4,7 @@ import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled import io.netty.buffer.Unpooled
import io.netty.channel.Channel import io.netty.channel.Channel
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelOption import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.SimpleChannelInboundHandler
@@ -297,47 +295,28 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
// Custom handler for processing responses // Custom handler for processing responses
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> { pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
private val handlers = mutableListOf<ChannelHandler>()
fun cleanup(channel: Channel, pipeline: ChannelPipeline) {
handlers.forEach(pipeline::remove)
pool.release(channel)
}
override fun operationComplete(channelFuture: Future<Channel>) { override fun operationComplete(channelFuture: Future<Channel>) {
if (channelFuture.isSuccess) { if (channelFuture.isSuccess) {
val channel = channelFuture.now val channel = channelFuture.now
val pipeline = channel.pipeline() val pipeline = channel.pipeline()
val timeoutHandler = object : ChannelInboundHandlerAdapter() {
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is IdleStateEvent) {
val te = when (evt.state()) {
IdleState.READER_IDLE -> TimeoutException(
"Read timeout",
)
IdleState.WRITER_IDLE -> TimeoutException("Write timeout")
IdleState.ALL_IDLE -> TimeoutException("Idle timeout")
null -> throw IllegalStateException("This should never happen")
}
responseFuture.completeExceptionally(te)
ctx.close()
}
}
}
val closeListener = GenericFutureListener<Future<Void>> { val closeListener = GenericFutureListener<Future<Void>> {
responseFuture.completeExceptionally(IOException("The remote server closed the connection")) responseFuture.completeExceptionally(IOException("The remote server closed the connection"))
pool.release(channel)
} }
channel.closeFuture().addListener(closeListener)
val responseHandler = object : SimpleChannelInboundHandler<FullHttpResponse>() { val responseHandler = object : SimpleChannelInboundHandler<FullHttpResponse>() {
override fun handlerAdded(ctx: ChannelHandlerContext) {
channel.closeFuture().removeListener(closeListener)
}
override fun channelRead0( override fun channelRead0(
ctx: ChannelHandlerContext, ctx: ChannelHandlerContext,
response: FullHttpResponse response: FullHttpResponse
) { ) {
channel.closeFuture().removeListener(closeListener) pipeline.remove(this)
cleanup(channel, pipeline)
responseFuture.complete(response) responseFuture.complete(response)
} }
@@ -352,16 +331,33 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
override fun channelInactive(ctx: ChannelHandlerContext) { override fun channelInactive(ctx: ChannelHandlerContext) {
pool.release(channel)
responseFuture.completeExceptionally(IOException("The remote server closed the connection")) responseFuture.completeExceptionally(IOException("The remote server closed the connection"))
super.channelInactive(ctx) super.channelInactive(ctx)
} }
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is IdleStateEvent) {
val te = when (evt.state()) {
IdleState.READER_IDLE -> TimeoutException(
"Read timeout",
)
IdleState.WRITER_IDLE -> TimeoutException("Write timeout")
IdleState.ALL_IDLE -> TimeoutException("Idle timeout")
null -> throw IllegalStateException("This should never happen")
} }
for (handler in arrayOf(timeoutHandler, responseHandler)) { responseFuture.completeExceptionally(te)
handlers.add(handler) super.userEventTriggered(ctx, evt)
if (this === pipeline.last()) {
ctx.close()
} }
pipeline.addLast(timeoutHandler, responseHandler) } else {
channel.closeFuture().addListener(closeListener) super.userEventTriggered(ctx, evt)
}
}
}
pipeline.addLast(responseHandler)
// Prepare the HTTP request // Prepare the HTTP request
@@ -373,6 +369,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
uri.rawPath, uri.rawPath,
content ?: Unpooled.buffer(0) content ?: Unpooled.buffer(0)
).apply { ).apply {
// Set headers
headers().apply { headers().apply {
if (content != null) { if (content != null) {
set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()) set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes())
@@ -398,9 +395,15 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
} }
// Set headers
// Send the request // Send the request
channel.writeAndFlush(request) channel.writeAndFlush(request).addListener {
if(!it.isSuccess) {
val ex = it.cause()
log.warn(ex.message, ex)
}
pool.release(channel)
}
} else { } else {
responseFuture.completeExceptionally(channelFuture.cause()) responseFuture.completeExceptionally(channelFuture.cause())
} }
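Taken together, these client changes boil down to the following flow (a condensed sketch, not the project's actual API; `sendPipelined` is an illustrative name): the pooled channel is released as soon as the request has been written instead of after the response, and a per-request response handler removes itself once the matching response arrives, which is what allows several requests to be in flight on the same connection.
```kotlin
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.pool.ChannelPool
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.util.concurrent.Future
import io.netty.util.concurrent.GenericFutureListener
import java.io.IOException
import java.util.concurrent.CompletableFuture

fun sendPipelined(pool: ChannelPool, request: FullHttpRequest): CompletableFuture<FullHttpResponse> {
    val responseFuture = CompletableFuture<FullHttpResponse>()
    pool.acquire().addListener(object : GenericFutureListener<Future<Channel>> {
        override fun operationComplete(channelFuture: Future<Channel>) {
            if (!channelFuture.isSuccess) {
                responseFuture.completeExceptionally(channelFuture.cause())
                return
            }
            val channel = channelFuture.now
            channel.pipeline().addLast(object : SimpleChannelInboundHandler<FullHttpResponse>() {
                override fun channelRead0(ctx: ChannelHandlerContext, response: FullHttpResponse) {
                    ctx.pipeline().remove(this)          // one handler per in-flight request
                    responseFuture.complete(response)
                }

                override fun channelInactive(ctx: ChannelHandlerContext) {
                    responseFuture.completeExceptionally(IOException("The remote server closed the connection"))
                    super.channelInactive(ctx)
                }
            })
            channel.writeAndFlush(request).addListener { writeFuture ->
                if (!writeFuture.isSuccess) {
                    responseFuture.completeExceptionally(writeFuture.cause())
                }
                pool.release(channel)                    // release right after the write, not after the response
            }
        }
    })
    return responseFuture
}
```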

View File

@@ -22,7 +22,7 @@ The plugins currently supports the following configuration attributes:
- `digest`: digest algorithm to use on the key before submission - `digest`: digest algorithm to use on the key before submission
to memcache (optional, no digest is applied if omitted) to memcache (optional, no digest is applied if omitted)
- `compression`: compression algorithm to apply to cache values before, - `compression`: compression algorithm to apply to cache values before,
currently only `deflate` is supported (optionla, if omitted compression is disabled) currently only `deflate` is supported (optional, if omitted compression is disabled)
- `compression-level`: compression level to use, deflate supports compression levels from 1 to 9, - `compression-level`: compression level to use, deflate supports compression levels from 1 to 9,
where 1 is for fast compression at the expense of compression ratio (optional, 6 is used if omitted) where 1 is for fast compression at the expense of compression ratio (optional, 6 is used if omitted)
```xml ```xml
@@ -37,8 +37,7 @@ The plugins currently supports the following configuration attributes:
max-age="P7D" max-age="P7D"
digest="SHA-256" digest="SHA-256"
compression-mode="deflate" compression-mode="deflate"
compression-level="6" compression-level="6">
chunk-size="0x10000">
<server host="127.0.0.1" port="11211" max-connections="256"/> <server host="127.0.0.1" port="11211" max-connections="256"/>
<server host="127.0.0.1" port="11212" max-connections="256"/> <server host="127.0.0.1" port="11212" max-connections="256"/>
</cache> </cache>

View File

@@ -9,6 +9,7 @@ import io.netty.channel.socket.SocketChannel
import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.HostAndPort import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.server.memcache.client.MemcacheClient import net.woggioni.rbcs.server.memcache.client.MemcacheClient
import java.time.Duration import java.time.Duration
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
@@ -22,9 +23,12 @@ data class MemcacheCacheConfiguration(
val digestAlgorithm: String? = null, val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = null, val compressionMode: CompressionMode? = null,
val compressionLevel: Int, val compressionLevel: Int,
val chunkSize: Int
) : Configuration.Cache { ) : Configuration.Cache {
companion object {
private val log = createLogger<MemcacheCacheConfiguration>()
}
enum class CompressionMode { enum class CompressionMode {
/** /**
* Deflate mode * Deflate mode
@@ -43,14 +47,15 @@ data class MemcacheCacheConfiguration(
private val connectionPoolMap = ConcurrentHashMap<HostAndPort, FixedChannelPool>() private val connectionPoolMap = ConcurrentHashMap<HostAndPort, FixedChannelPool>()
override fun newHandler( override fun newHandler(
cfg : Configuration,
eventLoop: EventLoopGroup, eventLoop: EventLoopGroup,
socketChannelFactory: ChannelFactory<SocketChannel>, socketChannelFactory: ChannelFactory<SocketChannel>,
datagramChannelFactory: ChannelFactory<DatagramChannel> datagramChannelFactory: ChannelFactory<DatagramChannel>,
): ChannelHandler { ): ChannelHandler {
return MemcacheCacheHandler( return MemcacheCacheHandler(
MemcacheClient( MemcacheClient(
this@MemcacheCacheConfiguration.servers, this@MemcacheCacheConfiguration.servers,
chunkSize, cfg.connection.chunkSize,
eventLoop, eventLoop,
socketChannelFactory, socketChannelFactory,
connectionPoolMap connectionPoolMap
@@ -58,7 +63,7 @@ data class MemcacheCacheConfiguration(
digestAlgorithm, digestAlgorithm,
compressionMode != null, compressionMode != null,
compressionLevel, compressionLevel,
chunkSize, cfg.connection.chunkSize,
maxAge maxAge
) )
} }
@@ -69,6 +74,9 @@ data class MemcacheCacheConfiguration(
val pools = connectionPoolMap.values.toList() val pools = connectionPoolMap.values.toList()
val npools = pools.size val npools = pools.size
val finished = AtomicInteger(0) val finished = AtomicInteger(0)
if (pools.isEmpty()) {
complete(null)
} else {
pools.forEach { pool -> pools.forEach { pool ->
pool.closeAsync().addListener { pool.closeAsync().addListener {
if (!it.isSuccess) { if (!it.isSuccess) {
@@ -84,6 +92,7 @@ data class MemcacheCacheConfiguration(
} }
} }
} }
}
} }

View File

@@ -28,9 +28,6 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
val maxAge = el.renderAttribute("max-age") val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse) ?.let(Duration::parse)
?: Duration.ofDays(1) ?: Duration.ofDays(1)
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x10000
val compressionLevel = el.renderAttribute("compression-level") val compressionLevel = el.renderAttribute("compression-level")
?.let(Integer::decode) ?.let(Integer::decode)
?: -1 ?: -1
@@ -63,8 +60,7 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
maxAge, maxAge,
digestAlgorithm, digestAlgorithm,
compressionMode, compressionMode,
compressionLevel, compressionLevel
chunkSize
) )
} }
@@ -84,7 +80,6 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
} }
} }
attr("max-age", maxAge.toString()) attr("max-age", maxAge.toString())
attr("chunk-size", chunkSize.toString())
digestAlgorithm?.let { digestAlgorithm -> digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm) attr("digest", digestAlgorithm)
} }

View File

@@ -24,6 +24,7 @@ module net.woggioni.rbcs.server {
opens net.woggioni.rbcs.server; opens net.woggioni.rbcs.server;
opens net.woggioni.rbcs.server.schema; opens net.woggioni.rbcs.server.schema;
uses CacheProvider; uses CacheProvider;
provides CacheProvider with FileSystemCacheProvider, InMemoryCacheProvider; provides CacheProvider with FileSystemCacheProvider, InMemoryCacheProvider;
} }

View File

@@ -21,6 +21,7 @@ import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.compression.CompressionOptions import io.netty.handler.codec.compression.CompressionOptions
import io.netty.handler.codec.http.DefaultHttpContent import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.HttpContentCompressor import io.netty.handler.codec.http.HttpContentCompressor
import io.netty.handler.codec.http.HttpDecoderConfig
import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpServerCodec import io.netty.handler.codec.http.HttpServerCodec
@@ -55,7 +56,6 @@ import net.woggioni.rbcs.server.configuration.Serializer
import net.woggioni.rbcs.server.exception.ExceptionHandler import net.woggioni.rbcs.server.exception.ExceptionHandler
import net.woggioni.rbcs.server.handler.MaxRequestSizeHandler import net.woggioni.rbcs.server.handler.MaxRequestSizeHandler
import net.woggioni.rbcs.server.handler.ServerHandler import net.woggioni.rbcs.server.handler.ServerHandler
import net.woggioni.rbcs.server.handler.TraceHandler
import net.woggioni.rbcs.server.throttling.BucketManager import net.woggioni.rbcs.server.throttling.BucketManager
import net.woggioni.rbcs.server.throttling.ThrottlingHandler import net.woggioni.rbcs.server.throttling.ThrottlingHandler
import java.io.OutputStream import java.io.OutputStream
@@ -340,7 +340,10 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
sslContext?.newHandler(ch.alloc())?.also { sslContext?.newHandler(ch.alloc())?.also {
pipeline.addLast(SSL_HANDLER_NAME, it) pipeline.addLast(SSL_HANDLER_NAME, it)
} }
pipeline.addLast(HttpServerCodec()) val httpDecoderConfig = HttpDecoderConfig().apply {
maxChunkSize = cfg.connection.chunkSize
}
pipeline.addLast(HttpServerCodec(httpDecoderConfig))
pipeline.addLast(MaxRequestSizeHandler.NAME, MaxRequestSizeHandler(cfg.connection.maxRequestSize)) pipeline.addLast(MaxRequestSizeHandler.NAME, MaxRequestSizeHandler(cfg.connection.maxRequestSize))
pipeline.addLast(HttpChunkContentCompressor(1024)) pipeline.addLast(HttpChunkContentCompressor(1024))
pipeline.addLast(ChunkedWriteHandler()) pipeline.addLast(ChunkedWriteHandler())
@@ -351,13 +354,12 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
val serverHandler = let { val serverHandler = let {
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/")) val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
ServerHandler(prefix) ServerHandler(prefix) {
cacheHandlerFactory.newHandler(cfg, ch.eventLoop(), channelFactory, datagramChannelFactory)
}
} }
pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler) pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler)
pipeline.addLast(ExceptionHandler.NAME, ExceptionHandler)
pipeline.addLast(cacheHandlerFactory.newHandler(ch.eventLoop(), channelFactory, datagramChannelFactory))
pipeline.addLast(TraceHandler)
pipeline.addLast(ExceptionHandler)
} }
override fun asyncClose() = cacheHandlerFactory.asyncClose() override fun asyncClose() = cacheHandlerFactory.asyncClose()
@@ -368,13 +370,14 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
private val bossGroup: EventExecutorGroup, private val bossGroup: EventExecutorGroup,
private val executorGroups: Iterable<EventExecutorGroup>, private val executorGroups: Iterable<EventExecutorGroup>,
private val serverInitializer: AsyncCloseable, private val serverInitializer: AsyncCloseable,
) : Future<Void> by from(closeFuture, executorGroups, serverInitializer) { ) : Future<Void> by from(closeFuture, bossGroup, executorGroups, serverInitializer) {
companion object { companion object {
private val log = createLogger<ServerHandle>() private val log = createLogger<ServerHandle>()
private fun from( private fun from(
closeFuture: ChannelFuture, closeFuture: ChannelFuture,
bossGroup: EventExecutorGroup,
executorGroups: Iterable<EventExecutorGroup>, executorGroups: Iterable<EventExecutorGroup>,
serverInitializer: AsyncCloseable serverInitializer: AsyncCloseable
): CompletableFuture<Void> { ): CompletableFuture<Void> {
@@ -382,22 +385,15 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
closeFuture.addListener { closeFuture.addListener {
val errors = mutableListOf<Throwable>() val errors = mutableListOf<Throwable>()
val deadline = Instant.now().plusSeconds(20) val deadline = Instant.now().plusSeconds(20)
try {
serverInitializer.close()
} catch (ex: Throwable) {
log.error(ex.message, ex)
errors.addLast(ex)
}
serverInitializer.asyncClose().whenComplete { _, ex -> serverInitializer.asyncClose().whenCompleteAsync { _, ex ->
if(ex != null) { if(ex != null) {
log.error(ex.message, ex) log.error(ex.message, ex)
errors.addLast(ex) errors.addLast(ex)
} }
executorGroups.map { executorGroups.forEach(EventExecutorGroup::shutdownGracefully)
it.shutdownGracefully() bossGroup.terminationFuture().sync()
}
for (executorGroup in executorGroups) { for (executorGroup in executorGroups) {
val future = executorGroup.terminationFuture() val future = executorGroup.terminationFuture()

View File

@@ -17,7 +17,6 @@ data class FileSystemCacheConfiguration(
val digestAlgorithm : String?, val digestAlgorithm : String?,
val compressionEnabled: Boolean, val compressionEnabled: Boolean,
val compressionLevel: Int, val compressionLevel: Int,
val chunkSize: Int,
) : Configuration.Cache { ) : Configuration.Cache {
override fun materialize() = object : CacheHandlerFactory { override fun materialize() = object : CacheHandlerFactory {
@@ -26,10 +25,11 @@ data class FileSystemCacheConfiguration(
override fun asyncClose() = cache.asyncClose() override fun asyncClose() = cache.asyncClose()
override fun newHandler( override fun newHandler(
cfg : Configuration,
eventLoop: EventLoopGroup, eventLoop: EventLoopGroup,
socketChannelFactory: ChannelFactory<SocketChannel>, socketChannelFactory: ChannelFactory<SocketChannel>,
datagramChannelFactory: ChannelFactory<DatagramChannel> datagramChannelFactory: ChannelFactory<DatagramChannel>
) = FileSystemCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel, chunkSize) ) = FileSystemCacheHandler(cache, digestAlgorithm, compressionEnabled, compressionLevel, cfg.connection.chunkSize)
} }
override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI

View File

@@ -31,9 +31,6 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
?.let(String::toInt) ?.let(String::toInt)
?: Deflater.DEFAULT_COMPRESSION ?: Deflater.DEFAULT_COMPRESSION
val digestAlgorithm = el.renderAttribute("digest") val digestAlgorithm = el.renderAttribute("digest")
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x10000
return FileSystemCacheConfiguration( return FileSystemCacheConfiguration(
path, path,
@@ -41,7 +38,6 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
digestAlgorithm, digestAlgorithm,
enableCompression, enableCompression,
compressionLevel, compressionLevel,
chunkSize
) )
} }
@@ -63,7 +59,6 @@ class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
}?.let { }?.let {
attr("compression-level", it.toString()) attr("compression-level", it.toString())
} }
attr("chunk-size", chunkSize.toString())
} }
result result
} }

View File

@@ -6,11 +6,11 @@ import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.PriorityQueue
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
private class CacheKey(private val value: ByteArray) { private class CacheKey(private val value: ByteArray) {
override fun equals(other: Any?) = if (other is CacheKey) { override fun equals(other: Any?) = if (other is CacheKey) {
@@ -34,15 +34,17 @@ class InMemoryCache(
private val log = createLogger<InMemoryCache>() private val log = createLogger<InMemoryCache>()
} }
private val size = AtomicLong() private var mapSize : Long = 0
private val map = ConcurrentHashMap<CacheKey, CacheEntry>() private val map = HashMap<CacheKey, CacheEntry>()
private val lock = ReentrantReadWriteLock()
private val cond = lock.writeLock().newCondition()
private class RemovalQueueElement(val key: CacheKey, val value: CacheEntry, val expiry: Instant) : private class RemovalQueueElement(val key: CacheKey, val value: CacheEntry, val expiry: Instant) :
Comparable<RemovalQueueElement> { Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry) override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
} }
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>() private val removalQueue = PriorityQueue<RemovalQueueElement>()
@Volatile @Volatile
private var running = true private var running = true
@@ -51,8 +53,13 @@ class InMemoryCache(
init { init {
Thread.ofVirtual().name("in-memory-cache-gc").start { Thread.ofVirtual().name("in-memory-cache-gc").start {
try { try {
lock.writeLock().withLock {
while (running) { while (running) {
val el = removalQueue.poll(1, TimeUnit.SECONDS) ?: continue val el = removalQueue.poll()
if(el == null) {
cond.await(1000, TimeUnit.MILLISECONDS)
continue
}
val value = el.value val value = el.value
val now = Instant.now() val now = Instant.now()
if (now > el.expiry) { if (now > el.expiry) {
@@ -63,8 +70,10 @@ class InMemoryCache(
value.content.release() value.content.release()
} }
} else { } else {
removalQueue.put(el) removalQueue.offer(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))) val interval = minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
}
} }
} }
complete(null) complete(null)
@@ -77,7 +86,7 @@ class InMemoryCache(
fun removeEldest(): Long { fun removeEldest(): Long {
while (true) { while (true) {
val el = removalQueue.take() val el = removalQueue.poll() ?: return mapSize
val value = el.value val value = el.value
val removed = map.remove(el.key, value) val removed = map.remove(el.key, value)
if (removed) { if (removed) {
@@ -90,37 +99,41 @@ class InMemoryCache(
} }
private fun updateSizeAfterRemoval(removed: ByteBuf): Long { private fun updateSizeAfterRemoval(removed: ByteBuf): Long {
return size.updateAndGet { currentSize: Long -> mapSize -= removed.readableBytes()
currentSize - removed.readableBytes() return mapSize
}
} }
override fun asyncClose() : CompletableFuture<Void> { override fun asyncClose() : CompletableFuture<Void> {
running = false running = false
lock.writeLock().withLock {
cond.signal()
}
return closeFuture return closeFuture
} }
fun get(key: ByteArray) = map[CacheKey(key)]?.run { fun get(key: ByteArray) = lock.readLock().withLock {
map[CacheKey(key)]?.run {
CacheEntry(metadata, content.retainedDuplicate()) CacheEntry(metadata, content.retainedDuplicate())
} }
}
fun put( fun put(
key: ByteArray, key: ByteArray,
value: CacheEntry, value: CacheEntry,
) { ) {
val cacheKey = CacheKey(key) val cacheKey = CacheKey(key)
lock.writeLock().withLock {
val oldSize = map.put(cacheKey, value)?.let { old -> val oldSize = map.put(cacheKey, value)?.let { old ->
val result = old.content.readableBytes() val result = old.content.readableBytes()
old.content.release() old.content.release()
result result
} ?: 0 } ?: 0
val delta = value.content.readableBytes() - oldSize val delta = value.content.readableBytes() - oldSize
var newSize = size.updateAndGet { currentSize: Long -> mapSize += delta
currentSize + delta removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
} while (mapSize > maxSize) {
removalQueue.put(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge))) removeEldest()
while (newSize > maxSize) { }
newSize = removeEldest()
} }
} }
} }
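The rewrite above trades the former `ConcurrentHashMap`/`AtomicLong`/`PriorityBlockingQueue` combination for a single `ReentrantReadWriteLock` with a condition: all mutation and size accounting happen under the write lock, and the GC thread waits on the condition instead of polling a blocking queue. A condensed, self-contained sketch of the same pattern (using plain `String` keys and values instead of the cache's `ByteBuf` entries, so it is an illustration rather than the project's class):
```kotlin
import java.time.Duration
import java.time.Instant
import java.util.PriorityQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock

class ExpiringMap(private val maxAge: Duration) {
    private data class Entry(val key: String, val expiry: Instant) : Comparable<Entry> {
        override fun compareTo(other: Entry) = expiry.compareTo(other.expiry)
    }

    private val map = HashMap<String, String>()
    private val removalQueue = PriorityQueue<Entry>()
    private val lock = ReentrantReadWriteLock()
    private val cond = lock.writeLock().newCondition()
    @Volatile private var running = true

    fun put(key: String, value: String) = lock.writeLock().withLock {
        map[key] = value
        removalQueue.offer(Entry(key, Instant.now().plus(maxAge)))
        cond.signal()
    }

    fun get(key: String): String? = lock.readLock().withLock { map[key] }

    // Intended to run on a dedicated (virtual) thread, as in the diff above.
    fun gcLoop() = lock.writeLock().withLock {
        while (running) {
            val head = removalQueue.poll()
            if (head == null) {
                cond.await(1000, TimeUnit.MILLISECONDS)
                continue
            }
            val now = Instant.now()
            if (now > head.expiry) {
                // simplified: the real cache only removes the entry if it still maps to the same value
                map.remove(head.key)
            } else {
                removalQueue.offer(head)               // not expired yet, put it back
                val wait = minOf(Duration.between(now, head.expiry), Duration.ofSeconds(1))
                cond.await(wait.toMillis(), TimeUnit.MILLISECONDS)
            }
        }
    }

    fun close() = lock.writeLock().withLock {
        running = false
        cond.signal()
    }
}
```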

View File

@@ -16,7 +16,6 @@ data class InMemoryCacheConfiguration(
val digestAlgorithm : String?, val digestAlgorithm : String?,
val compressionEnabled: Boolean, val compressionEnabled: Boolean,
val compressionLevel: Int, val compressionLevel: Int,
val chunkSize : Int
) : Configuration.Cache { ) : Configuration.Cache {
override fun materialize() = object : CacheHandlerFactory { override fun materialize() = object : CacheHandlerFactory {
private val cache = InMemoryCache(maxAge, maxSize) private val cache = InMemoryCache(maxAge, maxSize)
@@ -24,6 +23,7 @@ data class InMemoryCacheConfiguration(
override fun asyncClose() = cache.asyncClose() override fun asyncClose() = cache.asyncClose()
override fun newHandler( override fun newHandler(
cfg : Configuration,
eventLoop: EventLoopGroup, eventLoop: EventLoopGroup,
socketChannelFactory: ChannelFactory<SocketChannel>, socketChannelFactory: ChannelFactory<SocketChannel>,
datagramChannelFactory: ChannelFactory<DatagramChannel> datagramChannelFactory: ChannelFactory<DatagramChannel>

View File

@@ -31,16 +31,12 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
?.let(String::toInt) ?.let(String::toInt)
?: Deflater.DEFAULT_COMPRESSION ?: Deflater.DEFAULT_COMPRESSION
val digestAlgorithm = el.renderAttribute("digest") val digestAlgorithm = el.renderAttribute("digest")
val chunkSize = el.renderAttribute("chunk-size")
?.let(Integer::decode)
?: 0x10000
return InMemoryCacheConfiguration( return InMemoryCacheConfiguration(
maxAge, maxAge,
maxSize, maxSize,
digestAlgorithm, digestAlgorithm,
enableCompression, enableCompression,
compressionLevel, compressionLevel,
chunkSize
) )
} }
@@ -60,7 +56,6 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
}?.let { }?.let {
attr("compression-level", it.toString()) attr("compression-level", it.toString())
} }
attr("chunk-size", chunkSize.toString())
} }
result result
} }

View File

@@ -27,10 +27,11 @@ object Parser {
val root = document.documentElement val root = document.documentElement
val anonymousUser = User("", null, emptySet(), null) val anonymousUser = User("", null, emptySet(), null)
var connection: Configuration.Connection = Configuration.Connection( var connection: Configuration.Connection = Configuration.Connection(
Duration.of(30, ChronoUnit.SECONDS),
Duration.of(60, ChronoUnit.SECONDS), Duration.of(60, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS), Duration.of(60, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS), 0x4000000,
67108864 0x10000
) )
var eventExecutor: Configuration.EventExecutor = Configuration.EventExecutor(true) var eventExecutor: Configuration.EventExecutor = Configuration.EventExecutor(true)
var cache: Cache? = null var cache: Cache? = null
@@ -119,11 +120,14 @@ object Parser {
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS) ?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
val maxRequestSize = child.renderAttribute("max-request-size") val maxRequestSize = child.renderAttribute("max-request-size")
?.let(Integer::decode) ?: 0x4000000 ?.let(Integer::decode) ?: 0x4000000
val chunkSize = child.renderAttribute("chunk-size")
?.let(Integer::decode) ?: 0x10000
connection = Configuration.Connection( connection = Configuration.Connection(
idleTimeout, idleTimeout,
readIdleTimeout, readIdleTimeout,
writeIdleTimeout, writeIdleTimeout,
maxRequestSize maxRequestSize,
chunkSize
) )
} }

View File

@@ -40,6 +40,7 @@ object Serializer {
attr("read-idle-timeout", connection.readIdleTimeout.toString()) attr("read-idle-timeout", connection.readIdleTimeout.toString())
attr("write-idle-timeout", connection.writeIdleTimeout.toString()) attr("write-idle-timeout", connection.writeIdleTimeout.toString())
attr("max-request-size", connection.maxRequestSize.toString()) attr("max-request-size", connection.maxRequestSize.toString())
attr("chunk-size", connection.chunkSize.toString())
} }
} }
node("event-executor") { node("event-executor") {

View File

@@ -0,0 +1,4 @@
package net.woggioni.rbcs.server.event
class RequestCompletedEvent {
}

View File

@@ -27,6 +27,9 @@ import javax.net.ssl.SSLPeerUnverifiedException
@Sharable @Sharable
object ExceptionHandler : ChannelDuplexHandler() { object ExceptionHandler : ChannelDuplexHandler() {
val NAME : String = this::class.java.name
private val log = contextLogger() private val log = contextLogger()
private val NOT_AUTHORIZED: FullHttpResponse = DefaultFullHttpResponse( private val NOT_AUTHORIZED: FullHttpResponse = DefaultFullHttpResponse(

View File

@@ -1,28 +1,79 @@
package net.woggioni.rbcs.server.handler package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandler.Sharable import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelOutboundHandler
import io.netty.channel.ChannelPromise
import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.HttpContent import io.netty.handler.codec.http.HttpContent
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import java.net.SocketAddress
@Sharable class CacheContentHandler(private val pairedHandler : ChannelHandler) : SimpleChannelInboundHandler<HttpContent>(), ChannelOutboundHandler {
object CacheContentHandler : SimpleChannelInboundHandler<HttpContent>() { private var requestFinished = false
val NAME = this::class.java.name
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpContent) { override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpContent) {
if(requestFinished) {
ctx.fireChannelRead(msg.retain())
} else {
when (msg) { when (msg) {
is LastHttpContent -> { is LastHttpContent -> {
ctx.fireChannelRead(LastCacheContent(msg.content().retain())) ctx.fireChannelRead(LastCacheContent(msg.content().retain()))
ctx.pipeline().remove(this) requestFinished = true
} }
else -> ctx.fireChannelRead(CacheContent(msg.content().retain())) else -> ctx.fireChannelRead(CacheContent(msg.content().retain()))
} }
} }
}
override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
super.exceptionCaught(ctx, cause) super.exceptionCaught(ctx, cause)
} }
override fun bind(ctx: ChannelHandlerContext, localAddress: SocketAddress, promise: ChannelPromise) {
ctx.bind(localAddress, promise)
}
override fun connect(
ctx: ChannelHandlerContext,
remoteAddress: SocketAddress,
localAddress: SocketAddress,
promise: ChannelPromise
) {
ctx.connect(remoteAddress, localAddress, promise)
}
override fun disconnect(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.disconnect(promise)
}
override fun close(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.close(promise)
}
override fun deregister(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.deregister(promise)
}
override fun read(ctx: ChannelHandlerContext) {
ctx.read()
}
override fun flush(ctx: ChannelHandlerContext) {
ctx.flush()
}
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
ctx.write(msg, promise)
if(msg is LastCacheContent || msg is CachePutResponse || msg is CacheValueNotFoundResponse || msg is LastHttpContent) {
ctx.pipeline().remove(pairedHandler)
ctx.pipeline().remove(this)
}
}
} }

View File

@@ -0,0 +1,56 @@
package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelOutboundHandler
import io.netty.channel.ChannelPromise
import net.woggioni.rbcs.server.event.RequestCompletedEvent
import java.net.SocketAddress
class ResponseCapHandler : ChannelInboundHandlerAdapter(), ChannelOutboundHandler {
val bufferedMessages = mutableListOf<Any>()
override fun bind(ctx: ChannelHandlerContext, localAddress: SocketAddress, promise: ChannelPromise) {
ctx.bind(localAddress, promise)
}
override fun connect(
ctx: ChannelHandlerContext,
remoteAddress: SocketAddress,
localAddress: SocketAddress,
promise: ChannelPromise
) {
ctx.connect(remoteAddress, localAddress, promise)
}
override fun disconnect(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.disconnect(promise)
}
override fun close(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.close(promise)
}
override fun deregister(ctx: ChannelHandlerContext, promise: ChannelPromise) {
ctx.deregister(promise)
}
override fun read(ctx: ChannelHandlerContext) {
ctx.read()
}
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
bufferedMessages.add(msg)
}
override fun flush(ctx: ChannelHandlerContext) {
}
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if(evt is RequestCompletedEvent) {
for(msg in bufferedMessages) ctx.write(msg)
ctx.flush()
ctx.pipeline().remove(this)
}
}
}

View File

@@ -1,6 +1,7 @@
package net.woggioni.rbcs.server.handler package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelDuplexHandler import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise import io.netty.channel.ChannelPromise
import io.netty.handler.codec.http.DefaultFullHttpResponse import io.netty.handler.codec.http.DefaultFullHttpResponse
@@ -15,6 +16,8 @@ import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.HttpVersion import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.LastHttpContent
import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
@@ -27,19 +30,22 @@ import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.warn import net.woggioni.rbcs.common.warn
import net.woggioni.rbcs.server.event.RequestCompletedEvent
import net.woggioni.rbcs.server.exception.ExceptionHandler
import java.nio.file.Path import java.nio.file.Path
import java.util.Locale import java.util.Locale
class ServerHandler(private val serverPrefix: Path) : class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupplier : () -> ChannelHandler) :
ChannelDuplexHandler() { ChannelDuplexHandler() {
companion object { companion object {
private val log = createLogger<ServerHandler>() private val log = createLogger<ServerHandler>()
val NAME = this::class.java.name val NAME = ServerHandler::class.java.name
} }
private var httpVersion = HttpVersion.HTTP_1_1 private var httpVersion = HttpVersion.HTTP_1_1
private var keepAlive = true private var keepAlive = true
private var pipelinedRequests = 0
private fun resetRequestMetadata() { private fun resetRequestMetadata() {
httpVersion = HttpVersion.HTTP_1_1 httpVersion = HttpVersion.HTTP_1_1
@@ -73,6 +79,8 @@ class ServerHandler(private val serverPrefix: Path) :
try { try {
when (msg) { when (msg) {
is CachePutResponse -> { is CachePutResponse -> {
pipelinedRequests -= 1
ctx.fireUserEventTriggered(RequestCompletedEvent())
val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.CREATED) val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.CREATED)
val keyBytes = msg.key.toByteArray(Charsets.UTF_8) val keyBytes = msg.key.toByteArray(Charsets.UTF_8)
response.headers().apply { response.headers().apply {
@@ -88,6 +96,8 @@ class ServerHandler(private val serverPrefix: Path) :
} }
is CacheValueNotFoundResponse -> { is CacheValueNotFoundResponse -> {
pipelinedRequests -= 1
ctx.fireUserEventTriggered(RequestCompletedEvent())
val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND) val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
setKeepAliveHeader(response.headers()) setKeepAliveHeader(response.headers())
@@ -108,6 +118,8 @@ class ServerHandler(private val serverPrefix: Path) :
} }
is LastCacheContent -> { is LastCacheContent -> {
pipelinedRequests -= 1
ctx.fireUserEventTriggered(RequestCompletedEvent())
ctx.writeAndFlush(DefaultLastHttpContent(msg.content())) ctx.writeAndFlush(DefaultLastHttpContent(msg.content()))
} }
@@ -127,6 +139,10 @@ class ServerHandler(private val serverPrefix: Path) :
} finally { } finally {
resetRequestMetadata() resetRequestMetadata()
} }
} else if(msg is LastHttpContent) {
pipelinedRequests -= 1
ctx.fireUserEventTriggered(RequestCompletedEvent())
ctx.write(msg, promise)
} else super.write(ctx, msg, promise) } else super.write(ctx, msg, promise)
} }
@@ -139,7 +155,13 @@ class ServerHandler(private val serverPrefix: Path) :
if (path.startsWith(serverPrefix)) { if (path.startsWith(serverPrefix)) {
val relativePath = serverPrefix.relativize(path) val relativePath = serverPrefix.relativize(path)
val key = relativePath.toString() val key = relativePath.toString()
ctx.pipeline().addAfter(NAME, CacheContentHandler.NAME, CacheContentHandler) if(pipelinedRequests > 0) {
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler())
}
val cacheHandler = cacheHandlerSupplier()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, CacheContentHandler(cacheHandler))
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler)
pipelinedRequests += 1
key.let(::CacheGetRequest) key.let(::CacheGetRequest)
.let(ctx::fireChannelRead) .let(ctx::fireChannelRead)
?: ctx.channel().write(CacheValueNotFoundResponse()) ?: ctx.channel().write(CacheValueNotFoundResponse())
@@ -159,7 +181,14 @@ class ServerHandler(private val serverPrefix: Path) :
log.debug(ctx) { log.debug(ctx) {
"Added value for key '$key' to build cache" "Added value for key '$key' to build cache"
} }
ctx.pipeline().addAfter(NAME, CacheContentHandler.NAME, CacheContentHandler) if(pipelinedRequests > 0) {
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler())
}
val cacheHandler = cacheHandlerSupplier()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, CacheContentHandler(cacheHandler))
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler)
pipelinedRequests += 1
path.fileName?.toString() path.fileName?.toString()
?.let { ?.let {
val mimeType = HttpUtil.getMimeType(msg)?.toString() val mimeType = HttpUtil.getMimeType(msg)?.toString()
@@ -176,6 +205,11 @@ class ServerHandler(private val serverPrefix: Path) :
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
} }
} else if (method == HttpMethod.TRACE) { } else if (method == HttpMethod.TRACE) {
if(pipelinedRequests > 0) {
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler())
}
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, TraceHandler)
pipelinedRequests += 1
super.channelRead(ctx, msg) super.channelRead(ctx, msg)
} else { } else {
log.warn(ctx) { log.warn(ctx) {
@@ -187,42 +221,6 @@ class ServerHandler(private val serverPrefix: Path) :
} }
} }
data class ContentDisposition(val type: Type?, val fileName: String?) {
enum class Type {
attachment, `inline`;
companion object {
@JvmStatic
fun parse(maybeString: String?) = maybeString.let { s ->
try {
java.lang.Enum.valueOf(Type::class.java, s)
} catch (ex: IllegalArgumentException) {
null
}
}
}
}
companion object {
@JvmStatic
fun parse(contentDisposition: String) : ContentDisposition {
val parts = contentDisposition.split(";").dropLastWhile { it.isEmpty() }.toTypedArray()
val dispositionType = parts[0].trim { it <= ' ' }.let(Type::parse) // Get the type (e.g., attachment)
var filename: String? = null
for (i in 1..<parts.size) {
val part = parts[i].trim { it <= ' ' }
if (part.lowercase(Locale.getDefault()).startsWith("filename=")) {
filename = part.substring("filename=".length).trim { it <= ' ' }.replace("\"", "")
break
}
}
return ContentDisposition(dispositionType, filename)
}
}
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
super.exceptionCaught(ctx, cause) super.exceptionCaught(ctx, cause)
} }
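Because the server now tracks the number of in-flight requests and buffers out-of-order responses in `ResponseCapHandler`, HTTP/1.1 pipelining can be exercised end to end with nothing more than a raw socket. The snippet below is a hand-rolled smoke test rather than part of the test suite; the host, port and cache keys are assumptions.
```kotlin
import java.net.Socket

fun main() {
    Socket("127.0.0.1", 8080).use { socket ->
        val out = socket.getOutputStream()
        // Write two GET requests back to back without waiting for the first response
        for (key in listOf("some-key-1", "some-key-2")) {
            out.write("GET /$key HTTP/1.1\r\nHost: 127.0.0.1\r\n\r\n".toByteArray(Charsets.US_ASCII))
        }
        out.flush()
        // The server must answer in request order, so the first two status lines
        // printed here correspond to the two keys above, in that order
        socket.getInputStream().bufferedReader(Charsets.ISO_8859_1).useLines { lines ->
            lines.filter { it.startsWith("HTTP/1.1") }.take(2).forEach(::println)
        }
    }
}
```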

View File

@@ -42,6 +42,7 @@ object TraceHandler : ChannelInboundHandlerAdapter() {
} }
is LastHttpContent -> { is LastHttpContent -> {
ctx.writeAndFlush(msg) ctx.writeAndFlush(msg)
ctx.pipeline().remove(this)
} }
is HttpContent -> ctx.writeAndFlush(msg) is HttpContent -> ctx.writeAndFlush(msg)
else -> super.channelRead(ctx, msg) else -> super.channelRead(ctx, msg)

View File

@@ -115,6 +115,14 @@
                </xs:documentation>
            </xs:annotation>
        </xs:attribute>
+       <xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000">
+           <xs:annotation>
+               <xs:documentation>
+                   Maximum byte size of socket write calls
+                   (reduce it to reduce memory consumption, increase it for increased throughput)
+               </xs:documentation>
+           </xs:annotation>
+       </xs:attribute>
    </xs:complexType>
    <xs:complexType name="eventExecutorType">
@@ -175,13 +183,6 @@
                    </xs:documentation>
                </xs:annotation>
            </xs:attribute>
-           <xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000">
-               <xs:annotation>
-                   <xs:documentation>
-                       Maximum byte size of socket write calls
-                   </xs:documentation>
-               </xs:annotation>
-           </xs:attribute>
        </xs:extension>
    </xs:complexContent>
</xs:complexType>
@@ -231,14 +232,6 @@
                    </xs:documentation>
                </xs:annotation>
            </xs:attribute>
-           <xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000">
-               <xs:annotation>
-                   <xs:documentation>
-                       Maximum byte size of a cache value that will be stored in memory
-                       (reduce it to reduce memory consumption, increase it for increased throughput)
-                   </xs:documentation>
-               </xs:annotation>
-           </xs:attribute>
        </xs:extension>
    </xs:complexContent>
</xs:complexType>

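The net effect of these three schema hunks is that `chunk-size` is now declared once on the `<connection>` element (with a default of `0x10000`) and shared by all cache backends, instead of being repeated on each cache type. A minimal sketch of the new style follows; the attribute values are illustrative, not recommendations, and mirror the sample configuration files updated later in this diff.

```xml
<connection max-request-size="67108864"
            idle-timeout="PT30S"
            read-idle-timeout="PT60S"
            write-idle-timeout="PT60S"
            chunk-size="0x10000"/>
<!-- cache elements no longer carry a chunk-size attribute -->
<cache xs:type="rbcs:inMemoryCacheType" max-age="P7D"/>
```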
View File

@@ -41,7 +41,8 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
                Duration.of(60, ChronoUnit.SECONDS),
                Duration.of(30, ChronoUnit.SECONDS),
                Duration.of(30, ChronoUnit.SECONDS),
-               0x1000
+               0x1000,
+               0x10000
            ),
            users.asSequence().map { it.name to it}.toMap(),
            sequenceOf(writersGroup, readersGroup).map { it.name to it}.toMap(),
@@ -50,8 +51,7 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
                maxAge = Duration.ofSeconds(3600 * 24),
                digestAlgorithm = "MD5",
                compressionLevel = Deflater.DEFAULT_COMPRESSION,
-               compressionEnabled = false,
-               chunkSize = 0x1000
+               compressionEnabled = false
            ),
            Configuration.BasicAuthentication(),
            null,

View File

@@ -147,7 +147,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
                Duration.of(60, ChronoUnit.SECONDS),
                Duration.of(30, ChronoUnit.SECONDS),
                Duration.of(30, ChronoUnit.SECONDS),
-               0x1000
+               0x1000,
+               0x10000
            ),
            users.asSequence().map { it.name to it }.toMap(),
            sequenceOf(writersGroup, readersGroup).map { it.name to it }.toMap(),
@@ -156,7 +157,6 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
                compressionEnabled = false,
                compressionLevel = Deflater.DEFAULT_COMPRESSION,
                digestAlgorithm = "MD5",
-               chunkSize = 0x1000
            ),
            // InMemoryCacheConfiguration(
            //     maxAge = Duration.ofSeconds(3600 * 24),

View File

@@ -41,7 +41,8 @@ class NoAuthServerTest : AbstractServerTest() {
                Duration.of(60, ChronoUnit.SECONDS),
                Duration.of(30, ChronoUnit.SECONDS),
                Duration.of(30, ChronoUnit.SECONDS),
-               0x1000
+               0x1000,
+               0x10000
            ),
            emptyMap(),
            emptyMap(),
@@ -51,7 +52,6 @@ class NoAuthServerTest : AbstractServerTest() {
                digestAlgorithm = "MD5",
                compressionLevel = Deflater.DEFAULT_COMPRESSION,
                maxSize = 0x1000000,
-               chunkSize = 0x1000
            ),
            null,
            null,

View File

@@ -166,4 +166,17 @@ class TlsServerTest : AbstractTlsServerTest() {
        Assertions.assertEquals(HttpResponseStatus.OK.code(), response.statusCode())
        println(String(response.body()))
    }
+
+    @Test
+    @Order(10)
+    fun putAsUnknownUserUser() {
+        val (key, value) = keyValuePair
+        val client: HttpClient = getHttpClient(getClientKeyStore(ca, X500Name("CN=Unknown user")))
+        val requestBuilder = newRequestBuilder(key)
+            .header("Content-Type", "application/octet-stream")
+            .PUT(HttpRequest.BodyPublishers.ofByteArray(value))
+
+        val response: HttpResponse<String> = client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString())
+        Assertions.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.statusCode())
+    }
 }

View File

@@ -7,9 +7,10 @@
                read-idle-timeout="PT10M"
                write-idle-timeout="PT11M"
                idle-timeout="PT30M"
-               max-request-size="101325"/>
+               max-request-size="101325"
+               chunk-size="0xa910"/>
    <event-executor use-virtual-threads="false"/>
-   <cache xs:type="rbcs:fileSystemCacheType" path="/tmp/rbcs" max-age="P7D" chunk-size="0xa910"/>
+   <cache xs:type="rbcs:fileSystemCacheType" path="/tmp/rbcs" max-age="P7D"/>
    <authentication>
        <none/>
    </authentication>

View File

@@ -9,9 +9,10 @@
                max-request-size="67108864"
                idle-timeout="PT30S"
                read-idle-timeout="PT60S"
-               write-idle-timeout="PT60S"/>
+               write-idle-timeout="PT60S"
+               chunk-size="123"/>
    <event-executor use-virtual-threads="true"/>
-   <cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" chunk-size="123">
+   <cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D">
        <server host="memcached" port="11211"/>
    </cache>
    <authorization>

View File

@@ -8,9 +8,10 @@
                read-idle-timeout="PT10M"
                write-idle-timeout="PT11M"
                idle-timeout="PT30M"
-               max-request-size="101325"/>
+               max-request-size="101325"
+               chunk-size="456"/>
    <event-executor use-virtual-threads="false"/>
-   <cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" digest="SHA-256" chunk-size="456" compression-mode="deflate" compression-level="7">
+   <cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" digest="SHA-256" compression-mode="deflate" compression-level="7">
        <server host="127.0.0.1" port="11211" max-connections="10" connection-timeout="PT20S"/>
    </cache>
    <authentication>

View File

@@ -7,9 +7,10 @@
                read-idle-timeout="PT10M"
                write-idle-timeout="PT11M"
                idle-timeout="PT30M"
-               max-request-size="4096"/>
+               max-request-size="4096"
+               chunk-size="0xa91f"/>
    <event-executor use-virtual-threads="false"/>
-   <cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" chunk-size="0xa91f"/>
+   <cache xs:type="rbcs:inMemoryCacheType" max-age="P7D"/>
    <authorization>
        <users>
            <user name="user1" password="password1">