Compare commits

...

7 Commits

Author SHA1 Message Date
559ad5e528 fixed module-info.java
All checks were successful
CI / build (push) Successful in 6m36s
2025-06-13 20:46:35 +08:00
fd0bd1ee5f added optional key prefix to memcache backend 2025-06-13 17:45:15 +08:00
0e92998f16 downgraded toi GraalVM 23 because of bugs in GraalVM 24 2025-06-13 14:32:25 +08:00
9eef91ebba removed excessive logging 2025-06-13 14:16:57 +08:00
3416c327b9 updated GraalVM configuration 2025-06-13 14:09:38 +08:00
9bdaa0d32e optimize imports 2025-06-13 14:08:46 +08:00
206bcd6319 fixed bug with throttling handler when requests are delayed 2025-06-13 13:50:35 +08:00
68 changed files with 601 additions and 381 deletions

View File

@@ -2,7 +2,7 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true org.gradle.parallel=true
org.gradle.caching=true org.gradle.caching=true
rbcs.version = 0.3.0-SNAPSHOT rbcs.version = 0.3.1
lys.version = 2025.06.10 lys.version = 2025.06.10

View File

@@ -1,9 +1,10 @@
package net.woggioni.rbcs.api; package net.woggioni.rbcs.api;
import java.io.Serializable;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import java.io.Serializable;
@Getter @Getter
@RequiredArgsConstructor @RequiredArgsConstructor
public class CacheValueMetadata implements Serializable { public class CacheValueMetadata implements Serializable {

View File

@@ -1,15 +1,16 @@
package net.woggioni.rbcs.api; package net.woggioni.rbcs.api;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.Value;
import java.nio.file.Path; import java.nio.file.Path;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.time.Duration; import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.Value;
@Value @Value
public class Configuration { public class Configuration {
@@ -20,6 +21,8 @@ public class Configuration {
@NonNull @NonNull
EventExecutor eventExecutor; EventExecutor eventExecutor;
@NonNull @NonNull
RateLimiter rateLimiter;
@NonNull
Connection connection; Connection connection;
Map<String, User> users; Map<String, User> users;
Map<String, Group> groups; Map<String, Group> groups;
@@ -27,6 +30,13 @@ public class Configuration {
Authentication authentication; Authentication authentication;
Tls tls; Tls tls;
@Value
public static class RateLimiter {
boolean delayRequest;
int messageBufferSize;
int maxQueuedMessages;
}
@Value @Value
public static class EventExecutor { public static class EventExecutor {
boolean useVirtualThreads; boolean useVirtualThreads;
@@ -133,6 +143,7 @@ public class Configuration {
int incomingConnectionsBacklogSize, int incomingConnectionsBacklogSize,
String serverPath, String serverPath,
EventExecutor eventExecutor, EventExecutor eventExecutor,
RateLimiter rateLimiter,
Connection connection, Connection connection,
Map<String, User> users, Map<String, User> users,
Map<String, Group> groups, Map<String, Group> groups,
@@ -146,6 +157,7 @@ public class Configuration {
incomingConnectionsBacklogSize, incomingConnectionsBacklogSize,
serverPath != null && !serverPath.isEmpty() && !serverPath.equals("/") ? serverPath : null, serverPath != null && !serverPath.isEmpty() && !serverPath.equals("/") ? serverPath : null,
eventExecutor, eventExecutor,
rateLimiter,
connection, connection,
users, users,
groups, groups,

View File

@@ -14,17 +14,26 @@ public sealed interface CacheMessage {
private final String key; private final String key;
} }
@Getter
@RequiredArgsConstructor
abstract sealed class CacheGetResponse implements CacheMessage { abstract sealed class CacheGetResponse implements CacheMessage {
private final String key;
} }
@Getter @Getter
@RequiredArgsConstructor
final class CacheValueFoundResponse extends CacheGetResponse { final class CacheValueFoundResponse extends CacheGetResponse {
private final String key;
private final CacheValueMetadata metadata; private final CacheValueMetadata metadata;
public CacheValueFoundResponse(String key, CacheValueMetadata metadata) {
super(key);
this.metadata = metadata;
}
} }
final class CacheValueNotFoundResponse extends CacheGetResponse { final class CacheValueNotFoundResponse extends CacheGetResponse {
public CacheValueNotFoundResponse(String key) {
super(key);
}
} }
@Getter @Getter

View File

@@ -9,15 +9,10 @@ plugins {
id 'maven-publish' id 'maven-publish'
} }
import net.woggioni.gradle.envelope.EnvelopePlugin
import net.woggioni.gradle.envelope.EnvelopeJarTask
import net.woggioni.gradle.graalvm.NativeImageConfigurationTask
import net.woggioni.gradle.graalvm.NativeImageTask
import net.woggioni.gradle.graalvm.NativeImagePlugin
import net.woggioni.gradle.graalvm.UpxTask
import net.woggioni.gradle.graalvm.JlinkPlugin
import net.woggioni.gradle.graalvm.JlinkTask
import net.woggioni.gradle.envelope.EnvelopeJarTask
import net.woggioni.gradle.envelope.EnvelopePlugin
import net.woggioni.gradle.graalvm.*
sourceSets { sourceSets {
configureNativeImage { configureNativeImage {

View File

@@ -46,6 +46,9 @@
{ {
"name":"com.github.luben.zstd.Zstd" "name":"com.github.luben.zstd.Zstd"
}, },
{
"name":"com.jcraft.jzlib.JZlib"
},
{ {
"name":"com.sun.crypto.provider.AESCipher$General", "name":"com.sun.crypto.provider.AESCipher$General",
"methods":[{"name":"<init>","parameterTypes":[] }] "methods":[{"name":"<init>","parameterTypes":[] }]
@@ -557,16 +560,17 @@
"name":"net.woggioni.rbcs.server.exception.ExceptionHandler", "name":"net.woggioni.rbcs.server.exception.ExceptionHandler",
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }] "methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
}, },
{
"name":"net.woggioni.rbcs.server.handler.BlackHoleRequestHandler"
},
{ {
"name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler", "name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler",
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }] "methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
}, },
{
"name":"net.woggioni.rbcs.server.handler.ReadTriggerDuplexHandler",
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }, {"name":"write","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object","io.netty.channel.ChannelPromise"] }]
},
{ {
"name":"net.woggioni.rbcs.server.handler.ServerHandler", "name":"net.woggioni.rbcs.server.handler.ServerHandler",
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }, {"name":"channelReadComplete","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }, {"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }, {"name":"write","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object","io.netty.channel.ChannelPromise"] }] "methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }, {"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }, {"name":"write","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object","io.netty.channel.ChannelPromise"] }]
}, },
{ {
"name":"net.woggioni.rbcs.server.handler.TraceHandler", "name":"net.woggioni.rbcs.server.handler.TraceHandler",
@@ -609,6 +613,10 @@
"name":"sun.security.provider.DSA$SHA256withDSA", "name":"sun.security.provider.DSA$SHA256withDSA",
"methods":[{"name":"<init>","parameterTypes":[] }] "methods":[{"name":"<init>","parameterTypes":[] }]
}, },
{
"name":"sun.security.provider.JavaKeyStore$JKS",
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{ {
"name":"sun.security.provider.MD5", "name":"sun.security.provider.MD5",
"methods":[{"name":"<init>","parameterTypes":[] }] "methods":[{"name":"<init>","parameterTypes":[] }]
@@ -697,14 +705,6 @@
"name":"sun.security.x509.CertificatePoliciesExtension", "name":"sun.security.x509.CertificatePoliciesExtension",
"methods":[{"name":"<init>","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] "methods":[{"name":"<init>","parameterTypes":["java.lang.Boolean","java.lang.Object"] }]
}, },
{
"name":"sun.security.x509.ExtendedKeyUsageExtension",
"methods":[{"name":"<init>","parameterTypes":["java.lang.Boolean","java.lang.Object"] }]
},
{
"name":"sun.security.x509.IssuerAlternativeNameExtension",
"methods":[{"name":"<init>","parameterTypes":["java.lang.Boolean","java.lang.Object"] }]
},
{ {
"name":"sun.security.x509.KeyUsageExtension", "name":"sun.security.x509.KeyUsageExtension",
"methods":[{"name":"<init>","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] "methods":[{"name":"<init>","parameterTypes":["java.lang.Boolean","java.lang.Object"] }]

View File

@@ -34,8 +34,6 @@
"pattern":"\\Qnet/woggioni/rbcs/client/schema/rbcs-client.xsd\\E" "pattern":"\\Qnet/woggioni/rbcs/client/schema/rbcs-client.xsd\\E"
}, { }, {
"pattern":"\\Qnet/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd\\E" "pattern":"\\Qnet/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd\\E"
}, {
"pattern":"\\Qnet/woggioni/rbcs/server/rbcs-default.xml\\E"
}, { }, {
"pattern":"\\Qnet/woggioni/rbcs/server/schema/rbcs-server.xsd\\E" "pattern":"\\Qnet/woggioni/rbcs/server/schema/rbcs-server.xsd\\E"
}]}, }]},

View File

@@ -1,5 +1,12 @@
package net.woggioni.rbcs.cli.graal package net.woggioni.rbcs.cli.graal
import java.io.ByteArrayInputStream
import java.net.URI
import java.nio.file.Path
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.ExecutionException
import java.util.zip.Deflater
import net.woggioni.jwo.NullOutputStream import net.woggioni.jwo.NullOutputStream
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Configuration.User import net.woggioni.rbcs.api.Configuration.User
@@ -9,6 +16,8 @@ import net.woggioni.rbcs.cli.impl.commands.BenchmarkCommand
import net.woggioni.rbcs.cli.impl.commands.GetCommand import net.woggioni.rbcs.cli.impl.commands.GetCommand
import net.woggioni.rbcs.cli.impl.commands.HealthCheckCommand import net.woggioni.rbcs.cli.impl.commands.HealthCheckCommand
import net.woggioni.rbcs.cli.impl.commands.PutCommand import net.woggioni.rbcs.cli.impl.commands.PutCommand
import net.woggioni.rbcs.client.Configuration as ClientConfiguration
import net.woggioni.rbcs.client.impl.Parser as ClientConfigurationParser
import net.woggioni.rbcs.common.HostAndPort import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import net.woggioni.rbcs.common.RBCS import net.woggioni.rbcs.common.RBCS
@@ -18,15 +27,6 @@ import net.woggioni.rbcs.server.cache.FileSystemCacheConfiguration
import net.woggioni.rbcs.server.cache.InMemoryCacheConfiguration import net.woggioni.rbcs.server.cache.InMemoryCacheConfiguration
import net.woggioni.rbcs.server.configuration.Parser import net.woggioni.rbcs.server.configuration.Parser
import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration
import java.io.ByteArrayInputStream
import java.net.URI
import java.nio.file.Path
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.ExecutionException
import java.util.zip.Deflater
import net.woggioni.rbcs.client.Configuration as ClientConfiguration
import net.woggioni.rbcs.client.impl.Parser as ClientConfigurationParser
object GraalNativeImageConfiguration { object GraalNativeImageConfiguration {
@JvmStatic @JvmStatic
@@ -86,6 +86,7 @@ object GraalNativeImageConfiguration {
4) 4)
), ),
Duration.ofSeconds(60), Duration.ofSeconds(60),
"someCustomPrefix",
"MD5", "MD5",
null, null,
1, 1,
@@ -99,6 +100,9 @@ object GraalNativeImageConfiguration {
100, 100,
null, null,
Configuration.EventExecutor(true), Configuration.EventExecutor(true),
Configuration.RateLimiter(
false, 0x100000, 10
),
Configuration.Connection( Configuration.Connection(
Duration.ofSeconds(10), Duration.ofSeconds(10),
Duration.ofSeconds(15), Duration.ofSeconds(15),
@@ -113,23 +117,8 @@ object GraalNativeImageConfiguration {
null, null,
) )
MemcacheCacheConfiguration(
listOf(
MemcacheCacheConfiguration.Server(
HostAndPort("127.0.0.1", 11211),
1000,
4
)
),
Duration.ofSeconds(60),
"MD5",
null,
1,
)
val serverHandle = RemoteBuildCacheServer(serverConfiguration).run() val serverHandle = RemoteBuildCacheServer(serverConfiguration).run()
val clientProfile = ClientConfiguration.Profile( val clientProfile = ClientConfiguration.Profile(
URI.create("http://127.0.0.1:$serverPort/"), URI.create("http://127.0.0.1:$serverPort/"),
ClientConfiguration.Connection( ClientConfiguration.Connection(

View File

@@ -1,9 +1,9 @@
package net.woggioni.rbcs.cli.impl package net.woggioni.rbcs.cli.impl
import picocli.CommandLine
import java.util.jar.Attributes import java.util.jar.Attributes
import java.util.jar.JarFile import java.util.jar.JarFile
import java.util.jar.Manifest import java.util.jar.Manifest
import picocli.CommandLine
abstract class AbstractVersionProvider : CommandLine.IVersionProvider { abstract class AbstractVersionProvider : CommandLine.IVersionProvider {

View File

@@ -1,8 +1,8 @@
package net.woggioni.rbcs.cli.impl package net.woggioni.rbcs.cli.impl
import java.nio.file.Path
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import picocli.CommandLine import picocli.CommandLine
import java.nio.file.Path
abstract class RbcsCommand : Runnable { abstract class RbcsCommand : Runnable {

View File

@@ -1,5 +1,13 @@
package net.woggioni.rbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import java.security.SecureRandom
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import net.woggioni.jwo.LongMath import net.woggioni.jwo.LongMath
import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.api.CacheValueMetadata
@@ -12,14 +20,6 @@ import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.error import net.woggioni.rbcs.common.error
import net.woggioni.rbcs.common.info import net.woggioni.rbcs.common.info
import picocli.CommandLine import picocli.CommandLine
import java.security.SecureRandom
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random
@CommandLine.Command( @CommandLine.Command(
name = "benchmark", name = "benchmark",
@@ -101,6 +101,7 @@ class BenchmarkCommand : RbcsCommand() {
"Starting retrieval" "Starting retrieval"
} }
if (entries.isNotEmpty()) { if (entries.isNotEmpty()) {
val errorCounter = AtomicLong(0)
val completionCounter = AtomicLong(0) val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 5) val semaphore = Semaphore(profile.maxConnections * 5)
val start = Instant.now() val start = Instant.now()
@@ -109,14 +110,20 @@ class BenchmarkCommand : RbcsCommand() {
if (it.hasNext()) { if (it.hasNext()) {
val entry = it.next() val entry = it.next()
semaphore.acquire() semaphore.acquire()
val future = client.get(entry.first).thenApply { val future = client.get(entry.first).handle { response, ex ->
if (it == null) { if(ex != null) {
errorCounter.incrementAndGet()
log.error(ex.message, ex)
} else if (response == null) {
errorCounter.incrementAndGet()
log.error { log.error {
"Missing entry for key '${entry.first}'" "Missing entry for key '${entry.first}'"
} }
} else if (!entry.second.contentEquals(it)) { } else if (!entry.second.contentEquals(response)) {
errorCounter.incrementAndGet()
log.error { log.error {
"Retrieved a value different from what was inserted for key '${entry.first}'" "Retrieved a value different from what was inserted for key '${entry.first}': " +
"expected '${JWO.bytesToHex(entry.second)}', got '${JWO.bytesToHex(response)}' instead"
} }
} }
} }
@@ -134,6 +141,12 @@ class BenchmarkCommand : RbcsCommand() {
} }
} }
val end = Instant.now() val end = Instant.now()
val errors = errorCounter.get()
val successfulRetrievals = entries.size - errors
val successRate = successfulRetrievals.toDouble() / entries.size
log.info {
"Successfully retrieved ${entries.size - errors}/${entries.size} (${String.format("%.1f", successRate * 100)}%)"
}
log.info { log.info {
val elapsed = Duration.between(start, end).toMillis() val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000) val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)

View File

@@ -1,12 +1,12 @@
package net.woggioni.rbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import java.nio.file.Path
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import net.woggioni.rbcs.cli.impl.RbcsCommand import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.client.Configuration import net.woggioni.rbcs.client.Configuration
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug import net.woggioni.rbcs.common.debug
import picocli.CommandLine import picocli.CommandLine
import java.nio.file.Path
@CommandLine.Command( @CommandLine.Command(
name = "client", name = "client",

View File

@@ -1,13 +1,13 @@
package net.woggioni.rbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import java.io.OutputStream
import java.nio.file.Files
import java.nio.file.Path
import net.woggioni.rbcs.cli.impl.RbcsCommand import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.client.Configuration import net.woggioni.rbcs.client.Configuration
import net.woggioni.rbcs.client.RemoteBuildCacheClient import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import picocli.CommandLine import picocli.CommandLine
import java.io.OutputStream
import java.nio.file.Files
import java.nio.file.Path
@CommandLine.Command( @CommandLine.Command(
name = "get", name = "get",

View File

@@ -1,12 +1,12 @@
package net.woggioni.rbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import java.security.SecureRandom
import kotlin.random.Random
import net.woggioni.rbcs.cli.impl.RbcsCommand import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.client.Configuration import net.woggioni.rbcs.client.Configuration
import net.woggioni.rbcs.client.RemoteBuildCacheClient import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import picocli.CommandLine import picocli.CommandLine
import java.security.SecureRandom
import kotlin.random.Random
@CommandLine.Command( @CommandLine.Command(
name = "health", name = "health",

View File

@@ -1,13 +1,13 @@
package net.woggioni.rbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import java.io.OutputStream
import java.io.OutputStreamWriter
import java.io.PrintWriter
import net.woggioni.jwo.UncloseableOutputStream import net.woggioni.jwo.UncloseableOutputStream
import net.woggioni.rbcs.cli.impl.RbcsCommand import net.woggioni.rbcs.cli.impl.RbcsCommand
import net.woggioni.rbcs.cli.impl.converters.OutputStreamConverter import net.woggioni.rbcs.cli.impl.converters.OutputStreamConverter
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import picocli.CommandLine import picocli.CommandLine
import java.io.OutputStream
import java.io.OutputStreamWriter
import java.io.PrintWriter
@CommandLine.Command( @CommandLine.Command(

View File

@@ -1,5 +1,9 @@
package net.woggioni.rbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Path
import java.util.UUID
import net.woggioni.jwo.Hash import net.woggioni.jwo.Hash
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import net.woggioni.jwo.NullOutputStream import net.woggioni.jwo.NullOutputStream
@@ -9,10 +13,6 @@ import net.woggioni.rbcs.client.Configuration
import net.woggioni.rbcs.client.RemoteBuildCacheClient import net.woggioni.rbcs.client.RemoteBuildCacheClient
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import picocli.CommandLine import picocli.CommandLine
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Path
import java.util.UUID
@CommandLine.Command( @CommandLine.Command(
name = "put", name = "put",

View File

@@ -1,5 +1,10 @@
package net.woggioni.rbcs.cli.impl.commands package net.woggioni.rbcs.cli.impl.commands
import java.io.ByteArrayOutputStream
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration
import java.util.concurrent.TimeUnit
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import net.woggioni.rbcs.cli.impl.RbcsCommand import net.woggioni.rbcs.cli.impl.RbcsCommand
@@ -10,11 +15,6 @@ import net.woggioni.rbcs.common.info
import net.woggioni.rbcs.server.RemoteBuildCacheServer import net.woggioni.rbcs.server.RemoteBuildCacheServer
import net.woggioni.rbcs.server.RemoteBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL import net.woggioni.rbcs.server.RemoteBuildCacheServer.Companion.DEFAULT_CONFIGURATION_URL
import picocli.CommandLine import picocli.CommandLine
import java.io.ByteArrayOutputStream
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration
import java.util.concurrent.TimeUnit
@CommandLine.Command( @CommandLine.Command(
name = "server", name = "server",

View File

@@ -1,7 +1,7 @@
package net.woggioni.rbcs.cli.impl.converters package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine
import java.time.Duration import java.time.Duration
import picocli.CommandLine
class DurationConverter : CommandLine.ITypeConverter<Duration> { class DurationConverter : CommandLine.ITypeConverter<Duration> {

View File

@@ -1,9 +1,9 @@
package net.woggioni.rbcs.cli.impl.converters package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine
import java.io.InputStream import java.io.InputStream
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Paths import java.nio.file.Paths
import picocli.CommandLine
class InputStreamConverter : CommandLine.ITypeConverter<InputStream> { class InputStreamConverter : CommandLine.ITypeConverter<InputStream> {

View File

@@ -1,9 +1,9 @@
package net.woggioni.rbcs.cli.impl.converters package net.woggioni.rbcs.cli.impl.converters
import picocli.CommandLine
import java.io.OutputStream import java.io.OutputStream
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Paths import java.nio.file.Paths
import picocli.CommandLine
class OutputStreamConverter : CommandLine.ITypeConverter<OutputStream> { class OutputStreamConverter : CommandLine.ITypeConverter<OutputStream> {

View File

@@ -1,13 +1,13 @@
package net.woggioni.rbcs.client package net.woggioni.rbcs.client
import net.woggioni.rbcs.client.impl.Parser
import net.woggioni.rbcs.common.Xml
import java.net.URI import java.net.URI
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.security.PrivateKey import java.security.PrivateKey
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import java.time.Duration import java.time.Duration
import net.woggioni.rbcs.client.impl.Parser
import net.woggioni.rbcs.common.Xml
data class Configuration( data class Configuration(
val profiles: Map<String, Profile> val profiles: Map<String, Profile>

View File

@@ -152,7 +152,7 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
val pipeline: ChannelPipeline = ch.pipeline() val pipeline: ChannelPipeline = ch.pipeline()
profile.connection?.also { conn -> profile.connection.also { conn ->
val readIdleTimeout = conn.readIdleTimeout.toMillis() val readIdleTimeout = conn.readIdleTimeout.toMillis()
val writeIdleTimeout = conn.writeIdleTimeout.toMillis() val writeIdleTimeout = conn.writeIdleTimeout.toMillis()
val idleTimeout = conn.idleTimeout.toMillis() val idleTimeout = conn.idleTimeout.toMillis()
@@ -295,7 +295,6 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
): CompletableFuture<FullHttpResponse> { ): CompletableFuture<FullHttpResponse> {
val responseFuture = CompletableFuture<FullHttpResponse>() val responseFuture = CompletableFuture<FullHttpResponse>()
// Custom handler for processing responses // Custom handler for processing responses
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> { pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: Future<Channel>) { override fun operationComplete(channelFuture: Future<Channel>) {
@@ -337,21 +336,15 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
override fun channelInactive(ctx: ChannelHandlerContext) { override fun channelInactive(ctx: ChannelHandlerContext) {
responseFuture.completeExceptionally(IOException("The remote server closed the connection")) responseFuture.completeExceptionally(IOException("The remote server closed the connection"))
if(!profile.connection.requestPipelining) {
pool.release(channel)
}
super.channelInactive(ctx) super.channelInactive(ctx)
pool.release(channel)
} }
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is IdleStateEvent) { if (evt is IdleStateEvent) {
val te = when (evt.state()) { val te = when (evt.state()) {
IdleState.READER_IDLE -> TimeoutException( IdleState.READER_IDLE -> TimeoutException("Read timeout")
"Read timeout",
)
IdleState.WRITER_IDLE -> TimeoutException("Write timeout") IdleState.WRITER_IDLE -> TimeoutException("Write timeout")
IdleState.ALL_IDLE -> TimeoutException("Idle timeout") IdleState.ALL_IDLE -> TimeoutException("Idle timeout")
null -> throw IllegalStateException("This should never happen") null -> throw IllegalStateException("This should never happen")
} }

View File

@@ -1,10 +1,5 @@
package net.woggioni.rbcs.client.impl package net.woggioni.rbcs.client.impl
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.client.Configuration
import net.woggioni.rbcs.common.Xml.Companion.asIterable
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import java.net.URI import java.net.URI
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
@@ -13,6 +8,11 @@ import java.security.PrivateKey
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import java.time.Duration import java.time.Duration
import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.client.Configuration
import net.woggioni.rbcs.common.Xml.Companion.asIterable
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
object Parser { object Parser {

View File

@@ -2,6 +2,9 @@ package net.woggioni.rbcs.client
import io.netty.util.concurrent.DefaultEventExecutorGroup import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup import io.netty.util.concurrent.EventExecutorGroup
import java.util.concurrent.CompletableFuture
import java.util.stream.Stream
import kotlin.random.Random
import net.woggioni.rbcs.common.contextLogger import net.woggioni.rbcs.common.contextLogger
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.extension.ExtensionContext import org.junit.jupiter.api.extension.ExtensionContext
@@ -9,9 +12,6 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.ArgumentsProvider import org.junit.jupiter.params.provider.ArgumentsProvider
import org.junit.jupiter.params.provider.ArgumentsSource import org.junit.jupiter.params.provider.ArgumentsSource
import java.util.concurrent.CompletableFuture
import java.util.stream.Stream
import kotlin.random.Random
class RetryTest { class RetryTest {

View File

@@ -2,14 +2,14 @@ package net.woggioni.rbcs.common
import io.netty.channel.Channel import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import java.nio.file.Files
import java.nio.file.Path
import java.util.logging.LogManager
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import org.slf4j.MDC import org.slf4j.MDC
import org.slf4j.event.Level import org.slf4j.event.Level
import org.slf4j.spi.LoggingEventBuilder import org.slf4j.spi.LoggingEventBuilder
import java.nio.file.Files
import java.nio.file.Path
import java.util.logging.LogManager
inline fun <reified T> T.contextLogger() = LoggerFactory.getLogger(T::class.java) inline fun <reified T> T.contextLogger() = LoggerFactory.getLogger(T::class.java)
inline fun <reified T> createLogger() = LoggerFactory.getLogger(T::class.java) inline fun <reified T> createLogger() = LoggerFactory.getLogger(T::class.java)

View File

@@ -1,7 +1,5 @@
package net.woggioni.rbcs.common package net.woggioni.rbcs.common
import net.woggioni.jwo.JWO
import net.woggioni.jwo.Tuple2
import java.io.IOException import java.io.IOException
import java.net.InetAddress import java.net.InetAddress
import java.net.ServerSocket import java.net.ServerSocket
@@ -21,6 +19,8 @@ import java.security.cert.X509Certificate
import java.util.EnumSet import java.util.EnumSet
import javax.net.ssl.TrustManagerFactory import javax.net.ssl.TrustManagerFactory
import javax.net.ssl.X509TrustManager import javax.net.ssl.X509TrustManager
import net.woggioni.jwo.JWO
import net.woggioni.jwo.Tuple2
object RBCS { object RBCS {
fun String.toUrl(): URL = URL.of(URI(this), null) fun String.toUrl(): URL = URL.of(URI(this), null)
@@ -62,11 +62,18 @@ object RBCS {
return JWO.bytesToHex(digest(data, md)) return JWO.bytesToHex(digest(data, md))
} }
fun processCacheKey(key: String, digestAlgorithm: String?) = digestAlgorithm fun processCacheKey(key: String, keyPrefix: String?, digestAlgorithm: String?) : ByteArray {
val prefixedKey = if (keyPrefix == null) {
key
} else {
key + keyPrefix
}.toByteArray(Charsets.UTF_8)
return digestAlgorithm
?.let(MessageDigest::getInstance) ?.let(MessageDigest::getInstance)
?.let { md -> ?.let { md ->
digest(key.toByteArray(), md) digest(prefixedKey, md)
} ?: key.toByteArray(Charsets.UTF_8) } ?: prefixedKey
}
fun Long.toIntOrNull(): Int? { fun Long.toIntOrNull(): Int? {
return if (this >= Int.MIN_VALUE && this <= Int.MAX_VALUE) { return if (this >= Int.MIN_VALUE && this <= Int.MAX_VALUE) {

View File

@@ -1,14 +1,5 @@
package net.woggioni.rbcs.common package net.woggioni.rbcs.common
import net.woggioni.jwo.JWO
import org.slf4j.event.Level
import org.w3c.dom.Document
import org.w3c.dom.Element
import org.w3c.dom.Node
import org.w3c.dom.NodeList
import org.xml.sax.SAXNotRecognizedException
import org.xml.sax.SAXNotSupportedException
import org.xml.sax.SAXParseException
import java.io.InputStream import java.io.InputStream
import java.io.OutputStream import java.io.OutputStream
import java.net.URL import java.net.URL
@@ -25,7 +16,16 @@ import javax.xml.transform.stream.StreamResult
import javax.xml.transform.stream.StreamSource import javax.xml.transform.stream.StreamSource
import javax.xml.validation.Schema import javax.xml.validation.Schema
import javax.xml.validation.SchemaFactory import javax.xml.validation.SchemaFactory
import net.woggioni.jwo.JWO
import org.slf4j.event.Level
import org.w3c.dom.Document
import org.w3c.dom.Element
import org.w3c.dom.Node
import org.w3c.dom.NodeList
import org.xml.sax.ErrorHandler as ErrHandler import org.xml.sax.ErrorHandler as ErrHandler
import org.xml.sax.SAXNotRecognizedException
import org.xml.sax.SAXNotSupportedException
import org.xml.sax.SAXParseException
class NodeListIterator(private val nodeList: NodeList) : Iterator<Node> { class NodeListIterator(private val nodeList: NodeList) : Iterator<Node> {

View File

@@ -1,14 +1,14 @@
package net.woggioni.rbcs.common package net.woggioni.rbcs.common
import java.security.Provider
import java.security.Security
import java.util.Base64
import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource import org.junit.jupiter.params.provider.EnumSource
import java.security.Provider
import java.security.Security
import java.util.Base64
class PasswordHashingTest { class PasswordHashingTest {

View File

@@ -5,21 +5,22 @@ import io.netty.channel.EventLoopGroup
import io.netty.channel.pool.FixedChannelPool import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.DatagramChannel import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import net.woggioni.rbcs.api.CacheHandler import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.HostAndPort import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.server.memcache.client.MemcacheClient import net.woggioni.rbcs.server.memcache.client.MemcacheClient
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
data class MemcacheCacheConfiguration( data class MemcacheCacheConfiguration(
val servers: List<Server>, val servers: List<Server>,
val maxAge: Duration = Duration.ofDays(1), val maxAge: Duration = Duration.ofDays(1),
val keyPrefix : String? = null,
val digestAlgorithm: String? = null, val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = null, val compressionMode: CompressionMode? = null,
val compressionLevel: Int, val compressionLevel: Int,
@@ -60,6 +61,7 @@ data class MemcacheCacheConfiguration(
socketChannelFactory, socketChannelFactory,
connectionPoolMap connectionPoolMap
), ),
keyPrefix,
digestAlgorithm, digestAlgorithm,
compressionMode != null, compressionMode != null,
compressionLevel, compressionLevel,

View File

@@ -3,6 +3,7 @@ package net.woggioni.rbcs.server.memcache
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.CompositeByteBuf import io.netty.buffer.CompositeByteBuf
import io.netty.channel.Channel as NettyChannel
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
import io.netty.handler.codec.memcache.DefaultMemcacheContent import io.netty.handler.codec.memcache.DefaultMemcacheContent
@@ -12,6 +13,21 @@ import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.channels.ReadableByteChannel
import java.nio.file.Files
import java.nio.file.StandardOpenOption
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream
import net.woggioni.rbcs.api.CacheHandler import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.exception.ContentTooLargeException import net.woggioni.rbcs.api.exception.ContentTooLargeException
@@ -34,25 +50,10 @@ import net.woggioni.rbcs.common.trace
import net.woggioni.rbcs.server.memcache.client.MemcacheClient import net.woggioni.rbcs.server.memcache.client.MemcacheClient
import net.woggioni.rbcs.server.memcache.client.MemcacheRequestController import net.woggioni.rbcs.server.memcache.client.MemcacheRequestController
import net.woggioni.rbcs.server.memcache.client.MemcacheResponseHandler import net.woggioni.rbcs.server.memcache.client.MemcacheResponseHandler
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.channels.ReadableByteChannel
import java.nio.file.Files
import java.nio.file.StandardOpenOption
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream
import io.netty.channel.Channel as NettyChannel
class MemcacheCacheHandler( class MemcacheCacheHandler(
private val client: MemcacheClient, private val client: MemcacheClient,
private val keyPrefix: String?,
private val digestAlgorithm: String?, private val digestAlgorithm: String?,
private val compressionEnabled: Boolean, private val compressionEnabled: Boolean,
private val compressionLevel: Int, private val compressionLevel: Int,
@@ -248,7 +249,7 @@ class MemcacheCacheHandler(
"Fetching ${msg.key} from memcache" "Fetching ${msg.key} from memcache"
} }
val key = ctx.alloc().buffer().also { val key = ctx.alloc().buffer().also {
it.writeBytes(processCacheKey(msg.key, digestAlgorithm)) it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm))
} }
val responseHandler = object : MemcacheResponseHandler { val responseHandler = object : MemcacheResponseHandler {
override fun responseReceived(response: BinaryMemcacheResponse) { override fun responseReceived(response: BinaryMemcacheResponse) {
@@ -265,7 +266,7 @@ class MemcacheCacheHandler(
log.debug(ctx) { log.debug(ctx) {
"Cache miss for key ${msg.key} on memcache" "Cache miss for key ${msg.key} on memcache"
} }
sendMessageAndFlush(ctx, CacheValueNotFoundResponse()) sendMessageAndFlush(ctx, CacheValueNotFoundResponse(msg.key))
} }
} }
} }
@@ -309,7 +310,7 @@ class MemcacheCacheHandler(
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
val key = ctx.alloc().buffer().also { val key = ctx.alloc().buffer().also {
it.writeBytes(processCacheKey(msg.key, digestAlgorithm)) it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm))
} }
val responseHandler = object : MemcacheResponseHandler { val responseHandler = object : MemcacheResponseHandler {
override fun responseReceived(response: BinaryMemcacheResponse) { override fun responseReceived(response: BinaryMemcacheResponse) {

View File

@@ -1,5 +1,7 @@
package net.woggioni.rbcs.server.memcache package net.woggioni.rbcs.server.memcache
import java.time.Duration
import java.time.temporal.ChronoUnit
import net.woggioni.rbcs.api.CacheProvider import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.rbcs.api.exception.ConfigurationException import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.common.HostAndPort import net.woggioni.rbcs.common.HostAndPort
@@ -9,8 +11,6 @@ import net.woggioni.rbcs.common.Xml.Companion.asIterable
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document import org.w3c.dom.Document
import org.w3c.dom.Element import org.w3c.dom.Element
import java.time.Duration
import java.time.temporal.ChronoUnit
class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> { class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
@@ -38,6 +38,7 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE
} }
} }
val keyPrefix = el.renderAttribute("key-prefix")
val digestAlgorithm = el.renderAttribute("digest") val digestAlgorithm = el.renderAttribute("digest")
for (child in el.asIterable()) { for (child in el.asIterable()) {
when (child.nodeName) { when (child.nodeName) {
@@ -54,10 +55,10 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
} }
} }
} }
return MemcacheCacheConfiguration( return MemcacheCacheConfiguration(
servers, servers,
maxAge, maxAge,
keyPrefix,
digestAlgorithm, digestAlgorithm,
compressionMode, compressionMode,
compressionLevel compressionLevel
@@ -78,8 +79,12 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
} }
attr("max-connections", server.maxConnections.toString()) attr("max-connections", server.maxConnections.toString())
} }
} }
attr("max-age", maxAge.toString()) attr("max-age", maxAge.toString())
keyPrefix?.let {
attr("key-prefix", it)
}
digestAlgorithm?.let { digestAlgorithm -> digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm) attr("digest", digestAlgorithm)
} }

View File

@@ -20,17 +20,17 @@ import io.netty.handler.codec.memcache.MemcacheObject
import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec import io.netty.handler.codec.memcache.binary.BinaryMemcacheClientCodec
import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest import io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
import io.netty.util.concurrent.Future as NettyFuture
import io.netty.util.concurrent.GenericFutureListener import io.netty.util.concurrent.GenericFutureListener
import java.io.IOException
import java.net.InetSocketAddress
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import net.woggioni.rbcs.common.HostAndPort import net.woggioni.rbcs.common.HostAndPort
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.trace import net.woggioni.rbcs.common.trace
import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration
import net.woggioni.rbcs.server.memcache.MemcacheCacheHandler import net.woggioni.rbcs.server.memcache.MemcacheCacheHandler
import java.io.IOException
import java.net.InetSocketAddress
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import io.netty.util.concurrent.Future as NettyFuture
class MemcacheClient( class MemcacheClient(

View File

@@ -21,6 +21,14 @@
</xs:sequence> </xs:sequence>
<xs:attribute name="max-age" type="xs:duration" default="P1D"/> <xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000"/> <xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000"/>
<xs:attribute name="key-prefix" type="xs:string" use="optional">
<xs:annotation>
<xs:documentation>
Prepend this string to all the keys inserted in memcache,
useful in case the caching backend is shared with other applications
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="digest" type="xs:token"/> <xs:attribute name="digest" type="xs:token"/>
<xs:attribute name="compression-mode" type="rbcs-memcache:compressionType"/> <xs:attribute name="compression-mode" type="rbcs-memcache:compressionType"/>
<xs:attribute name="compression-level" type="rbcs:compressionLevelType" default="-1"/> <xs:attribute name="compression-level" type="rbcs:compressionLevelType" default="-1"/>

View File

@@ -2,12 +2,12 @@ package net.woggioni.rbcs.server.memcache.client
import io.netty.buffer.ByteBufUtil import io.netty.buffer.ByteBufUtil
import io.netty.buffer.Unpooled import io.netty.buffer.Unpooled
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.channels.Channels import java.nio.channels.Channels
import kotlin.random.Random import kotlin.random.Random
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
class ByteBufferTest { class ByteBufferTest {

View File

@@ -15,6 +15,7 @@ module net.woggioni.rbcs.server {
requires io.netty.transport; requires io.netty.transport;
requires io.netty.buffer; requires io.netty.buffer;
requires io.netty.common; requires io.netty.common;
requires io.netty.codec;
requires org.slf4j; requires org.slf4j;
exports net.woggioni.rbcs.server; exports net.woggioni.rbcs.server;

View File

@@ -72,8 +72,8 @@ import net.woggioni.rbcs.server.auth.RoleAuthorizer
import net.woggioni.rbcs.server.configuration.Parser import net.woggioni.rbcs.server.configuration.Parser
import net.woggioni.rbcs.server.configuration.Serializer import net.woggioni.rbcs.server.configuration.Serializer
import net.woggioni.rbcs.server.exception.ExceptionHandler import net.woggioni.rbcs.server.exception.ExceptionHandler
import net.woggioni.rbcs.server.handler.BlackHoleRequestHandler
import net.woggioni.rbcs.server.handler.MaxRequestSizeHandler import net.woggioni.rbcs.server.handler.MaxRequestSizeHandler
import net.woggioni.rbcs.server.handler.ReadTriggerDuplexHandler
import net.woggioni.rbcs.server.handler.ServerHandler import net.woggioni.rbcs.server.handler.ServerHandler
import net.woggioni.rbcs.server.throttling.BucketManager import net.woggioni.rbcs.server.throttling.BucketManager
import net.woggioni.rbcs.server.throttling.ThrottlingHandler import net.woggioni.rbcs.server.throttling.ThrottlingHandler
@@ -298,7 +298,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
"Closed connection ${ch.id().asShortText()} with ${ch.remoteAddress()}" "Closed connection ${ch.id().asShortText()} with ${ch.remoteAddress()}"
} }
} }
ch.config().setAutoRead(false) ch.config().isAutoRead = false
val pipeline = ch.pipeline() val pipeline = ch.pipeline()
cfg.connection.also { conn -> cfg.connection.also { conn ->
val readIdleTimeout = conn.readIdleTimeout.toMillis() val readIdleTimeout = conn.readIdleTimeout.toMillis()
@@ -345,13 +345,14 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
maxChunkSize = cfg.connection.chunkSize maxChunkSize = cfg.connection.chunkSize
} }
pipeline.addLast(HttpServerCodec(httpDecoderConfig)) pipeline.addLast(HttpServerCodec(httpDecoderConfig))
pipeline.addLast(ReadTriggerDuplexHandler.NAME, ReadTriggerDuplexHandler)
pipeline.addLast(MaxRequestSizeHandler.NAME, MaxRequestSizeHandler(cfg.connection.maxRequestSize)) pipeline.addLast(MaxRequestSizeHandler.NAME, MaxRequestSizeHandler(cfg.connection.maxRequestSize))
pipeline.addLast(HttpChunkContentCompressor(1024)) pipeline.addLast(HttpChunkContentCompressor(1024))
pipeline.addLast(ChunkedWriteHandler()) pipeline.addLast(ChunkedWriteHandler())
authenticator?.let { authenticator?.let {
pipeline.addLast(it) pipeline.addLast(it)
} }
pipeline.addLast(ThrottlingHandler(bucketManager, cfg.connection)) pipeline.addLast(ThrottlingHandler(bucketManager,cfg.rateLimiter, cfg.connection))
val serverHandler = let { val serverHandler = let {
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/")) val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
@@ -361,7 +362,6 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
} }
pipeline.addLast(ServerHandler.NAME, serverHandler) pipeline.addLast(ServerHandler.NAME, serverHandler)
pipeline.addLast(ExceptionHandler.NAME, ExceptionHandler) pipeline.addLast(ExceptionHandler.NAME, ExceptionHandler)
pipeline.addLast(BlackHoleRequestHandler.NAME, BlackHoleRequestHandler())
} }
override fun asyncClose() = cacheHandlerFactory.asyncClose() override fun asyncClose() = cacheHandlerFactory.asyncClose()

View File

@@ -1,9 +1,5 @@
package net.woggioni.rbcs.server.cache package net.woggioni.rbcs.server.cache
import net.woggioni.jwo.JWO
import net.woggioni.rbcs.api.AsyncCloseable
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.io.InputStream import java.io.InputStream
import java.io.ObjectInputStream import java.io.ObjectInputStream
@@ -20,6 +16,10 @@ import java.nio.file.attribute.BasicFileAttributes
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import net.woggioni.jwo.JWO
import net.woggioni.rbcs.api.AsyncCloseable
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger
class FileSystemCache( class FileSystemCache(
val root: Path, val root: Path,

View File

@@ -4,12 +4,12 @@ import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup import io.netty.channel.EventLoopGroup
import io.netty.channel.socket.DatagramChannel import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import java.nio.file.Path
import java.time.Duration
import net.woggioni.jwo.Application import net.woggioni.jwo.Application
import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS import net.woggioni.rbcs.common.RBCS
import java.nio.file.Path
import java.time.Duration
data class FileSystemCacheConfiguration( data class FileSystemCacheConfiguration(
val root: Path?, val root: Path?,

View File

@@ -4,6 +4,11 @@ import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioFile import io.netty.handler.stream.ChunkedNioFile
import java.nio.channels.Channels
import java.util.Base64
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterInputStream
import net.woggioni.rbcs.api.CacheHandler import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
@@ -14,11 +19,6 @@ import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.common.RBCS.processCacheKey import net.woggioni.rbcs.common.RBCS.processCacheKey
import java.nio.channels.Channels
import java.util.Base64
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterInputStream
class FileSystemCacheHandler( class FileSystemCacheHandler(
private val cache: FileSystemCache, private val cache: FileSystemCache,
@@ -79,7 +79,7 @@ class FileSystemCacheHandler(
} }
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm))) val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, null, digestAlgorithm)))
val sink = cache.put(key, msg.metadata) val sink = cache.put(key, msg.metadata)
inProgressRequest = InProgressPutRequest(msg.key, sink) inProgressRequest = InProgressPutRequest(msg.key, sink)
} }
@@ -100,7 +100,7 @@ class FileSystemCacheHandler(
sendMessageAndFlush(ctx, CachePutResponse(request.key)) sendMessageAndFlush(ctx, CachePutResponse(request.key))
} }
is InProgressGetRequest -> { is InProgressGetRequest -> {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(request.request.key, digestAlgorithm))) val key = String(Base64.getUrlEncoder().encode(processCacheKey(request.request.key, null, digestAlgorithm)))
cache.get(key)?.also { entryValue -> cache.get(key)?.also { entryValue ->
sendMessageAndFlush(ctx, CacheValueFoundResponse(request.request.key, entryValue.metadata)) sendMessageAndFlush(ctx, CacheValueFoundResponse(request.request.key, entryValue.metadata))
entryValue.channel.let { channel -> entryValue.channel.let { channel ->
@@ -125,7 +125,7 @@ class FileSystemCacheHandler(
sendMessageAndFlush(ctx, LastHttpContent.EMPTY_LAST_CONTENT) sendMessageAndFlush(ctx, LastHttpContent.EMPTY_LAST_CONTENT)
} }
} }
} ?: sendMessageAndFlush(ctx, CacheValueNotFoundResponse()) } ?: sendMessageAndFlush(ctx, CacheValueNotFoundResponse(key))
} }
} }
} }

View File

@@ -1,14 +1,14 @@
package net.woggioni.rbcs.server.cache package net.woggioni.rbcs.server.cache
import java.nio.file.Path
import java.time.Duration
import java.util.zip.Deflater
import net.woggioni.rbcs.api.CacheProvider import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.rbcs.common.RBCS import net.woggioni.rbcs.common.RBCS
import net.woggioni.rbcs.common.Xml import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document import org.w3c.dom.Document
import org.w3c.dom.Element import org.w3c.dom.Element
import java.nio.file.Path
import java.time.Duration
import java.util.zip.Deflater
class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> { class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {

View File

@@ -1,9 +1,6 @@
package net.woggioni.rbcs.server.cache package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import net.woggioni.rbcs.api.AsyncCloseable
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.PriorityQueue import java.util.PriorityQueue
@@ -11,6 +8,9 @@ import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock import kotlin.concurrent.withLock
import net.woggioni.rbcs.api.AsyncCloseable
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger
private class CacheKey(private val value: ByteArray) { private class CacheKey(private val value: ByteArray) {
override fun equals(other: Any?) = if (other is CacheKey) { override fun equals(other: Any?) = if (other is CacheKey) {

View File

@@ -4,10 +4,10 @@ import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup import io.netty.channel.EventLoopGroup
import io.netty.channel.socket.DatagramChannel import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import java.time.Duration
import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS import net.woggioni.rbcs.common.RBCS
import java.time.Duration
data class InMemoryCacheConfiguration( data class InMemoryCacheConfiguration(
val maxAge: Duration, val maxAge: Duration,

View File

@@ -2,14 +2,20 @@ package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.*
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.processCacheKey
import java.util.zip.Deflater import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream import java.util.zip.InflaterOutputStream
import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.processCacheKey
class InMemoryCacheHandler( class InMemoryCacheHandler(
private val cache: InMemoryCache, private val cache: InMemoryCache,
@@ -105,7 +111,7 @@ class InMemoryCacheHandler(
handleCacheContent(ctx, msg) handleCacheContent(ctx, msg)
when (val req = inProgressRequest) { when (val req = inProgressRequest) {
is InProgressGetRequest -> { is InProgressGetRequest -> {
cache.get(processCacheKey(req.request.key, digestAlgorithm))?.let { value -> cache.get(processCacheKey(req.request.key, null, digestAlgorithm))?.let { value ->
sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata)) sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata))
if (compressionEnabled) { if (compressionEnabled) {
val buf = ctx.alloc().heapBuffer() val buf = ctx.alloc().heapBuffer()
@@ -118,14 +124,15 @@ class InMemoryCacheHandler(
} else { } else {
sendMessage(ctx, LastCacheContent(value.content)) sendMessage(ctx, LastCacheContent(value.content))
} }
} ?: sendMessage(ctx, CacheValueNotFoundResponse()) } ?: sendMessage(ctx, CacheValueNotFoundResponse(req.request.key))
} }
is InProgressPutRequest -> { is InProgressPutRequest -> {
this.inProgressRequest = null this.inProgressRequest = null
val buf = req.buf val buf = req.buf
buf.retain() buf.retain()
req.close() req.close()
val cacheKey = processCacheKey(req.request.key, digestAlgorithm) val cacheKey = processCacheKey(req.request.key, null, digestAlgorithm)
cache.put(cacheKey, CacheEntry(req.request.metadata, buf)) cache.put(cacheKey, CacheEntry(req.request.metadata, buf))
sendMessageAndFlush(ctx, CachePutResponse(req.request.key)) sendMessageAndFlush(ctx, CachePutResponse(req.request.key))
} }

View File

@@ -1,13 +1,13 @@
package net.woggioni.rbcs.server.cache package net.woggioni.rbcs.server.cache
import java.time.Duration
import java.util.zip.Deflater
import net.woggioni.rbcs.api.CacheProvider import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.rbcs.common.RBCS import net.woggioni.rbcs.common.RBCS
import net.woggioni.rbcs.common.Xml import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document import org.w3c.dom.Document
import org.w3c.dom.Element import org.w3c.dom.Element
import java.time.Duration
import java.util.zip.Deflater
class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> { class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {

View File

@@ -1,8 +1,8 @@
package net.woggioni.rbcs.server.configuration package net.woggioni.rbcs.server.configuration
import java.util.ServiceLoader
import net.woggioni.rbcs.api.CacheProvider import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import java.util.ServiceLoader
object CacheSerializers { object CacheSerializers {
val index = (Configuration::class.java.module.layer?.let { layer -> val index = (Configuration::class.java.module.layer?.let { layer ->

View File

@@ -1,5 +1,8 @@
package net.woggioni.rbcs.server.configuration package net.woggioni.rbcs.server.configuration
import java.nio.file.Paths
import java.time.Duration
import java.time.temporal.ChronoUnit
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Configuration.Authentication import net.woggioni.rbcs.api.Configuration.Authentication
import net.woggioni.rbcs.api.Configuration.BasicAuthentication import net.woggioni.rbcs.api.Configuration.BasicAuthentication
@@ -18,9 +21,6 @@ import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document import org.w3c.dom.Document
import org.w3c.dom.Element import org.w3c.dom.Element
import org.w3c.dom.TypeInfo import org.w3c.dom.TypeInfo
import java.nio.file.Paths
import java.time.Duration
import java.time.temporal.ChronoUnit
object Parser { object Parser {
fun parse(document: Document): Configuration { fun parse(document: Document): Configuration {
@@ -33,6 +33,7 @@ object Parser {
0x4000000, 0x4000000,
0x10000 0x10000
) )
var rateLimiter = Configuration.RateLimiter(false, 0x100000, 100)
var eventExecutor: Configuration.EventExecutor = Configuration.EventExecutor(true) var eventExecutor: Configuration.EventExecutor = Configuration.EventExecutor(true)
var cache: Cache? = null var cache: Cache? = null
var host = "127.0.0.1" var host = "127.0.0.1"
@@ -132,11 +133,24 @@ object Parser {
} }
"event-executor" -> { "event-executor" -> {
val useVirtualThread = root.renderAttribute("use-virtual-threads") val useVirtualThread = child.renderAttribute("use-virtual-threads")
?.let(String::toBoolean) ?: true ?.let(String::toBoolean) ?: true
eventExecutor = Configuration.EventExecutor(useVirtualThread) eventExecutor = Configuration.EventExecutor(useVirtualThread)
} }
"rate-limiter" -> {
val delayResponse = child.renderAttribute("delay-response")
?.let(String::toBoolean)
?: false
val messageBufferSize = child.renderAttribute("message-buffer-size")
?.let(Integer::decode)
?: 0x100000
val maxQueuedMessages = child.renderAttribute("max-queued-messages")
?.let(Integer::decode)
?: 100
rateLimiter = Configuration.RateLimiter(delayResponse, messageBufferSize, maxQueuedMessages)
}
"tls" -> { "tls" -> {
var keyStore: KeyStore? = null var keyStore: KeyStore? = null
var trustStore: TrustStore? = null var trustStore: TrustStore? = null
@@ -184,6 +198,7 @@ object Parser {
incomingConnectionsBacklogSize, incomingConnectionsBacklogSize,
serverPath, serverPath,
eventExecutor, eventExecutor,
rateLimiter,
connection, connection,
users, users,
groups, groups,

View File

@@ -46,6 +46,11 @@ object Serializer {
node("event-executor") { node("event-executor") {
attr("use-virtual-threads", conf.eventExecutor.isUseVirtualThreads.toString()) attr("use-virtual-threads", conf.eventExecutor.isUseVirtualThreads.toString())
} }
node("rate-limiter") {
attr("delay-response", conf.rateLimiter.isDelayRequest.toString())
attr("max-queued-messages", conf.rateLimiter.maxQueuedMessages.toString())
attr("message-buffer-size", conf.rateLimiter.messageBufferSize.toString())
}
val cache = conf.cache val cache = conf.cache
val serializer : CacheProvider<Configuration.Cache> = val serializer : CacheProvider<Configuration.Cache> =
(CacheSerializers.index[cache.namespaceURI to cache.typeName] as? CacheProvider<Configuration.Cache>) ?: throw NotImplementedError() (CacheSerializers.index[cache.namespaceURI to cache.typeName] as? CacheProvider<Configuration.Cache>) ?: throw NotImplementedError()

View File

@@ -13,6 +13,10 @@ import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.timeout.ReadTimeoutException import io.netty.handler.timeout.ReadTimeoutException
import io.netty.handler.timeout.WriteTimeoutException import io.netty.handler.timeout.WriteTimeoutException
import java.net.ConnectException
import java.net.SocketException
import javax.net.ssl.SSLException
import javax.net.ssl.SSLPeerUnverifiedException
import net.woggioni.rbcs.api.exception.CacheException import net.woggioni.rbcs.api.exception.CacheException
import net.woggioni.rbcs.api.exception.ContentTooLargeException import net.woggioni.rbcs.api.exception.ContentTooLargeException
import net.woggioni.rbcs.common.contextLogger import net.woggioni.rbcs.common.contextLogger
@@ -20,10 +24,6 @@ import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.log import net.woggioni.rbcs.common.log
import org.slf4j.event.Level import org.slf4j.event.Level
import org.slf4j.spi.LoggingEventBuilder import org.slf4j.spi.LoggingEventBuilder
import java.net.ConnectException
import java.net.SocketException
import javax.net.ssl.SSLException
import javax.net.ssl.SSLPeerUnverifiedException
@Sharable @Sharable
object ExceptionHandler : ChannelDuplexHandler() { object ExceptionHandler : ChannelDuplexHandler() {

View File

@@ -0,0 +1,34 @@
package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise
import io.netty.handler.codec.http.LastHttpContent
@Sharable
object ReadTriggerDuplexHandler : ChannelDuplexHandler() {
val NAME = ReadTriggerDuplexHandler::class.java.name
override fun handlerAdded(ctx: ChannelHandlerContext) {
ctx.read()
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
super.channelRead(ctx, msg)
if(msg !is LastHttpContent) {
ctx.read()
}
}
override fun write(
ctx: ChannelHandlerContext,
msg: Any,
promise: ChannelPromise
) {
super.write(ctx, msg, promise)
if(msg is LastHttpContent) {
ctx.read()
}
}
}

View File

@@ -18,6 +18,7 @@ import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.HttpVersion import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import java.nio.file.Path
import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
@@ -31,7 +32,6 @@ import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.warn import net.woggioni.rbcs.common.warn
import net.woggioni.rbcs.server.exception.ExceptionHandler import net.woggioni.rbcs.server.exception.ExceptionHandler
import java.nio.file.Path
class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupplier : () -> ChannelHandler) : class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupplier : () -> ChannelHandler) :
ChannelDuplexHandler() { ChannelDuplexHandler() {
@@ -43,16 +43,6 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
private var httpVersion = HttpVersion.HTTP_1_1 private var httpVersion = HttpVersion.HTTP_1_1
private var keepAlive = true private var keepAlive = true
private var pipelinedRequests = 0
private fun newRequest() {
pipelinedRequests += 1
}
private fun requestCompleted(ctx : ChannelHandlerContext) {
pipelinedRequests -= 1
if(pipelinedRequests == 0) ctx.read()
}
private fun resetRequestMetadata() { private fun resetRequestMetadata() {
httpVersion = HttpVersion.HTTP_1_1 httpVersion = HttpVersion.HTTP_1_1
@@ -74,10 +64,6 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
private var cacheRequestInProgress : Boolean = false private var cacheRequestInProgress : Boolean = false
override fun handlerAdded(ctx: ChannelHandlerContext) {
ctx.read()
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
when (msg) { when (msg) {
is HttpRequest -> handleRequest(ctx, msg) is HttpRequest -> handleRequest(ctx, msg)
@@ -98,18 +84,14 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
} }
} }
override fun channelReadComplete(ctx: ChannelHandlerContext) {
super.channelReadComplete(ctx)
if(cacheRequestInProgress) {
ctx.read()
}
}
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise?) { override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise?) {
if (msg is CacheMessage) { if (msg is CacheMessage) {
try { try {
when (msg) { when (msg) {
is CachePutResponse -> { is CachePutResponse -> {
log.debug(ctx) {
"Added value for key '${msg.key}' to build cache"
}
val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.CREATED) val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.CREATED)
val keyBytes = msg.key.toByteArray(Charsets.UTF_8) val keyBytes = msg.key.toByteArray(Charsets.UTF_8)
response.headers().apply { response.headers().apply {
@@ -121,21 +103,23 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
val buf = ctx.alloc().buffer(keyBytes.size).apply { val buf = ctx.alloc().buffer(keyBytes.size).apply {
writeBytes(keyBytes) writeBytes(keyBytes)
} }
ctx.writeAndFlush(DefaultLastHttpContent(buf)).also { ctx.writeAndFlush(DefaultLastHttpContent(buf))
requestCompleted(ctx)
}
} }
is CacheValueNotFoundResponse -> { is CacheValueNotFoundResponse -> {
log.debug(ctx) {
"Value not found for key '${msg.key}'"
}
val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND) val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
setKeepAliveHeader(response.headers()) setKeepAliveHeader(response.headers())
ctx.writeAndFlush(response).also { ctx.writeAndFlush(response)
requestCompleted(ctx)
}
} }
is CacheValueFoundResponse -> { is CacheValueFoundResponse -> {
log.debug(ctx) {
"Retrieved value for key '${msg.key}'"
}
val response = DefaultHttpResponse(httpVersion, HttpResponseStatus.OK) val response = DefaultHttpResponse(httpVersion, HttpResponseStatus.OK)
response.headers().apply { response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, msg.metadata.mimeType ?: HttpHeaderValues.APPLICATION_OCTET_STREAM) set(HttpHeaderNames.CONTENT_TYPE, msg.metadata.mimeType ?: HttpHeaderValues.APPLICATION_OCTET_STREAM)
@@ -149,9 +133,7 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
} }
is LastCacheContent -> { is LastCacheContent -> {
ctx.writeAndFlush(DefaultLastHttpContent(msg.content())).also { ctx.writeAndFlush(DefaultLastHttpContent(msg.content()))
requestCompleted(ctx)
}
} }
is CacheContent -> { is CacheContent -> {
@@ -172,7 +154,6 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
} }
} else if(msg is LastHttpContent) { } else if(msg is LastHttpContent) {
ctx.write(msg, promise) ctx.write(msg, promise)
requestCompleted(ctx)
} else super.write(ctx, msg, promise) } else super.write(ctx, msg, promise)
} }
@@ -186,13 +167,13 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
cacheRequestInProgress = true cacheRequestInProgress = true
val relativePath = serverPrefix.relativize(path) val relativePath = serverPrefix.relativize(path)
val key : String = relativePath.toString() val key : String = relativePath.toString()
newRequest()
val cacheHandler = cacheHandlerSupplier() val cacheHandler = cacheHandlerSupplier()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler) ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler)
key.let(::CacheGetRequest) key.let(::CacheGetRequest)
.let(ctx::fireChannelRead) .let(ctx::fireChannelRead)
?: ctx.channel().write(CacheValueNotFoundResponse()) ?: ctx.channel().write(CacheValueNotFoundResponse(key))
} else { } else {
cacheRequestInProgress = false
log.warn(ctx) { log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'" "Got request for unhandled path '${msg.uri()}'"
} }
@@ -206,12 +187,8 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
cacheRequestInProgress = true cacheRequestInProgress = true
val relativePath = serverPrefix.relativize(path) val relativePath = serverPrefix.relativize(path)
val key = relativePath.toString() val key = relativePath.toString()
log.debug(ctx) {
"Added value for key '$key' to build cache"
}
newRequest()
val cacheHandler = cacheHandlerSupplier() val cacheHandler = cacheHandlerSupplier()
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler) ctx.pipeline().addAfter(NAME, null, cacheHandler)
path.fileName?.toString() path.fileName?.toString()
?.let { ?.let {
@@ -219,8 +196,9 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
CachePutRequest(key, CacheValueMetadata(msg.headers().get(HttpHeaderNames.CONTENT_DISPOSITION), mimeType)) CachePutRequest(key, CacheValueMetadata(msg.headers().get(HttpHeaderNames.CONTENT_DISPOSITION), mimeType))
} }
?.let(ctx::fireChannelRead) ?.let(ctx::fireChannelRead)
?: ctx.channel().write(CacheValueNotFoundResponse()) ?: ctx.channel().write(CacheValueNotFoundResponse(key))
} else { } else {
cacheRequestInProgress = false
log.warn(ctx) { log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'" "Got request for unhandled path '${msg.uri()}'"
} }
@@ -229,10 +207,11 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
ctx.writeAndFlush(response) ctx.writeAndFlush(response)
} }
} else if (method == HttpMethod.TRACE) { } else if (method == HttpMethod.TRACE) {
newRequest() cacheRequestInProgress = false
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, TraceHandler) ctx.pipeline().addAfter(NAME, null, TraceHandler)
super.channelRead(ctx, msg) super.channelRead(ctx, msg)
} else { } else {
cacheRequestInProgress = false
log.warn(ctx) { log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'" "Got request with unhandled method '${msg.method().name()}'"
} }

View File

@@ -1,11 +1,11 @@
package net.woggioni.rbcs.server.throttling package net.woggioni.rbcs.server.throttling
import net.woggioni.jwo.Bucket
import net.woggioni.rbcs.api.Configuration
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.Arrays import java.util.Arrays
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function import java.util.function.Function
import net.woggioni.jwo.Bucket
import net.woggioni.rbcs.api.Configuration
class BucketManager private constructor( class BucketManager private constructor(
private val bucketsByUser: Map<Configuration.User, List<Bucket>> = HashMap(), private val bucketsByUser: Map<Configuration.User, List<Bucket>> = HashMap(),

View File

@@ -1,32 +1,50 @@
package net.woggioni.rbcs.server.throttling package net.woggioni.rbcs.server.throttling
import io.netty.buffer.ByteBufHolder
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.codec.http.DefaultFullHttpResponse import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.HttpContent import io.netty.handler.codec.http.FullHttpMessage
import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.LastHttpContent
import java.net.InetSocketAddress
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.ArrayDeque
import java.util.concurrent.TimeUnit
import net.woggioni.jwo.Bucket import net.woggioni.jwo.Bucket
import net.woggioni.jwo.LongMath import net.woggioni.jwo.LongMath
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.server.RemoteBuildCacheServer import net.woggioni.rbcs.server.RemoteBuildCacheServer
import java.net.InetSocketAddress
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
class ThrottlingHandler(
class ThrottlingHandler(private val bucketManager : BucketManager, private val bucketManager: BucketManager,
private val connectionConfiguration : Configuration.Connection) : ChannelInboundHandlerAdapter() { rateLimiterConfiguration: Configuration.RateLimiter,
connectionConfiguration: Configuration.Connection
) : ChannelInboundHandlerAdapter() {
private companion object { private companion object {
private val log = createLogger<ThrottlingHandler>() private val log = createLogger<ThrottlingHandler>()
fun nextAttemptIsWithinThreshold(nextAttemptNanos : Long, waitThreshold : Duration) : Boolean {
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttemptNanos, 100_000_000L) * 100L, ChronoUnit.MILLIS)
return waitDuration < waitThreshold
}
} }
private var queuedContent : MutableList<HttpContent>? = null private class RefusedRequest
private val maxMessageBufferSize = rateLimiterConfiguration.messageBufferSize
private val maxQueuedMessages = rateLimiterConfiguration.maxQueuedMessages
private val delayRequests = rateLimiterConfiguration.isDelayRequest
private var requestBufferSize : Int = 0
private var valveClosed = false
private var queuedContent = ArrayDeque<Any>()
/** /**
* If the suggested waiting time from the bucket is lower than this * If the suggested waiting time from the bucket is lower than this
@@ -39,10 +57,134 @@ class ThrottlingHandler(private val bucketManager : BucketManager,
connectionConfiguration.writeIdleTimeout connectionConfiguration.writeIdleTimeout
).dividedBy(2) ).dividedBy(2)
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
if(msg is HttpRequest) { if(valveClosed) {
if(msg !is HttpRequest && msg is ByteBufHolder) {
val newBufferSize = requestBufferSize + msg.content().readableBytes()
if(newBufferSize > maxMessageBufferSize || queuedContent.size + 1 > maxQueuedMessages) {
log.debug {
if (newBufferSize > maxMessageBufferSize) {
"New message part exceeds maxMessageBufferSize, removing previous chunks"
} else {
"New message part exceeds maxQueuedMessages, removing previous chunks"
}
}
// If this message overflows the maxMessageBufferSize,
// then remove the previously enqueued chunks of the request from the deque,
// then discard the message
while(true) {
val tail = queuedContent.last()
if(tail is ByteBufHolder) {
requestBufferSize -= tail.content().readableBytes()
tail.release()
}
queuedContent.removeLast()
if(tail is HttpRequest) {
break
}
}
msg.release()
//Add a placeholder to remember to return a 429 response corresponding to this request
queuedContent.addLast(RefusedRequest())
} else {
//If the message does not overflow maxMessageBufferSize, just add it to the deque
queuedContent.addLast(msg)
requestBufferSize = newBufferSize
}
} else if(msg is HttpRequest && msg is FullHttpMessage){
val newBufferSize = requestBufferSize + msg.content().readableBytes()
// If this message overflows the maxMessageBufferSize,
// discard the message
if(newBufferSize > maxMessageBufferSize || queuedContent.size + 1 > maxQueuedMessages) {
log.debug {
if (newBufferSize > maxMessageBufferSize) {
"New message exceeds maxMessageBufferSize, discarding it"
} else {
"New message exceeds maxQueuedMessages, discarding it"
}
}
msg.release()
//Add a placeholder to remember to return a 429 response corresponding to this request
queuedContent.addLast(RefusedRequest())
} else {
//If the message does not exceed maxMessageBufferSize or maxQueuedMessages, just add it to the deque
queuedContent.addLast(msg)
requestBufferSize = newBufferSize
}
} else {
queuedContent.addLast(msg)
}
} else {
entryPoint(ctx, msg)
}
}
private fun entryPoint(ctx : ChannelHandlerContext, msg : Any) {
if(msg is RefusedRequest) {
sendThrottledResponse(ctx, null)
if(queuedContent.isEmpty()) {
valveClosed = false
} else {
val head = queuedContent.poll()
if(head is ByteBufHolder) {
requestBufferSize -= head.content().readableBytes()
}
entryPoint(ctx, head)
}
} else if(msg is HttpRequest) {
val nextAttempt = getNextAttempt(ctx)
if (nextAttempt < 0) {
super.channelRead(ctx, msg)
if(msg !is LastHttpContent) {
while (true) {
val head = queuedContent.poll() ?: break
if(head is ByteBufHolder) {
requestBufferSize -= head.content().readableBytes()
}
super.channelRead(ctx, head)
if (head is LastHttpContent) break
}
}
if(queuedContent.isEmpty()) {
valveClosed = false
} else {
val head = queuedContent.poll()
if(head is ByteBufHolder) {
requestBufferSize -= head.content().readableBytes()
}
entryPoint(ctx, head)
}
} else {
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
if (delayRequests && nextAttemptIsWithinThreshold(nextAttempt, waitThreshold)) {
valveClosed = true
ctx.executor().schedule({
entryPoint(ctx, msg)
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
} else {
sendThrottledResponse(ctx, waitDuration)
if(queuedContent.isEmpty()) {
valveClosed = false
} else {
val head = queuedContent.poll()
if(head is ByteBufHolder) {
requestBufferSize -= head.content().readableBytes()
}
entryPoint(ctx, head)
}
}
}
} else {
super.channelRead(ctx, msg)
}
}
/**
* Returns the number amount of milliseconds to wait before the requests can be processed
* or -1 if the request can be performed immediately
*/
private fun getNextAttempt(ctx : ChannelHandlerContext) : Long {
val buckets = mutableListOf<Bucket>() val buckets = mutableListOf<Bucket>()
val user = ctx.channel().attr(RemoteBuildCacheServer.userAttribute).get() val user = ctx.channel().attr(RemoteBuildCacheServer.userAttribute).get()
if (user != null) { if (user != null) {
@@ -57,20 +199,7 @@ class ThrottlingHandler(private val bucketManager : BucketManager,
if (user == null && groups.isEmpty()) { if (user == null && groups.isEmpty()) {
bucketManager.getBucketByAddress(ctx.channel().remoteAddress() as InetSocketAddress)?.let(buckets::add) bucketManager.getBucketByAddress(ctx.channel().remoteAddress() as InetSocketAddress)?.let(buckets::add)
} }
if (buckets.isEmpty()) {
super.channelRead(ctx, msg)
} else {
handleBuckets(buckets, ctx, msg, true)
}
ctx.channel().id()
} else if(msg is HttpContent) {
queuedContent?.add(msg) ?: super.channelRead(ctx, msg)
} else {
super.channelRead(ctx, msg)
}
}
private fun handleBuckets(buckets: List<Bucket>, ctx: ChannelHandlerContext, msg: Any, delayResponse: Boolean) {
var nextAttempt = -1L var nextAttempt = -1L
for (bucket in buckets) { for (bucket in buckets) {
val bucketNextAttempt = bucket.removeTokensWithEstimate(1) val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
@@ -78,41 +207,19 @@ class ThrottlingHandler(private val bucketManager : BucketManager,
nextAttempt = bucketNextAttempt nextAttempt = bucketNextAttempt
} }
} }
if (nextAttempt < 0) { return nextAttempt
super.channelRead(ctx, msg)
queuedContent?.let {
for(content in it) {
super.channelRead(ctx, content)
}
queuedContent = null
}
} else {
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
if (delayResponse && waitDuration < waitThreshold) {
this.queuedContent = mutableListOf()
ctx.executor().schedule({
handleBuckets(buckets, ctx, msg, false)
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
} else {
queuedContent?.let { qc ->
qc.forEach { it.release() }
}
this.queuedContent = null
sendThrottledResponse(ctx, waitDuration)
}
}
} }
private fun sendThrottledResponse(ctx: ChannelHandlerContext, retryAfter: Duration) { private fun sendThrottledResponse(ctx: ChannelHandlerContext, retryAfter: Duration?) {
val response = DefaultFullHttpResponse( val response = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpVersion.HTTP_1_1,
HttpResponseStatus.TOO_MANY_REQUESTS HttpResponseStatus.TOO_MANY_REQUESTS
) )
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
retryAfter.seconds.takeIf { retryAfter?.seconds?.takeIf {
it > 0 it > 0
}?.let { }?.let {
response.headers()[HttpHeaderNames.RETRY_AFTER] = retryAfter.seconds response.headers()[HttpHeaderNames.RETRY_AFTER] = it
} }
ctx.writeAndFlush(response) ctx.writeAndFlush(response)

View File

@@ -16,6 +16,7 @@
<xs:element name="bind" type="rbcs:bindType" maxOccurs="1"/> <xs:element name="bind" type="rbcs:bindType" maxOccurs="1"/>
<xs:element name="connection" type="rbcs:connectionType" minOccurs="0" maxOccurs="1"/> <xs:element name="connection" type="rbcs:connectionType" minOccurs="0" maxOccurs="1"/>
<xs:element name="event-executor" type="rbcs:eventExecutorType" minOccurs="0" maxOccurs="1"/> <xs:element name="event-executor" type="rbcs:eventExecutorType" minOccurs="0" maxOccurs="1"/>
<xs:element name="rate-limiter" type="rbcs:rateLimiterType" minOccurs="0" maxOccurs="1"/>
<xs:element name="cache" type="rbcs:cacheType" maxOccurs="1"> <xs:element name="cache" type="rbcs:cacheType" maxOccurs="1">
<xs:annotation> <xs:annotation>
<xs:documentation> <xs:documentation>
@@ -136,6 +137,37 @@
</xs:attribute> </xs:attribute>
</xs:complexType> </xs:complexType>
<xs:complexType name="rateLimiterType">
<xs:attribute name="delay-response" type="xs:boolean" use="optional" default="false">
<xs:annotation>
<xs:documentation>
If set to true, the server will delay responses to meet user quotas, otherwise it will simply
return an immediate 429 status code to all requests that exceed the configured quota
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="max-queued-messages" type="xs:nonNegativeInteger" use="optional" default="100">
<xs:annotation>
<xs:documentation>
Only meaningful when "delay-response" is set to "true",
when a request is delayed, it and all the following messages are queued
as long as "max-queued-messages" is not crossed, all requests that would exceed the
max-queued-message limit are instead discarded and responded with a 429 status code
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="message-buffer-size" type="rbcs:byteSizeType" use="optional" default="0x100000">
<xs:annotation>
<xs:documentation>
Only meaningful when "delay-response" is set to "true",
when a request is delayed, it and all the following requests are buffered
as long as "message-buffer-size" is not crossed, all requests that would exceed the buffer
size are instead discarded and responded with a 429 status code
</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
<xs:complexType name="cacheType" abstract="true"/> <xs:complexType name="cacheType" abstract="true"/>
<xs:complexType name="inMemoryCacheType"> <xs:complexType name="inMemoryCacheType">

View File

@@ -1,14 +1,5 @@
package net.woggioni.rbcs.server.test.utils; package net.woggioni.rbcs.server.test.utils;
import java.math.BigInteger;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import org.bouncycastle.asn1.DERSequence; import org.bouncycastle.asn1.DERSequence;
import org.bouncycastle.asn1.x500.X500Name; import org.bouncycastle.asn1.x500.X500Name;
import org.bouncycastle.asn1.x509.BasicConstraints; import org.bouncycastle.asn1.x509.BasicConstraints;
@@ -24,6 +15,16 @@ import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
import org.bouncycastle.operator.ContentSigner; import org.bouncycastle.operator.ContentSigner;
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
import java.math.BigInteger;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
public class CertificateUtils { public class CertificateUtils {
public record X509Credentials( public record X509Credentials(

View File

@@ -1,11 +1,5 @@
package net.woggioni.rbcs.server.test package net.woggioni.rbcs.server.test
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Role
import net.woggioni.rbcs.common.RBCS.getFreePort
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.server.cache.FileSystemCacheConfiguration
import net.woggioni.rbcs.server.configuration.Serializer
import java.net.URI import java.net.URI
import java.net.http.HttpRequest import java.net.http.HttpRequest
import java.nio.charset.StandardCharsets import java.nio.charset.StandardCharsets
@@ -15,6 +9,12 @@ import java.time.temporal.ChronoUnit
import java.util.Base64 import java.util.Base64
import java.util.zip.Deflater import java.util.zip.Deflater
import kotlin.random.Random import kotlin.random.Random
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Role
import net.woggioni.rbcs.common.RBCS.getFreePort
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.server.cache.FileSystemCacheConfiguration
import net.woggioni.rbcs.server.configuration.Serializer
abstract class AbstractBasicAuthServerTest : AbstractServerTest() { abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
@@ -37,6 +37,7 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
50, 50,
serverPath, serverPath,
Configuration.EventExecutor(false), Configuration.EventExecutor(false),
Configuration.RateLimiter(true, 0x100000, 50),
Configuration.Connection( Configuration.Connection(
Duration.of(60, ChronoUnit.SECONDS), Duration.of(60, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS), Duration.of(30, ChronoUnit.SECONDS),

View File

@@ -1,5 +1,6 @@
package net.woggioni.rbcs.server.test package net.woggioni.rbcs.server.test
import java.nio.file.Path
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.server.RemoteBuildCacheServer import net.woggioni.rbcs.server.RemoteBuildCacheServer
import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.AfterAll
@@ -8,7 +9,6 @@ import org.junit.jupiter.api.MethodOrderer
import org.junit.jupiter.api.TestInstance import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.TestMethodOrder import org.junit.jupiter.api.TestMethodOrder
import org.junit.jupiter.api.io.TempDir import org.junit.jupiter.api.io.TempDir
import java.nio.file.Path
@TestInstance(TestInstance.Lifecycle.PER_CLASS) @TestInstance(TestInstance.Lifecycle.PER_CLASS)

View File

@@ -1,14 +1,5 @@
package net.woggioni.rbcs.server.test package net.woggioni.rbcs.server.test
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Role
import net.woggioni.rbcs.common.RBCS.getFreePort
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.server.cache.FileSystemCacheConfiguration
import net.woggioni.rbcs.server.configuration.Serializer
import net.woggioni.rbcs.server.test.utils.CertificateUtils
import net.woggioni.rbcs.server.test.utils.CertificateUtils.X509Credentials
import org.bouncycastle.asn1.x500.X500Name
import java.net.URI import java.net.URI
import java.net.http.HttpClient import java.net.http.HttpClient
import java.net.http.HttpRequest import java.net.http.HttpRequest
@@ -25,6 +16,15 @@ import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.SSLContext import javax.net.ssl.SSLContext
import javax.net.ssl.TrustManagerFactory import javax.net.ssl.TrustManagerFactory
import kotlin.random.Random import kotlin.random.Random
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Role
import net.woggioni.rbcs.common.RBCS.getFreePort
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.server.cache.FileSystemCacheConfiguration
import net.woggioni.rbcs.server.configuration.Serializer
import net.woggioni.rbcs.server.test.utils.CertificateUtils
import net.woggioni.rbcs.server.test.utils.CertificateUtils.X509Credentials
import org.bouncycastle.asn1.x500.X500Name
abstract class AbstractTlsServerTest : AbstractServerTest() { abstract class AbstractTlsServerTest : AbstractServerTest() {
@@ -143,6 +143,7 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
100, 100,
serverPath, serverPath,
Configuration.EventExecutor(false), Configuration.EventExecutor(false),
Configuration.RateLimiter(true, 0x100000, 50),
Configuration.Connection( Configuration.Connection(
Duration.of(60, ChronoUnit.SECONDS), Duration.of(60, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS), Duration.of(30, ChronoUnit.SECONDS),

View File

@@ -1,17 +1,17 @@
package net.woggioni.rbcs.server.test package net.woggioni.rbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.time.temporal.ChronoUnit
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Role import net.woggioni.rbcs.api.Role
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.time.temporal.ChronoUnit
class BasicAuthServerTest : AbstractBasicAuthServerTest() { class BasicAuthServerTest : AbstractBasicAuthServerTest() {

View File

@@ -1,5 +1,7 @@
package net.woggioni.rbcs.server.test package net.woggioni.rbcs.server.test
import java.nio.file.Files
import java.nio.file.Path
import net.woggioni.rbcs.common.RBCS.toUrl import net.woggioni.rbcs.common.RBCS.toUrl
import net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory import net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory
import net.woggioni.rbcs.common.Xml import net.woggioni.rbcs.common.Xml
@@ -10,8 +12,6 @@ import org.junit.jupiter.api.io.TempDir
import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource import org.junit.jupiter.params.provider.ValueSource
import org.xml.sax.SAXParseException import org.xml.sax.SAXParseException
import java.nio.file.Files
import java.nio.file.Path
class ConfigurationTest { class ConfigurationTest {

View File

@@ -1,14 +1,14 @@
package net.woggioni.rbcs.server.test package net.woggioni.rbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
class NoAnonymousUserBasicAuthServerTest : AbstractBasicAuthServerTest() { class NoAnonymousUserBasicAuthServerTest : AbstractBasicAuthServerTest() {

View File

@@ -1,13 +1,13 @@
package net.woggioni.rbcs.server.test package net.woggioni.rbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
class NoAnonymousUserTlsServerTest : AbstractTlsServerTest() { class NoAnonymousUserTlsServerTest : AbstractTlsServerTest() {

View File

@@ -1,14 +1,6 @@
package net.woggioni.rbcs.server.test package net.woggioni.rbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS.getFreePort
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.server.cache.InMemoryCacheConfiguration
import net.woggioni.rbcs.server.configuration.Serializer
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test
import java.net.URI import java.net.URI
import java.net.http.HttpClient import java.net.http.HttpClient
import java.net.http.HttpRequest import java.net.http.HttpRequest
@@ -19,6 +11,14 @@ import java.time.temporal.ChronoUnit
import java.util.Base64 import java.util.Base64
import java.util.zip.Deflater import java.util.zip.Deflater
import kotlin.random.Random import kotlin.random.Random
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS.getFreePort
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.server.cache.InMemoryCacheConfiguration
import net.woggioni.rbcs.server.configuration.Serializer
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test
class NoAuthServerTest : AbstractServerTest() { class NoAuthServerTest : AbstractServerTest() {
@@ -37,6 +37,7 @@ class NoAuthServerTest : AbstractServerTest() {
100, 100,
serverPath, serverPath,
Configuration.EventExecutor(false), Configuration.EventExecutor(false),
Configuration.RateLimiter(true, 0x100000, 50),
Configuration.Connection( Configuration.Connection(
Duration.of(60, ChronoUnit.SECONDS), Duration.of(60, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS), Duration.of(30, ChronoUnit.SECONDS),

View File

@@ -1,15 +1,15 @@
package net.woggioni.rbcs.server.test package net.woggioni.rbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpResponseStatus
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Role import net.woggioni.rbcs.api.Role
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
class TlsServerTest : AbstractTlsServerTest() { class TlsServerTest : AbstractTlsServerTest() {

View File

@@ -1,8 +1,8 @@
package net.woggioni.rbcs.server.test package net.woggioni.rbcs.server.test
import javax.naming.ldap.LdapName
import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import javax.naming.ldap.LdapName
class X500NameTest { class X500NameTest {

View File

@@ -10,6 +10,7 @@
max-request-size="101325" max-request-size="101325"
chunk-size="0xa910"/> chunk-size="0xa910"/>
<event-executor use-virtual-threads="false"/> <event-executor use-virtual-threads="false"/>
<rate-limiter delay-response="false" message-buffer-size="0x1234" max-queued-messages="13"/>
<cache xs:type="rbcs:fileSystemCacheType" path="/tmp/rbcs" max-age="P7D"/> <cache xs:type="rbcs:fileSystemCacheType" path="/tmp/rbcs" max-age="P7D"/>
<authentication> <authentication>
<none/> <none/>

View File

@@ -12,7 +12,8 @@
write-idle-timeout="PT60S" write-idle-timeout="PT60S"
chunk-size="123"/> chunk-size="123"/>
<event-executor use-virtual-threads="true"/> <event-executor use-virtual-threads="true"/>
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D"> <rate-limiter delay-response="false" message-buffer-size="12000" max-queued-messages="53"/>
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" key-prefix="some-prefix-string">
<server host="memcached" port="11211"/> <server host="memcached" port="11211"/>
</cache> </cache>
<authorization> <authorization>

View File

@@ -11,7 +11,8 @@
max-request-size="101325" max-request-size="101325"
chunk-size="456"/> chunk-size="456"/>
<event-executor use-virtual-threads="false"/> <event-executor use-virtual-threads="false"/>
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" digest="SHA-256" compression-mode="deflate" compression-level="7"> <rate-limiter delay-response="true" message-buffer-size="65432" max-queued-messages="21"/>
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" key-prefix="some-prefix-string" digest="SHA-256" compression-mode="deflate" compression-level="7">
<server host="127.0.0.1" port="11211" max-connections="10" connection-timeout="PT20S"/> <server host="127.0.0.1" port="11211" max-connections="10" connection-timeout="PT20S"/>
</cache> </cache>
<authentication> <authentication>

View File

@@ -7,8 +7,6 @@ import jakarta.servlet.annotation.WebServlet
import jakarta.servlet.http.HttpServlet import jakarta.servlet.http.HttpServlet
import jakarta.servlet.http.HttpServletRequest import jakarta.servlet.http.HttpServletRequest
import jakarta.servlet.http.HttpServletResponse import jakarta.servlet.http.HttpServletResponse
import net.woggioni.jwo.HttpClient.HttpStatus
import net.woggioni.jwo.JWO
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.nio.file.Path import java.nio.file.Path
@@ -19,6 +17,8 @@ import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.util.logging.Logger import java.util.logging.Logger
import net.woggioni.jwo.HttpClient.HttpStatus
import net.woggioni.jwo.JWO
private class CacheKey(private val value: ByteArray) { private class CacheKey(private val value: ByteArray) {