Compare commits
7 Commits
59a12d6218
...
0.2.0-RC5
Author | SHA1 | Date | |
---|---|---|---|
222b475223
|
|||
ede515e2ca
|
|||
974fdb7a91
|
|||
a294229ff0
|
|||
9600dd7e4f
|
|||
729276a2b1
|
|||
7ba7070693
|
93
benchmark/rbcs-filesystem.yml
Normal file
93
benchmark/rbcs-filesystem.yml
Normal file
@@ -0,0 +1,93 @@
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: rbcs-server
|
||||
data:
|
||||
rbcs-server.xml: |
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:rbcs="urn:net.woggioni.rbcs.server"
|
||||
xmlns:rbcs-memcache="urn:net.woggioni.rbcs.server.memcache"
|
||||
xs:schemaLocation="urn:net.woggioni.rbcs.server.memcache jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
|
||||
>
|
||||
<bind host="0.0.0.0" port="8080" incoming-connections-backlog-size="128"/>
|
||||
<connection
|
||||
max-request-size="0xd000000"
|
||||
idle-timeout="PT15S"
|
||||
read-idle-timeout="PT30S"
|
||||
write-idle-timeout="PT30S"/>
|
||||
<event-executor use-virtual-threads="true"/>
|
||||
<cache xs:type="rbcs:fileSystemCacheType" max-age="P7D" enable-compression="false" path="/rbcs/cache"/>
|
||||
</rbcs:server>
|
||||
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: PersistentVolumeClaim
|
||||
metadata:
|
||||
name: rbcs-pvc
|
||||
namespace: default
|
||||
spec:
|
||||
accessModes:
|
||||
- ReadWriteOnce
|
||||
storageClassName: local-path
|
||||
resources:
|
||||
requests:
|
||||
storage: 16Gi
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: rbcs-deployment
|
||||
labels:
|
||||
app: rbcs
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: rbcs
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: rbcs
|
||||
spec:
|
||||
containers:
|
||||
- name: rbcs
|
||||
image: gitea.woggioni.net/woggioni/rbcs:native
|
||||
imagePullPolicy: Always
|
||||
args: ['server', '-c', 'rbcs-server.xml']
|
||||
ports:
|
||||
- containerPort: 8080
|
||||
volumeMounts:
|
||||
- name: config-volume
|
||||
mountPath: /rbcs/rbcs-server.xml
|
||||
subPath: rbcs-server.xml
|
||||
- name: cache-volume
|
||||
mountPath: /rbcs/cache
|
||||
resources:
|
||||
requests:
|
||||
memory: "0.25Gi"
|
||||
cpu: "1"
|
||||
limits:
|
||||
memory: "0.25Gi"
|
||||
cpu: "3.5"
|
||||
volumes:
|
||||
- name: config-volume
|
||||
configMap:
|
||||
name: rbcs-server
|
||||
- name: cache-volume
|
||||
persistentVolumeClaim:
|
||||
claimName: rbcs-pvc
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: rbcs-service
|
||||
spec:
|
||||
type: LoadBalancer
|
||||
ports:
|
||||
- port: 8080
|
||||
targetPort: 8080
|
||||
protocol: TCP
|
||||
selector:
|
||||
app: rbcs
|
||||
|
76
benchmark/rbcs-in-memory.yml
Normal file
76
benchmark/rbcs-in-memory.yml
Normal file
@@ -0,0 +1,76 @@
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: rbcs-server
|
||||
data:
|
||||
rbcs-server.xml: |
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:rbcs="urn:net.woggioni.rbcs.server"
|
||||
xmlns:rbcs-memcache="urn:net.woggioni.rbcs.server.memcache"
|
||||
xs:schemaLocation="urn:net.woggioni.rbcs.server.memcache jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
|
||||
>
|
||||
<bind host="0.0.0.0" port="8080" incoming-connections-backlog-size="128"/>
|
||||
<connection
|
||||
max-request-size="0xd000000"
|
||||
idle-timeout="PT15S"
|
||||
read-idle-timeout="PT30S"
|
||||
write-idle-timeout="PT30S"/>
|
||||
<event-executor use-virtual-threads="true"/>
|
||||
<cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" enable-compression="false" max-size="0xb0000000" />
|
||||
</rbcs:server>
|
||||
|
||||
---
|
||||
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: rbcs-deployment
|
||||
labels:
|
||||
app: rbcs
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: rbcs
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: rbcs
|
||||
spec:
|
||||
containers:
|
||||
- name: rbcs
|
||||
image: gitea.woggioni.net/woggioni/rbcs:native
|
||||
imagePullPolicy: Always
|
||||
args: ['server', '-c', 'rbcs-server.xml']
|
||||
ports:
|
||||
- containerPort: 8080
|
||||
volumeMounts:
|
||||
- name: config-volume
|
||||
mountPath: /rbcs/rbcs-server.xml
|
||||
subPath: rbcs-server.xml
|
||||
resources:
|
||||
requests:
|
||||
memory: "0.5Gi"
|
||||
cpu: "1"
|
||||
limits:
|
||||
memory: "4Gi"
|
||||
cpu: "3.5"
|
||||
volumes:
|
||||
- name: config-volume
|
||||
configMap:
|
||||
name: rbcs-server
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: rbcs-service
|
||||
spec:
|
||||
type: LoadBalancer
|
||||
ports:
|
||||
- port: 8080
|
||||
targetPort: 8080
|
||||
protocol: TCP
|
||||
selector:
|
||||
app: rbcs
|
||||
|
117
benchmark/rbcs-memcache.yml
Normal file
117
benchmark/rbcs-memcache.yml
Normal file
@@ -0,0 +1,117 @@
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: rbcs-server
|
||||
data:
|
||||
rbcs-server.xml: |
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:rbcs="urn:net.woggioni.rbcs.server"
|
||||
xmlns:rbcs-memcache="urn:net.woggioni.rbcs.server.memcache"
|
||||
xs:schemaLocation="urn:net.woggioni.rbcs.server.memcache jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
|
||||
>
|
||||
<bind host="0.0.0.0" port="8080" incoming-connections-backlog-size="128"/>
|
||||
<connection
|
||||
max-request-size="0xd000000"
|
||||
idle-timeout="PT15S"
|
||||
read-idle-timeout="PT30S"
|
||||
write-idle-timeout="PT30S"/>
|
||||
<event-executor use-virtual-threads="true"/>
|
||||
<!--cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" enable-compression="false" max-size="0x10000000" /-->
|
||||
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" chunk-size="0x1000" digest="MD5">
|
||||
<server host="memcached-service" port="11211" max-connections="256"/>
|
||||
</cache>
|
||||
</rbcs:server>
|
||||
|
||||
---
|
||||
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: rbcs-deployment
|
||||
labels:
|
||||
app: rbcs
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: rbcs
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: rbcs
|
||||
spec:
|
||||
containers:
|
||||
- name: rbcs
|
||||
image: gitea.woggioni.net/woggioni/rbcs:native
|
||||
imagePullPolicy: Always
|
||||
args: ['server', '-c', 'rbcs-server.xml']
|
||||
ports:
|
||||
- containerPort: 8080
|
||||
volumeMounts:
|
||||
- name: config-volume
|
||||
mountPath: /rbcs/rbcs-server.xml
|
||||
subPath: rbcs-server.xml
|
||||
resources:
|
||||
requests:
|
||||
memory: "0.25Gi"
|
||||
cpu: "1"
|
||||
limits:
|
||||
memory: "0.25Gi"
|
||||
cpu: "1"
|
||||
volumes:
|
||||
- name: config-volume
|
||||
configMap:
|
||||
name: rbcs-server
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: rbcs-service
|
||||
spec:
|
||||
type: LoadBalancer
|
||||
ports:
|
||||
- port: 8080
|
||||
targetPort: 8080
|
||||
protocol: TCP
|
||||
selector:
|
||||
app: rbcs
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: memcached-deployment
|
||||
spec:
|
||||
replicas: 1
|
||||
selector:
|
||||
matchLabels:
|
||||
app: memcached
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: memcached
|
||||
spec:
|
||||
containers:
|
||||
- name: memcached
|
||||
image: memcached
|
||||
args: ["-I", "128m", "-m", "4096"]
|
||||
resources:
|
||||
requests:
|
||||
memory: "1Gi"
|
||||
cpu: "500m" # 0.5 CPU
|
||||
limits:
|
||||
memory: "5Gi"
|
||||
cpu: "500m" # 0.5 CP
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: memcached-service
|
||||
spec:
|
||||
type: ClusterIP # ClusterIP makes it accessible only within the cluster
|
||||
ports:
|
||||
- port: 11211 # Default memcached port
|
||||
targetPort: 11211
|
||||
protocol: TCP
|
||||
selector:
|
||||
app: memcached
|
@@ -4,7 +4,7 @@ org.gradle.caching=true
|
||||
|
||||
rbcs.version = 0.2.0
|
||||
|
||||
lys.version = 2025.03.03
|
||||
lys.version = 2025.03.08
|
||||
|
||||
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
|
||||
docker.registry.url=gitea.woggioni.net
|
||||
|
@@ -5,9 +5,12 @@ plugins {
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation catalog.slf4j.api
|
||||
implementation project(':rbcs-common')
|
||||
api catalog.netty.common
|
||||
api catalog.netty.buffer
|
||||
api catalog.netty.handler
|
||||
api catalog.netty.codec.http
|
||||
}
|
||||
|
||||
publishing {
|
||||
|
@@ -1,10 +1,15 @@
|
||||
module net.woggioni.rbcs.api {
|
||||
requires static lombok;
|
||||
requires java.xml;
|
||||
requires io.netty.buffer;
|
||||
requires io.netty.handler;
|
||||
requires io.netty.transport;
|
||||
requires io.netty.common;
|
||||
requires net.woggioni.rbcs.common;
|
||||
requires io.netty.transport;
|
||||
requires io.netty.codec.http;
|
||||
requires io.netty.buffer;
|
||||
requires org.slf4j;
|
||||
requires java.xml;
|
||||
|
||||
|
||||
exports net.woggioni.rbcs.api;
|
||||
exports net.woggioni.rbcs.api.exception;
|
||||
exports net.woggioni.rbcs.api.message;
|
||||
|
@@ -0,0 +1,57 @@
|
||||
package net.woggioni.rbcs.api;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.handler.codec.http.LastHttpContent;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.woggioni.rbcs.api.message.CacheMessage;
|
||||
|
||||
@Slf4j
|
||||
public abstract class CacheHandler extends ChannelInboundHandlerAdapter {
|
||||
private boolean requestFinished = false;
|
||||
|
||||
abstract protected void channelRead0(ChannelHandlerContext ctx, CacheMessage msg);
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
if(!requestFinished && msg instanceof CacheMessage) {
|
||||
if(msg instanceof CacheMessage.LastCacheContent) requestFinished = true;
|
||||
try {
|
||||
channelRead0(ctx, (CacheMessage) msg);
|
||||
} finally {
|
||||
if(msg instanceof ReferenceCounted rc) rc.release();
|
||||
}
|
||||
} else {
|
||||
ctx.fireChannelRead(msg);
|
||||
}
|
||||
}
|
||||
|
||||
protected void sendMessageAndFlush(ChannelHandlerContext ctx, Object msg) {
|
||||
sendMessage(ctx, msg, true);
|
||||
}
|
||||
|
||||
protected void sendMessage(ChannelHandlerContext ctx, Object msg) {
|
||||
sendMessage(ctx, msg, false);
|
||||
}
|
||||
|
||||
private void sendMessage(ChannelHandlerContext ctx, Object msg, boolean flush) {
|
||||
ctx.write(msg);
|
||||
if(
|
||||
msg instanceof CacheMessage.LastCacheContent ||
|
||||
msg instanceof CacheMessage.CachePutResponse ||
|
||||
msg instanceof CacheMessage.CacheValueNotFoundResponse ||
|
||||
msg instanceof LastHttpContent
|
||||
) {
|
||||
ctx.flush();
|
||||
ctx.pipeline().remove(this);
|
||||
} else if(flush) {
|
||||
ctx.flush();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
super.exceptionCaught(ctx, cause);
|
||||
}
|
||||
}
|
@@ -1,13 +1,12 @@
|
||||
package net.woggioni.rbcs.api;
|
||||
|
||||
import io.netty.channel.ChannelFactory;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.socket.DatagramChannel;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
|
||||
public interface CacheHandlerFactory extends AsyncCloseable {
|
||||
ChannelHandler newHandler(
|
||||
CacheHandler newHandler(
|
||||
Configuration configuration,
|
||||
EventLoopGroup eventLoopGroup,
|
||||
ChannelFactory<SocketChannel> socketChannelFactory,
|
||||
|
@@ -1,10 +1,9 @@
|
||||
package net.woggioni.rbcs.api;
|
||||
|
||||
import java.io.Serializable;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public class CacheValueMetadata implements Serializable {
|
||||
|
@@ -1,16 +1,15 @@
|
||||
package net.woggioni.rbcs.api;
|
||||
|
||||
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.NonNull;
|
||||
import lombok.Value;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.NonNull;
|
||||
import lombok.Value;
|
||||
|
||||
@Value
|
||||
public class Configuration {
|
||||
|
@@ -105,6 +105,7 @@ tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfi
|
||||
systemProperty('io.netty.leakDetectionLevel', 'DISABLED')
|
||||
modularity.inferModulePath = false
|
||||
enabled = true
|
||||
systemProperty('gradle.tmp.dir', temporaryDir.toString())
|
||||
}
|
||||
|
||||
nativeImage {
|
||||
|
@@ -1,2 +1,2 @@
|
||||
Args=-O3 -march=skylake --gc=serial --install-exit-handlers --initialize-at-run-time=io.netty --enable-url-protocols=jpms --initialize-at-build-time=net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory,net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory$JpmsHandler
|
||||
Args=-O3 -march=x86-64-v2 --gc=serial --install-exit-handlers --initialize-at-run-time=io.netty --enable-url-protocols=jpms --initialize-at-build-time=net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory,net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory$JpmsHandler
|
||||
#-H:TraceClassInitialization=io.netty.handler.ssl.BouncyCastleAlpnSslUtils
|
@@ -487,6 +487,10 @@
|
||||
"name":"jdk.internal.misc.Unsafe",
|
||||
"methods":[{"name":"getUnsafe","parameterTypes":[] }]
|
||||
},
|
||||
{
|
||||
"name":"net.woggioni.rbcs.api.CacheHandler",
|
||||
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
|
||||
},
|
||||
{
|
||||
"name":"net.woggioni.rbcs.cli.RemoteBuildCacheServerCli",
|
||||
"allDeclaredFields":true,
|
||||
@@ -552,11 +556,7 @@
|
||||
},
|
||||
{
|
||||
"name":"net.woggioni.rbcs.client.RemoteBuildCacheClient$sendRequest$1$operationComplete$responseHandler$1",
|
||||
"methods":[{"name":"channelInactive","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }, {"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
|
||||
},
|
||||
{
|
||||
"name":"net.woggioni.rbcs.client.RemoteBuildCacheClient$sendRequest$1$operationComplete$timeoutHandler$1",
|
||||
"methods":[{"name":"userEventTriggered","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
|
||||
"methods":[{"name":"channelInactive","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }, {"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }, {"name":"userEventTriggered","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
|
||||
},
|
||||
{
|
||||
"name":"net.woggioni.rbcs.server.RemoteBuildCacheServer$HttpChunkContentCompressor",
|
||||
@@ -588,17 +588,13 @@
|
||||
"name":"net.woggioni.rbcs.server.exception.ExceptionHandler",
|
||||
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
|
||||
},
|
||||
{
|
||||
"name":"net.woggioni.rbcs.server.handler.CacheContentHandler",
|
||||
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
|
||||
},
|
||||
{
|
||||
"name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler",
|
||||
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
|
||||
},
|
||||
{
|
||||
"name":"net.woggioni.rbcs.server.handler.ServerHandler",
|
||||
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }, {"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }, {"name":"write","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object","io.netty.channel.ChannelPromise"] }]
|
||||
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }, {"name":"channelReadComplete","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }, {"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }, {"name":"write","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object","io.netty.channel.ChannelPromise"] }]
|
||||
},
|
||||
{
|
||||
"name":"net.woggioni.rbcs.server.handler.TraceHandler",
|
||||
|
@@ -32,7 +32,7 @@ object GraalNativeImageConfiguration {
|
||||
@JvmStatic
|
||||
fun main(vararg args : String) {
|
||||
|
||||
val serverURL = URI.create("file:conf/rbcs-client.xml").toURL()
|
||||
val serverURL = URI.create("file:conf/rbcs-server.xml").toURL()
|
||||
val serverDoc = serverURL.openStream().use {
|
||||
Xml.parseXml(serverURL, it)
|
||||
}
|
||||
@@ -71,7 +71,6 @@ object GraalNativeImageConfiguration {
|
||||
compressionLevel = Deflater.DEFAULT_COMPRESSION,
|
||||
compressionEnabled = false,
|
||||
maxSize = 0x1000000,
|
||||
chunkSize = 0x1000
|
||||
),
|
||||
FileSystemCacheConfiguration(
|
||||
Path.of(System.getProperty("java.io.tmpdir")).resolve("rbcs"),
|
||||
@@ -79,7 +78,6 @@ object GraalNativeImageConfiguration {
|
||||
digestAlgorithm = "MD5",
|
||||
compressionLevel = Deflater.DEFAULT_COMPRESSION,
|
||||
compressionEnabled = false,
|
||||
chunkSize = 0x1000
|
||||
),
|
||||
MemcacheCacheConfiguration(
|
||||
listOf(MemcacheCacheConfiguration.Server(
|
||||
@@ -91,7 +89,6 @@ object GraalNativeImageConfiguration {
|
||||
"MD5",
|
||||
null,
|
||||
1,
|
||||
0x1000
|
||||
)
|
||||
)
|
||||
|
||||
@@ -107,6 +104,7 @@ object GraalNativeImageConfiguration {
|
||||
Duration.ofSeconds(15),
|
||||
Duration.ofSeconds(15),
|
||||
0x10000,
|
||||
0x1000
|
||||
),
|
||||
users.asSequence().map { it.name to it }.toMap(),
|
||||
sequenceOf(writersGroup, readersGroup).map { it.name to it }.toMap(),
|
||||
@@ -127,7 +125,6 @@ object GraalNativeImageConfiguration {
|
||||
"MD5",
|
||||
null,
|
||||
1,
|
||||
0x1000
|
||||
)
|
||||
|
||||
val serverHandle = RemoteBuildCacheServer(serverConfiguration).run()
|
||||
@@ -135,7 +132,12 @@ object GraalNativeImageConfiguration {
|
||||
|
||||
val clientProfile = ClientConfiguration.Profile(
|
||||
URI.create("http://127.0.0.1:$serverPort/"),
|
||||
null,
|
||||
ClientConfiguration.Connection(
|
||||
Duration.ofSeconds(5),
|
||||
Duration.ofSeconds(5),
|
||||
Duration.ofSeconds(7),
|
||||
true,
|
||||
),
|
||||
ClientConfiguration.Authentication.BasicAuthenticationCredentials("user3", PASSWORD),
|
||||
Duration.ofSeconds(3),
|
||||
10,
|
||||
@@ -177,6 +179,8 @@ object GraalNativeImageConfiguration {
|
||||
} catch (ee : ExecutionException) {
|
||||
}
|
||||
}
|
||||
RemoteBuildCacheServerCli.main("--help")
|
||||
System.setProperty("net.woggioni.rbcs.conf.dir", System.getProperty("gradle.tmp.dir"))
|
||||
RemoteBuildCacheServerCli.createCommandLine().execute("--version")
|
||||
RemoteBuildCacheServerCli.createCommandLine().execute("server", "-t", "PT10S")
|
||||
}
|
||||
}
|
@@ -26,8 +26,8 @@ class RemoteBuildCacheServerCli : RbcsCommand() {
|
||||
private fun setPropertyIfNotPresent(key: String, value: String) {
|
||||
System.getProperty(key) ?: System.setProperty(key, value)
|
||||
}
|
||||
@JvmStatic
|
||||
fun main(vararg args: String) {
|
||||
|
||||
fun createCommandLine() : CommandLine {
|
||||
setPropertyIfNotPresent("logback.configurationFile", "net/woggioni/rbcs/cli/logback.xml")
|
||||
setPropertyIfNotPresent("io.netty.leakDetectionLevel", "DISABLED")
|
||||
val currentClassLoader = RemoteBuildCacheServerCli::class.java.classLoader
|
||||
@@ -56,7 +56,12 @@ class RemoteBuildCacheServerCli : RbcsCommand() {
|
||||
addSubcommand(GetCommand())
|
||||
addSubcommand(HealthCheckCommand())
|
||||
})
|
||||
System.exit(commandLine.execute(*args))
|
||||
return commandLine
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun main(vararg args: String) {
|
||||
System.exit(createCommandLine().execute(*args))
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -6,7 +6,6 @@ import net.woggioni.rbcs.client.Configuration
|
||||
import net.woggioni.rbcs.common.createLogger
|
||||
import net.woggioni.rbcs.common.debug
|
||||
import picocli.CommandLine
|
||||
import java.lang.IllegalArgumentException
|
||||
import java.nio.file.Path
|
||||
|
||||
@CommandLine.Command(
|
||||
|
@@ -38,11 +38,12 @@ data class Configuration(
|
||||
val readIdleTimeout: Duration,
|
||||
val writeIdleTimeout: Duration,
|
||||
val idleTimeout: Duration,
|
||||
val requestPipelining : Boolean,
|
||||
)
|
||||
|
||||
data class Profile(
|
||||
val serverURI: URI,
|
||||
val connection: Connection?,
|
||||
val connection: Connection,
|
||||
val authentication: Authentication?,
|
||||
val connectionTimeout: Duration?,
|
||||
val maxConnections: Int,
|
||||
|
@@ -318,6 +318,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
|
||||
) {
|
||||
pipeline.remove(this)
|
||||
responseFuture.complete(response)
|
||||
if(!profile.connection.requestPipelining) {
|
||||
pool.release(channel)
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
@@ -332,6 +335,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
|
||||
|
||||
override fun channelInactive(ctx: ChannelHandlerContext) {
|
||||
responseFuture.completeExceptionally(IOException("The remote server closed the connection"))
|
||||
if(!profile.connection.requestPipelining) {
|
||||
pool.release(channel)
|
||||
}
|
||||
super.channelInactive(ctx)
|
||||
}
|
||||
|
||||
@@ -352,6 +358,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
|
||||
if (this === pipeline.last()) {
|
||||
ctx.close()
|
||||
}
|
||||
if(!profile.connection.requestPipelining) {
|
||||
pool.release(channel)
|
||||
}
|
||||
} else {
|
||||
super.userEventTriggered(ctx, evt)
|
||||
}
|
||||
@@ -401,8 +410,9 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
|
||||
val ex = it.cause()
|
||||
log.warn(ex.message, ex)
|
||||
}
|
||||
|
||||
pool.release(channel)
|
||||
if(profile.connection.requestPipelining) {
|
||||
pool.release(channel)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
responseFuture.completeExceptionally(channelFuture.cause())
|
||||
|
@@ -30,7 +30,12 @@ object Parser {
|
||||
?: throw ConfigurationException("base-url attribute is required")
|
||||
var authentication: Configuration.Authentication? = null
|
||||
var retryPolicy: Configuration.RetryPolicy? = null
|
||||
var connection : Configuration.Connection? = null
|
||||
var connection : Configuration.Connection = Configuration.Connection(
|
||||
Duration.ofSeconds(60),
|
||||
Duration.ofSeconds(60),
|
||||
Duration.ofSeconds(30),
|
||||
false
|
||||
)
|
||||
var trustStore : Configuration.TrustStore? = null
|
||||
for (gchild in child.asIterable()) {
|
||||
when (gchild.localName) {
|
||||
@@ -97,10 +102,13 @@ object Parser {
|
||||
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
|
||||
val writeIdleTimeout = gchild.renderAttribute("write-idle-timeout")
|
||||
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
|
||||
val requestPipelining = gchild.renderAttribute("request-pipelining")
|
||||
?.let(String::toBoolean) ?: false
|
||||
connection = Configuration.Connection(
|
||||
readIdleTimeout,
|
||||
writeIdleTimeout,
|
||||
idleTimeout,
|
||||
requestPipelining
|
||||
)
|
||||
}
|
||||
|
||||
|
@@ -123,6 +123,13 @@
|
||||
</xs:documentation>
|
||||
</xs:annotation>
|
||||
</xs:attribute>
|
||||
<xs:attribute name="request-pipelining" type="xs:boolean" use="optional" default="false">
|
||||
<xs:annotation>
|
||||
<xs:documentation>
|
||||
Enables HTTP/1.1 request pipelining
|
||||
</xs:documentation>
|
||||
</xs:annotation>
|
||||
</xs:attribute>
|
||||
</xs:complexType>
|
||||
|
||||
<xs:complexType name="noAuthType">
|
||||
|
@@ -6,7 +6,7 @@ plugins {
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation project(':rbcs-api')
|
||||
implementation catalog.netty.transport
|
||||
implementation catalog.slf4j.api
|
||||
implementation catalog.jwo
|
||||
implementation catalog.netty.buffer
|
||||
|
@@ -1,11 +1,11 @@
|
||||
package net.woggioni.rbcs.server.memcache
|
||||
|
||||
import io.netty.channel.ChannelFactory
|
||||
import io.netty.channel.ChannelHandler
|
||||
import io.netty.channel.EventLoopGroup
|
||||
import io.netty.channel.pool.FixedChannelPool
|
||||
import io.netty.channel.socket.DatagramChannel
|
||||
import io.netty.channel.socket.SocketChannel
|
||||
import net.woggioni.rbcs.api.CacheHandler
|
||||
import net.woggioni.rbcs.api.CacheHandlerFactory
|
||||
import net.woggioni.rbcs.api.Configuration
|
||||
import net.woggioni.rbcs.common.HostAndPort
|
||||
@@ -51,7 +51,7 @@ data class MemcacheCacheConfiguration(
|
||||
eventLoop: EventLoopGroup,
|
||||
socketChannelFactory: ChannelFactory<SocketChannel>,
|
||||
datagramChannelFactory: ChannelFactory<DatagramChannel>,
|
||||
): ChannelHandler {
|
||||
): CacheHandler {
|
||||
return MemcacheCacheHandler(
|
||||
MemcacheClient(
|
||||
this@MemcacheCacheConfiguration.servers,
|
||||
|
@@ -4,7 +4,6 @@ import io.netty.buffer.ByteBuf
|
||||
import io.netty.buffer.ByteBufAllocator
|
||||
import io.netty.buffer.CompositeByteBuf
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.SimpleChannelInboundHandler
|
||||
import io.netty.handler.codec.memcache.DefaultLastMemcacheContent
|
||||
import io.netty.handler.codec.memcache.DefaultMemcacheContent
|
||||
import io.netty.handler.codec.memcache.LastMemcacheContent
|
||||
@@ -13,6 +12,7 @@ import io.netty.handler.codec.memcache.binary.BinaryMemcacheOpcodes
|
||||
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
|
||||
import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponseStatus
|
||||
import io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest
|
||||
import net.woggioni.rbcs.api.CacheHandler
|
||||
import net.woggioni.rbcs.api.CacheValueMetadata
|
||||
import net.woggioni.rbcs.api.exception.ContentTooLargeException
|
||||
import net.woggioni.rbcs.api.message.CacheMessage
|
||||
@@ -58,7 +58,7 @@ class MemcacheCacheHandler(
|
||||
private val compressionLevel: Int,
|
||||
private val chunkSize: Int,
|
||||
private val maxAge: Duration
|
||||
) : SimpleChannelInboundHandler<CacheMessage>() {
|
||||
) : CacheHandler() {
|
||||
companion object {
|
||||
private val log = createLogger<MemcacheCacheHandler>()
|
||||
|
||||
@@ -69,10 +69,14 @@ class MemcacheCacheHandler(
|
||||
}
|
||||
}
|
||||
|
||||
private interface InProgressRequest {
|
||||
|
||||
}
|
||||
|
||||
private inner class InProgressGetRequest(
|
||||
private val key: String,
|
||||
val key: String,
|
||||
private val ctx: ChannelHandlerContext
|
||||
) {
|
||||
) : InProgressRequest {
|
||||
private val acc = ctx.alloc().compositeBuffer()
|
||||
private val chunk = ctx.alloc().compositeBuffer()
|
||||
private val outputStream = ByteBufOutputStream(chunk).let {
|
||||
@@ -98,32 +102,35 @@ class MemcacheCacheHandler(
|
||||
acc.retain()
|
||||
it.readObject() as CacheValueMetadata
|
||||
}
|
||||
ctx.writeAndFlush(CacheValueFoundResponse(key, metadata))
|
||||
log.trace(ctx) {
|
||||
"Sending response from cache"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheValueFoundResponse(key, metadata))
|
||||
responseSent = true
|
||||
acc.readerIndex(Int.SIZE_BYTES + mSize)
|
||||
}
|
||||
if (responseSent) {
|
||||
acc.readBytes(outputStream, acc.readableBytes())
|
||||
if(acc.readableBytes() >= chunkSize) {
|
||||
if (acc.readableBytes() >= chunkSize) {
|
||||
flush(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun flush(last : Boolean) {
|
||||
private fun flush(last: Boolean) {
|
||||
val toSend = extractChunk(chunk, ctx.alloc())
|
||||
val msg = if(last) {
|
||||
val msg = if (last) {
|
||||
log.trace(ctx) {
|
||||
"Sending last chunk to client on channel ${ctx.channel().id().asShortText()}"
|
||||
"Sending last chunk to client"
|
||||
}
|
||||
LastCacheContent(toSend)
|
||||
} else {
|
||||
log.trace(ctx) {
|
||||
"Sending chunk to client on channel ${ctx.channel().id().asShortText()}"
|
||||
"Sending chunk to client"
|
||||
}
|
||||
CacheContent(toSend)
|
||||
}
|
||||
ctx.writeAndFlush(msg)
|
||||
sendMessageAndFlush(ctx, msg)
|
||||
}
|
||||
|
||||
fun commit() {
|
||||
@@ -141,14 +148,14 @@ class MemcacheCacheHandler(
|
||||
}
|
||||
|
||||
private inner class InProgressPutRequest(
|
||||
private val ch : NettyChannel,
|
||||
metadata : CacheValueMetadata,
|
||||
val digest : ByteBuf,
|
||||
private val ch: NettyChannel,
|
||||
metadata: CacheValueMetadata,
|
||||
val digest: ByteBuf,
|
||||
val requestController: CompletableFuture<MemcacheRequestController>,
|
||||
private val alloc: ByteBufAllocator
|
||||
) {
|
||||
) : InProgressRequest {
|
||||
private var totalSize = 0
|
||||
private var tmpFile : FileChannel? = null
|
||||
private var tmpFile: FileChannel? = null
|
||||
private val accumulator = alloc.compositeBuffer()
|
||||
private val stream = ByteBufOutputStream(accumulator).let {
|
||||
if (compressionEnabled) {
|
||||
@@ -175,7 +182,7 @@ class MemcacheCacheHandler(
|
||||
tmpFile?.let {
|
||||
flushToDisk(it, accumulator)
|
||||
}
|
||||
if(accumulator.readableBytes() > 0x100000) {
|
||||
if (accumulator.readableBytes() > 0x100000) {
|
||||
log.debug(ch) {
|
||||
"Entry is too big, buffering it into a file"
|
||||
}
|
||||
@@ -192,18 +199,18 @@ class MemcacheCacheHandler(
|
||||
}
|
||||
}
|
||||
|
||||
private fun flushToDisk(fc : FileChannel, buf : CompositeByteBuf) {
|
||||
private fun flushToDisk(fc: FileChannel, buf: CompositeByteBuf) {
|
||||
val chunk = extractChunk(buf, alloc)
|
||||
fc.write(chunk.nioBuffer())
|
||||
chunk.release()
|
||||
}
|
||||
|
||||
fun commit() : Pair<Int, ReadableByteChannel> {
|
||||
fun commit(): Pair<Int, ReadableByteChannel> {
|
||||
digest.release()
|
||||
accumulator.retain()
|
||||
stream.close()
|
||||
val fileChannel = tmpFile
|
||||
return if(fileChannel != null) {
|
||||
return if (fileChannel != null) {
|
||||
flushToDisk(fileChannel, accumulator)
|
||||
accumulator.release()
|
||||
fileChannel.position(0)
|
||||
@@ -224,8 +231,7 @@ class MemcacheCacheHandler(
|
||||
}
|
||||
}
|
||||
|
||||
private var inProgressPutRequest: InProgressPutRequest? = null
|
||||
private var inProgressGetRequest: InProgressGetRequest? = null
|
||||
private var inProgressRequest: InProgressRequest? = null
|
||||
|
||||
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
|
||||
when (msg) {
|
||||
@@ -252,32 +258,39 @@ class MemcacheCacheHandler(
|
||||
log.debug(ctx) {
|
||||
"Cache hit for key ${msg.key} on memcache"
|
||||
}
|
||||
inProgressGetRequest = InProgressGetRequest(msg.key, ctx)
|
||||
inProgressRequest = InProgressGetRequest(msg.key, ctx)
|
||||
}
|
||||
|
||||
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
|
||||
log.debug(ctx) {
|
||||
"Cache miss for key ${msg.key} on memcache"
|
||||
}
|
||||
ctx.writeAndFlush(CacheValueNotFoundResponse())
|
||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun contentReceived(content: MemcacheContent) {
|
||||
log.trace(ctx) {
|
||||
"${if(content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${content.content().readableBytes()} bytes received from memcache for key ${msg.key}"
|
||||
"${if (content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${
|
||||
content.content().readableBytes()
|
||||
} bytes received from memcache for key ${msg.key}"
|
||||
}
|
||||
inProgressGetRequest?.write(content.content())
|
||||
if (content is LastMemcacheContent) {
|
||||
inProgressGetRequest?.commit()
|
||||
(inProgressRequest as? InProgressGetRequest)?.let { inProgressGetRequest ->
|
||||
inProgressGetRequest.write(content.content())
|
||||
if (content is LastMemcacheContent) {
|
||||
inProgressRequest = null
|
||||
inProgressGetRequest.commit()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ex: Throwable) {
|
||||
inProgressGetRequest?.let {
|
||||
inProgressGetRequest = null
|
||||
it.rollback()
|
||||
(inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
|
||||
inProgressGetRequest?.let {
|
||||
inProgressRequest = null
|
||||
it.rollback()
|
||||
}
|
||||
}
|
||||
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
|
||||
}
|
||||
@@ -290,6 +303,7 @@ class MemcacheCacheHandler(
|
||||
setOpcode(BinaryMemcacheOpcodes.GET)
|
||||
}
|
||||
requestHandle.sendRequest(request)
|
||||
requestHandle.sendContent(LastMemcacheContent.EMPTY_LAST_CONTENT)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -305,8 +319,9 @@ class MemcacheCacheHandler(
|
||||
log.debug(ctx) {
|
||||
"Inserted key ${msg.key} into memcache"
|
||||
}
|
||||
ctx.writeAndFlush(CachePutResponse(msg.key))
|
||||
sendMessageAndFlush(ctx, CachePutResponse(msg.key))
|
||||
}
|
||||
|
||||
else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status))
|
||||
}
|
||||
}
|
||||
@@ -323,86 +338,103 @@ class MemcacheCacheHandler(
|
||||
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
|
||||
}
|
||||
}
|
||||
inProgressPutRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc())
|
||||
inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc())
|
||||
}
|
||||
|
||||
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
|
||||
inProgressPutRequest?.let { request ->
|
||||
log.trace(ctx) {
|
||||
"Received chunk of ${msg.content().readableBytes()} bytes for memcache"
|
||||
val request = inProgressRequest
|
||||
when (request) {
|
||||
is InProgressPutRequest -> {
|
||||
log.trace(ctx) {
|
||||
"Received chunk of ${msg.content().readableBytes()} bytes for memcache"
|
||||
}
|
||||
request.write(msg.content())
|
||||
}
|
||||
|
||||
is InProgressGetRequest -> {
|
||||
msg.release()
|
||||
}
|
||||
request.write(msg.content())
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
|
||||
inProgressPutRequest?.let { request ->
|
||||
inProgressPutRequest = null
|
||||
log.trace(ctx) {
|
||||
"Received last chunk of ${msg.content().readableBytes()} bytes for memcache"
|
||||
}
|
||||
request.write(msg.content())
|
||||
val key = request.digest.retainedDuplicate()
|
||||
val (payloadSize, payloadSource) = request.commit()
|
||||
val extras = ctx.alloc().buffer(8, 8)
|
||||
extras.writeInt(0)
|
||||
extras.writeInt(encodeExpiry(maxAge))
|
||||
val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize
|
||||
request.requestController.whenComplete { requestController, ex ->
|
||||
if(ex == null) {
|
||||
log.trace(ctx) {
|
||||
"Sending SET request to memcache"
|
||||
}
|
||||
requestController.sendRequest(DefaultBinaryMemcacheRequest().apply {
|
||||
setOpcode(BinaryMemcacheOpcodes.SET)
|
||||
setKey(key)
|
||||
setExtras(extras)
|
||||
setTotalBodyLength(totalBodyLength)
|
||||
})
|
||||
log.trace(ctx) {
|
||||
"Sending request payload to memcache"
|
||||
}
|
||||
payloadSource.use { source ->
|
||||
val bb = ByteBuffer.allocate(chunkSize)
|
||||
while (true) {
|
||||
val read = source.read(bb)
|
||||
bb.limit()
|
||||
if(read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) {
|
||||
continue
|
||||
}
|
||||
val chunk = ctx.alloc().buffer(chunkSize)
|
||||
bb.flip()
|
||||
chunk.writeBytes(bb)
|
||||
bb.clear()
|
||||
log.trace(ctx) {
|
||||
"Sending ${chunk.readableBytes()} bytes chunk to memcache"
|
||||
}
|
||||
if(read < 0) {
|
||||
requestController.sendContent(DefaultLastMemcacheContent(chunk))
|
||||
break
|
||||
} else {
|
||||
requestController.sendContent(DefaultMemcacheContent(chunk))
|
||||
val request = inProgressRequest
|
||||
when (request) {
|
||||
is InProgressPutRequest -> {
|
||||
inProgressRequest = null
|
||||
log.trace(ctx) {
|
||||
"Received last chunk of ${msg.content().readableBytes()} bytes for memcache"
|
||||
}
|
||||
request.write(msg.content())
|
||||
val key = request.digest.retainedDuplicate()
|
||||
val (payloadSize, payloadSource) = request.commit()
|
||||
val extras = ctx.alloc().buffer(8, 8)
|
||||
extras.writeInt(0)
|
||||
extras.writeInt(encodeExpiry(maxAge))
|
||||
val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize
|
||||
log.trace(ctx) {
|
||||
"Trying to send SET request to memcache"
|
||||
}
|
||||
request.requestController.whenComplete { requestController, ex ->
|
||||
if (ex == null) {
|
||||
log.trace(ctx) {
|
||||
"Sending SET request to memcache"
|
||||
}
|
||||
requestController.sendRequest(DefaultBinaryMemcacheRequest().apply {
|
||||
setOpcode(BinaryMemcacheOpcodes.SET)
|
||||
setKey(key)
|
||||
setExtras(extras)
|
||||
setTotalBodyLength(totalBodyLength)
|
||||
})
|
||||
log.trace(ctx) {
|
||||
"Sending request payload to memcache"
|
||||
}
|
||||
payloadSource.use { source ->
|
||||
val bb = ByteBuffer.allocate(chunkSize)
|
||||
while (true) {
|
||||
val read = source.read(bb)
|
||||
bb.limit()
|
||||
if (read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) {
|
||||
continue
|
||||
}
|
||||
val chunk = ctx.alloc().buffer(chunkSize)
|
||||
bb.flip()
|
||||
chunk.writeBytes(bb)
|
||||
bb.clear()
|
||||
log.trace(ctx) {
|
||||
"Sending ${chunk.readableBytes()} bytes chunk to memcache"
|
||||
}
|
||||
if (read < 0) {
|
||||
requestController.sendContent(DefaultLastMemcacheContent(chunk))
|
||||
break
|
||||
} else {
|
||||
requestController.sendContent(DefaultMemcacheContent(chunk))
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
payloadSource.close()
|
||||
}
|
||||
} else {
|
||||
payloadSource.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
inProgressGetRequest?.let {
|
||||
inProgressGetRequest = null
|
||||
it.rollback()
|
||||
}
|
||||
inProgressPutRequest?.let {
|
||||
inProgressPutRequest = null
|
||||
it.requestController.thenAccept { controller ->
|
||||
controller.exceptionCaught(cause)
|
||||
val request = inProgressRequest
|
||||
when (request) {
|
||||
is InProgressPutRequest -> {
|
||||
inProgressRequest = null
|
||||
request.requestController.thenAccept { controller ->
|
||||
controller.exceptionCaught(cause)
|
||||
}
|
||||
request.rollback()
|
||||
}
|
||||
|
||||
is InProgressGetRequest -> {
|
||||
inProgressRequest = null
|
||||
request.rollback()
|
||||
}
|
||||
it.rollback()
|
||||
}
|
||||
super.exceptionCaught(ctx, cause)
|
||||
}
|
||||
|
@@ -12,7 +12,6 @@ import io.netty.channel.ChannelPipeline
|
||||
import io.netty.channel.EventLoopGroup
|
||||
import io.netty.channel.SimpleChannelInboundHandler
|
||||
import io.netty.channel.pool.AbstractChannelPoolHandler
|
||||
import io.netty.channel.pool.ChannelPool
|
||||
import io.netty.channel.pool.FixedChannelPool
|
||||
import io.netty.channel.socket.SocketChannel
|
||||
import io.netty.handler.codec.memcache.LastMemcacheContent
|
||||
@@ -24,7 +23,7 @@ import io.netty.handler.codec.memcache.binary.BinaryMemcacheResponse
|
||||
import io.netty.util.concurrent.GenericFutureListener
|
||||
import net.woggioni.rbcs.common.HostAndPort
|
||||
import net.woggioni.rbcs.common.createLogger
|
||||
import net.woggioni.rbcs.common.warn
|
||||
import net.woggioni.rbcs.common.trace
|
||||
import net.woggioni.rbcs.server.memcache.MemcacheCacheConfiguration
|
||||
import net.woggioni.rbcs.server.memcache.MemcacheCacheHandler
|
||||
import java.io.IOException
|
||||
@@ -94,18 +93,6 @@ class MemcacheClient(
|
||||
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
|
||||
override fun operationComplete(channelFuture: NettyFuture<Channel>) {
|
||||
if (channelFuture.isSuccess) {
|
||||
|
||||
var requestSent = false
|
||||
var requestBodySent = false
|
||||
var requestFinished = false
|
||||
var responseReceived = false
|
||||
var responseBodyReceived = false
|
||||
var responseFinished = false
|
||||
var requestBodySize = 0
|
||||
var requestBodyBytesSent = 0
|
||||
|
||||
|
||||
|
||||
val channel = channelFuture.now
|
||||
var connectionClosedByTheRemoteServer = true
|
||||
val closeCallback = {
|
||||
@@ -113,14 +100,7 @@ class MemcacheClient(
|
||||
val ex = IOException("The memcache server closed the connection")
|
||||
val completed = response.completeExceptionally(ex)
|
||||
if(!completed) responseHandler.exceptionCaught(ex)
|
||||
log.warn {
|
||||
"RequestSent: $requestSent, RequestBodySent: $requestBodySent, " +
|
||||
"RequestFinished: $requestFinished, ResponseReceived: $responseReceived, " +
|
||||
"ResponseBodyReceived: $responseBodyReceived, ResponseFinished: $responseFinished, " +
|
||||
"RequestBodySize: $requestBodySize, RequestBodyBytesSent: $requestBodyBytesSent"
|
||||
}
|
||||
}
|
||||
pool.release(channel)
|
||||
}
|
||||
val closeListener = ChannelFutureListener {
|
||||
closeCallback()
|
||||
@@ -140,18 +120,14 @@ class MemcacheClient(
|
||||
when (msg) {
|
||||
is BinaryMemcacheResponse -> {
|
||||
responseHandler.responseReceived(msg)
|
||||
responseReceived = true
|
||||
}
|
||||
|
||||
is LastMemcacheContent -> {
|
||||
responseFinished = true
|
||||
responseHandler.contentReceived(msg)
|
||||
pipeline.remove(this)
|
||||
pool.release(channel)
|
||||
}
|
||||
|
||||
is MemcacheContent -> {
|
||||
responseBodyReceived = true
|
||||
responseHandler.contentReceived(msg)
|
||||
}
|
||||
}
|
||||
@@ -165,35 +141,43 @@ class MemcacheClient(
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
connectionClosedByTheRemoteServer = false
|
||||
ctx.close()
|
||||
pool.release(channel)
|
||||
responseHandler.exceptionCaught(cause)
|
||||
}
|
||||
}
|
||||
|
||||
channel.pipeline()
|
||||
.addLast("client-handler", handler)
|
||||
channel.pipeline().addLast(handler)
|
||||
response.complete(object : MemcacheRequestController {
|
||||
private var channelReleased = false
|
||||
|
||||
override fun sendRequest(request: BinaryMemcacheRequest) {
|
||||
requestBodySize = request.totalBodyLength() - request.keyLength() - request.extrasLength()
|
||||
channel.writeAndFlush(request)
|
||||
requestSent = true
|
||||
}
|
||||
|
||||
override fun sendContent(content: MemcacheContent) {
|
||||
val size = content.content().readableBytes()
|
||||
channel.writeAndFlush(content).addListener {
|
||||
requestBodyBytesSent += size
|
||||
requestBodySent = true
|
||||
if(content is LastMemcacheContent) {
|
||||
requestFinished = true
|
||||
if(!channelReleased) {
|
||||
pool.release(channel)
|
||||
channelReleased = true
|
||||
log.trace(channel) {
|
||||
"Channel released"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ex: Throwable) {
|
||||
log.warn(ex.message, ex)
|
||||
connectionClosedByTheRemoteServer = false
|
||||
channel.close()
|
||||
if(!channelReleased) {
|
||||
pool.release(channel)
|
||||
channelReleased = true
|
||||
log.trace(channel) {
|
||||
"Channel released"
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
} else {
|
||||
|
@@ -54,6 +54,7 @@ import net.woggioni.rbcs.server.auth.RoleAuthorizer
|
||||
import net.woggioni.rbcs.server.configuration.Parser
|
||||
import net.woggioni.rbcs.server.configuration.Serializer
|
||||
import net.woggioni.rbcs.server.exception.ExceptionHandler
|
||||
import net.woggioni.rbcs.server.handler.BlackHoleRequestHandler
|
||||
import net.woggioni.rbcs.server.handler.MaxRequestSizeHandler
|
||||
import net.woggioni.rbcs.server.handler.ServerHandler
|
||||
import net.woggioni.rbcs.server.throttling.BucketManager
|
||||
@@ -298,6 +299,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
"Closed connection ${ch.id().asShortText()} with ${ch.remoteAddress()}"
|
||||
}
|
||||
}
|
||||
ch.config().setAutoRead(false)
|
||||
val pipeline = ch.pipeline()
|
||||
cfg.connection.also { conn ->
|
||||
val readIdleTimeout = conn.readIdleTimeout.toMillis()
|
||||
@@ -360,6 +362,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
}
|
||||
pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler)
|
||||
pipeline.addLast(ExceptionHandler.NAME, ExceptionHandler)
|
||||
pipeline.addLast(BlackHoleRequestHandler.NAME, BlackHoleRequestHandler())
|
||||
}
|
||||
|
||||
override fun asyncClose() = cacheHandlerFactory.asyncClose()
|
||||
|
@@ -2,9 +2,9 @@ package net.woggioni.rbcs.server.cache
|
||||
|
||||
import io.netty.buffer.ByteBuf
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.SimpleChannelInboundHandler
|
||||
import io.netty.handler.codec.http.LastHttpContent
|
||||
import io.netty.handler.stream.ChunkedNioFile
|
||||
import net.woggioni.rbcs.api.CacheHandler
|
||||
import net.woggioni.rbcs.api.message.CacheMessage
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
|
||||
@@ -26,12 +26,18 @@ class FileSystemCacheHandler(
|
||||
private val compressionEnabled: Boolean,
|
||||
private val compressionLevel: Int,
|
||||
private val chunkSize: Int
|
||||
) : SimpleChannelInboundHandler<CacheMessage>() {
|
||||
) : CacheHandler() {
|
||||
|
||||
private interface InProgressRequest{
|
||||
|
||||
}
|
||||
|
||||
private class InProgressGetRequest(val request : CacheGetRequest) : InProgressRequest
|
||||
|
||||
private inner class InProgressPutRequest(
|
||||
val key : String,
|
||||
private val fileSink : FileSystemCache.FileSink
|
||||
) {
|
||||
) : InProgressRequest {
|
||||
|
||||
private val stream = Channels.newOutputStream(fileSink.channel).let {
|
||||
if (compressionEnabled) {
|
||||
@@ -55,7 +61,7 @@ class FileSystemCacheHandler(
|
||||
}
|
||||
}
|
||||
|
||||
private var inProgressPutRequest: InProgressPutRequest? = null
|
||||
private var inProgressRequest: InProgressRequest? = null
|
||||
|
||||
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
|
||||
when (msg) {
|
||||
@@ -68,55 +74,64 @@ class FileSystemCacheHandler(
|
||||
}
|
||||
|
||||
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
|
||||
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm)))
|
||||
cache.get(key)?.also { entryValue ->
|
||||
ctx.writeAndFlush(CacheValueFoundResponse(msg.key, entryValue.metadata))
|
||||
entryValue.channel.let { channel ->
|
||||
if(compressionEnabled) {
|
||||
InflaterInputStream(Channels.newInputStream(channel)).use { stream ->
|
||||
inProgressRequest = InProgressGetRequest(msg)
|
||||
|
||||
outerLoop@
|
||||
while (true) {
|
||||
val buf = ctx.alloc().heapBuffer(chunkSize)
|
||||
while(buf.readableBytes() < chunkSize) {
|
||||
val read = buf.writeBytes(stream, chunkSize)
|
||||
if(read < 0) {
|
||||
ctx.writeAndFlush(LastCacheContent(buf))
|
||||
break@outerLoop
|
||||
}
|
||||
}
|
||||
ctx.writeAndFlush(CacheContent(buf))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ctx.writeAndFlush(ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize))
|
||||
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
|
||||
}
|
||||
}
|
||||
} ?: ctx.writeAndFlush(CacheValueNotFoundResponse())
|
||||
}
|
||||
|
||||
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
|
||||
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm)))
|
||||
val sink = cache.put(key, msg.metadata)
|
||||
inProgressPutRequest = InProgressPutRequest(msg.key, sink)
|
||||
inProgressRequest = InProgressPutRequest(msg.key, sink)
|
||||
}
|
||||
|
||||
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
|
||||
inProgressPutRequest!!.write(msg.content())
|
||||
val request = inProgressRequest
|
||||
if(request is InProgressPutRequest) {
|
||||
request.write(msg.content())
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
|
||||
inProgressPutRequest?.let { request ->
|
||||
inProgressPutRequest = null
|
||||
request.write(msg.content())
|
||||
request.commit()
|
||||
ctx.writeAndFlush(CachePutResponse(request.key))
|
||||
when(val request = inProgressRequest) {
|
||||
is InProgressPutRequest -> {
|
||||
inProgressRequest = null
|
||||
request.write(msg.content())
|
||||
request.commit()
|
||||
sendMessageAndFlush(ctx, CachePutResponse(request.key))
|
||||
}
|
||||
is InProgressGetRequest -> {
|
||||
val key = String(Base64.getUrlEncoder().encode(processCacheKey(request.request.key, digestAlgorithm)))
|
||||
cache.get(key)?.also { entryValue ->
|
||||
sendMessageAndFlush(ctx, CacheValueFoundResponse(request.request.key, entryValue.metadata))
|
||||
entryValue.channel.let { channel ->
|
||||
if(compressionEnabled) {
|
||||
InflaterInputStream(Channels.newInputStream(channel)).use { stream ->
|
||||
|
||||
outerLoop@
|
||||
while (true) {
|
||||
val buf = ctx.alloc().heapBuffer(chunkSize)
|
||||
while(buf.readableBytes() < chunkSize) {
|
||||
val read = buf.writeBytes(stream, chunkSize)
|
||||
if(read < 0) {
|
||||
sendMessageAndFlush(ctx, LastCacheContent(buf))
|
||||
break@outerLoop
|
||||
}
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheContent(buf))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
sendMessage(ctx, ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize))
|
||||
sendMessageAndFlush(ctx, LastHttpContent.EMPTY_LAST_CONTENT)
|
||||
}
|
||||
}
|
||||
} ?: sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
inProgressPutRequest?.rollback()
|
||||
(inProgressRequest as? InProgressPutRequest)?.rollback()
|
||||
super.exceptionCaught(ctx, cause)
|
||||
}
|
||||
}
|
@@ -75,6 +75,10 @@ class InMemoryCache(
|
||||
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}
|
||||
map.forEach {
|
||||
it.value.content.release()
|
||||
}
|
||||
map.clear()
|
||||
}
|
||||
complete(null)
|
||||
} catch (ex: Throwable) {
|
||||
|
@@ -4,7 +4,6 @@ import io.netty.channel.ChannelFactory
|
||||
import io.netty.channel.EventLoopGroup
|
||||
import io.netty.channel.socket.DatagramChannel
|
||||
import io.netty.channel.socket.SocketChannel
|
||||
import io.netty.util.concurrent.Future
|
||||
import net.woggioni.rbcs.api.CacheHandlerFactory
|
||||
import net.woggioni.rbcs.api.Configuration
|
||||
import net.woggioni.rbcs.common.RBCS
|
||||
|
@@ -2,15 +2,9 @@ package net.woggioni.rbcs.server.cache
|
||||
|
||||
import io.netty.buffer.ByteBuf
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.SimpleChannelInboundHandler
|
||||
import net.woggioni.rbcs.api.CacheHandler
|
||||
import net.woggioni.rbcs.api.message.CacheMessage
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CachePutRequest
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.*
|
||||
import net.woggioni.rbcs.common.ByteBufOutputStream
|
||||
import net.woggioni.rbcs.common.RBCS.processCacheKey
|
||||
import java.util.zip.Deflater
|
||||
@@ -22,9 +16,17 @@ class InMemoryCacheHandler(
|
||||
private val digestAlgorithm: String?,
|
||||
private val compressionEnabled: Boolean,
|
||||
private val compressionLevel: Int
|
||||
) : SimpleChannelInboundHandler<CacheMessage>() {
|
||||
) : CacheHandler() {
|
||||
|
||||
private interface InProgressPutRequest : AutoCloseable {
|
||||
private interface InProgressRequest : AutoCloseable {
|
||||
}
|
||||
|
||||
private class InProgressGetRequest(val request : CacheGetRequest) : InProgressRequest {
|
||||
override fun close() {
|
||||
}
|
||||
}
|
||||
|
||||
private interface InProgressPutRequest : InProgressRequest {
|
||||
val request: CachePutRequest
|
||||
val buf: ByteBuf
|
||||
|
||||
@@ -33,18 +35,14 @@ class InMemoryCacheHandler(
|
||||
|
||||
private inner class InProgressPlainPutRequest(ctx: ChannelHandlerContext, override val request: CachePutRequest) :
|
||||
InProgressPutRequest {
|
||||
override val buf = ctx.alloc().compositeBuffer()
|
||||
|
||||
private val stream = ByteBufOutputStream(buf).let {
|
||||
if (compressionEnabled) {
|
||||
DeflaterOutputStream(it, Deflater(compressionLevel))
|
||||
} else {
|
||||
it
|
||||
}
|
||||
}
|
||||
override val buf = ctx.alloc().compositeHeapBuffer()
|
||||
|
||||
override fun append(buf: ByteBuf) {
|
||||
this.buf.addComponent(true, buf.retain())
|
||||
if(buf.isDirect) {
|
||||
this.buf.writeBytes(buf)
|
||||
} else {
|
||||
this.buf.addComponent(true, buf.retain())
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
@@ -72,7 +70,7 @@ class InMemoryCacheHandler(
|
||||
}
|
||||
}
|
||||
|
||||
private var inProgressPutRequest: InProgressPutRequest? = null
|
||||
private var inProgressRequest: InProgressRequest? = null
|
||||
|
||||
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
|
||||
when (msg) {
|
||||
@@ -85,24 +83,11 @@ class InMemoryCacheHandler(
|
||||
}
|
||||
|
||||
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
|
||||
cache.get(processCacheKey(msg.key, digestAlgorithm))?.let { value ->
|
||||
ctx.writeAndFlush(CacheValueFoundResponse(msg.key, value.metadata))
|
||||
if (compressionEnabled) {
|
||||
val buf = ctx.alloc().heapBuffer()
|
||||
InflaterOutputStream(ByteBufOutputStream(buf)).use {
|
||||
value.content.readBytes(it, value.content.readableBytes())
|
||||
value.content.release()
|
||||
buf.retain()
|
||||
}
|
||||
ctx.writeAndFlush(LastCacheContent(buf))
|
||||
} else {
|
||||
ctx.writeAndFlush(LastCacheContent(value.content))
|
||||
}
|
||||
} ?: ctx.writeAndFlush(CacheValueNotFoundResponse())
|
||||
inProgressRequest = InProgressGetRequest(msg)
|
||||
}
|
||||
|
||||
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
|
||||
inProgressPutRequest = if(compressionEnabled) {
|
||||
inProgressRequest = if(compressionEnabled) {
|
||||
InProgressCompressedPutRequest(ctx, msg)
|
||||
} else {
|
||||
InProgressPlainPutRequest(ctx, msg)
|
||||
@@ -110,27 +95,46 @@ class InMemoryCacheHandler(
|
||||
}
|
||||
|
||||
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
|
||||
inProgressPutRequest?.append(msg.content())
|
||||
val req = inProgressRequest
|
||||
if(req is InProgressPutRequest) {
|
||||
req.append(msg.content())
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
|
||||
handleCacheContent(ctx, msg)
|
||||
inProgressPutRequest?.let { inProgressRequest ->
|
||||
inProgressPutRequest = null
|
||||
val buf = inProgressRequest.buf
|
||||
buf.retain()
|
||||
inProgressRequest.close()
|
||||
val cacheKey = processCacheKey(inProgressRequest.request.key, digestAlgorithm)
|
||||
cache.put(cacheKey, CacheEntry(inProgressRequest.request.metadata, buf))
|
||||
ctx.writeAndFlush(CachePutResponse(inProgressRequest.request.key))
|
||||
when(val req = inProgressRequest) {
|
||||
is InProgressGetRequest -> {
|
||||
cache.get(processCacheKey(req.request.key, digestAlgorithm))?.let { value ->
|
||||
sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata))
|
||||
if (compressionEnabled) {
|
||||
val buf = ctx.alloc().heapBuffer()
|
||||
InflaterOutputStream(ByteBufOutputStream(buf)).use {
|
||||
value.content.readBytes(it, value.content.readableBytes())
|
||||
value.content.release()
|
||||
buf.retain()
|
||||
}
|
||||
sendMessage(ctx, LastCacheContent(buf))
|
||||
} else {
|
||||
sendMessage(ctx, LastCacheContent(value.content))
|
||||
}
|
||||
} ?: sendMessage(ctx, CacheValueNotFoundResponse())
|
||||
}
|
||||
is InProgressPutRequest -> {
|
||||
this.inProgressRequest = null
|
||||
val buf = req.buf
|
||||
buf.retain()
|
||||
req.close()
|
||||
val cacheKey = processCacheKey(req.request.key, digestAlgorithm)
|
||||
cache.put(cacheKey, CacheEntry(req.request.metadata, buf))
|
||||
sendMessageAndFlush(ctx, CachePutResponse(req.request.key))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
inProgressPutRequest?.let { req ->
|
||||
req.buf.release()
|
||||
inProgressPutRequest = null
|
||||
}
|
||||
inProgressRequest?.close()
|
||||
inProgressRequest = null
|
||||
super.exceptionCaught(ctx, cause)
|
||||
}
|
||||
}
|
@@ -1,4 +0,0 @@
|
||||
package net.woggioni.rbcs.server.event
|
||||
|
||||
class RequestCompletedEvent {
|
||||
}
|
@@ -0,0 +1,13 @@
|
||||
package net.woggioni.rbcs.server.handler
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.SimpleChannelInboundHandler
|
||||
import io.netty.handler.codec.http.HttpContent
|
||||
|
||||
class BlackHoleRequestHandler : SimpleChannelInboundHandler<HttpContent>() {
|
||||
companion object {
|
||||
val NAME = BlackHoleRequestHandler::class.java.name
|
||||
}
|
||||
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpContent) {
|
||||
}
|
||||
}
|
@@ -1,79 +0,0 @@
|
||||
package net.woggioni.rbcs.server.handler
|
||||
|
||||
import io.netty.channel.ChannelHandler
|
||||
import io.netty.channel.ChannelHandler.Sharable
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.ChannelOutboundHandler
|
||||
import io.netty.channel.ChannelPromise
|
||||
import io.netty.channel.SimpleChannelInboundHandler
|
||||
import io.netty.handler.codec.http.HttpContent
|
||||
import io.netty.handler.codec.http.LastHttpContent
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
|
||||
import java.net.SocketAddress
|
||||
|
||||
class CacheContentHandler(private val pairedHandler : ChannelHandler) : SimpleChannelInboundHandler<HttpContent>(), ChannelOutboundHandler {
|
||||
private var requestFinished = false
|
||||
|
||||
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpContent) {
|
||||
if(requestFinished) {
|
||||
ctx.fireChannelRead(msg.retain())
|
||||
} else {
|
||||
when (msg) {
|
||||
is LastHttpContent -> {
|
||||
ctx.fireChannelRead(LastCacheContent(msg.content().retain()))
|
||||
requestFinished = true
|
||||
}
|
||||
else -> ctx.fireChannelRead(CacheContent(msg.content().retain()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
super.exceptionCaught(ctx, cause)
|
||||
}
|
||||
|
||||
|
||||
override fun bind(ctx: ChannelHandlerContext, localAddress: SocketAddress, promise: ChannelPromise) {
|
||||
ctx.bind(localAddress, promise)
|
||||
}
|
||||
|
||||
override fun connect(
|
||||
ctx: ChannelHandlerContext,
|
||||
remoteAddress: SocketAddress,
|
||||
localAddress: SocketAddress,
|
||||
promise: ChannelPromise
|
||||
) {
|
||||
ctx.connect(remoteAddress, localAddress, promise)
|
||||
}
|
||||
|
||||
override fun disconnect(ctx: ChannelHandlerContext, promise: ChannelPromise) {
|
||||
ctx.disconnect(promise)
|
||||
}
|
||||
|
||||
override fun close(ctx: ChannelHandlerContext, promise: ChannelPromise) {
|
||||
ctx.close(promise)
|
||||
}
|
||||
|
||||
override fun deregister(ctx: ChannelHandlerContext, promise: ChannelPromise) {
|
||||
ctx.deregister(promise)
|
||||
}
|
||||
|
||||
override fun read(ctx: ChannelHandlerContext) {
|
||||
ctx.read()
|
||||
}
|
||||
|
||||
override fun flush(ctx: ChannelHandlerContext) {
|
||||
ctx.flush()
|
||||
}
|
||||
|
||||
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
|
||||
ctx.write(msg, promise)
|
||||
if(msg is LastCacheContent || msg is CachePutResponse || msg is CacheValueNotFoundResponse || msg is LastHttpContent) {
|
||||
ctx.pipeline().remove(pairedHandler)
|
||||
ctx.pipeline().remove(this)
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,56 +0,0 @@
|
||||
package net.woggioni.rbcs.server.handler
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter
|
||||
import io.netty.channel.ChannelOutboundHandler
|
||||
import io.netty.channel.ChannelPromise
|
||||
import net.woggioni.rbcs.server.event.RequestCompletedEvent
|
||||
import java.net.SocketAddress
|
||||
|
||||
class ResponseCapHandler : ChannelInboundHandlerAdapter(), ChannelOutboundHandler {
|
||||
val bufferedMessages = mutableListOf<Any>()
|
||||
|
||||
override fun bind(ctx: ChannelHandlerContext, localAddress: SocketAddress, promise: ChannelPromise) {
|
||||
ctx.bind(localAddress, promise)
|
||||
}
|
||||
|
||||
override fun connect(
|
||||
ctx: ChannelHandlerContext,
|
||||
remoteAddress: SocketAddress,
|
||||
localAddress: SocketAddress,
|
||||
promise: ChannelPromise
|
||||
) {
|
||||
ctx.connect(remoteAddress, localAddress, promise)
|
||||
}
|
||||
|
||||
override fun disconnect(ctx: ChannelHandlerContext, promise: ChannelPromise) {
|
||||
ctx.disconnect(promise)
|
||||
}
|
||||
|
||||
override fun close(ctx: ChannelHandlerContext, promise: ChannelPromise) {
|
||||
ctx.close(promise)
|
||||
}
|
||||
|
||||
override fun deregister(ctx: ChannelHandlerContext, promise: ChannelPromise) {
|
||||
ctx.deregister(promise)
|
||||
}
|
||||
|
||||
override fun read(ctx: ChannelHandlerContext) {
|
||||
ctx.read()
|
||||
}
|
||||
|
||||
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
|
||||
bufferedMessages.add(msg)
|
||||
}
|
||||
|
||||
override fun flush(ctx: ChannelHandlerContext) {
|
||||
}
|
||||
|
||||
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
|
||||
if(evt is RequestCompletedEvent) {
|
||||
for(msg in bufferedMessages) ctx.write(msg)
|
||||
ctx.flush()
|
||||
ctx.pipeline().remove(this)
|
||||
}
|
||||
}
|
||||
}
|
@@ -8,6 +8,7 @@ import io.netty.handler.codec.http.DefaultFullHttpResponse
|
||||
import io.netty.handler.codec.http.DefaultHttpContent
|
||||
import io.netty.handler.codec.http.DefaultHttpResponse
|
||||
import io.netty.handler.codec.http.DefaultLastHttpContent
|
||||
import io.netty.handler.codec.http.HttpContent
|
||||
import io.netty.handler.codec.http.HttpHeaderNames
|
||||
import io.netty.handler.codec.http.HttpHeaderValues
|
||||
import io.netty.handler.codec.http.HttpHeaders
|
||||
@@ -17,7 +18,6 @@ import io.netty.handler.codec.http.HttpResponseStatus
|
||||
import io.netty.handler.codec.http.HttpUtil
|
||||
import io.netty.handler.codec.http.HttpVersion
|
||||
import io.netty.handler.codec.http.LastHttpContent
|
||||
import net.woggioni.rbcs.api.CacheHandlerFactory
|
||||
import net.woggioni.rbcs.api.CacheValueMetadata
|
||||
import net.woggioni.rbcs.api.message.CacheMessage
|
||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
|
||||
@@ -30,10 +30,8 @@ import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
|
||||
import net.woggioni.rbcs.common.createLogger
|
||||
import net.woggioni.rbcs.common.debug
|
||||
import net.woggioni.rbcs.common.warn
|
||||
import net.woggioni.rbcs.server.event.RequestCompletedEvent
|
||||
import net.woggioni.rbcs.server.exception.ExceptionHandler
|
||||
import java.nio.file.Path
|
||||
import java.util.Locale
|
||||
|
||||
class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupplier : () -> ChannelHandler) :
|
||||
ChannelDuplexHandler() {
|
||||
@@ -47,6 +45,15 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
|
||||
private var keepAlive = true
|
||||
private var pipelinedRequests = 0
|
||||
|
||||
private fun newRequest() {
|
||||
pipelinedRequests += 1
|
||||
}
|
||||
|
||||
private fun requestCompleted(ctx : ChannelHandlerContext) {
|
||||
pipelinedRequests -= 1
|
||||
if(pipelinedRequests == 0) ctx.read()
|
||||
}
|
||||
|
||||
private fun resetRequestMetadata() {
|
||||
httpVersion = HttpVersion.HTTP_1_1
|
||||
keepAlive = true
|
||||
@@ -65,22 +72,44 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
|
||||
}
|
||||
}
|
||||
|
||||
private var cacheRequestInProgress : Boolean = false
|
||||
|
||||
override fun handlerAdded(ctx: ChannelHandlerContext) {
|
||||
ctx.read()
|
||||
}
|
||||
|
||||
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
|
||||
when (msg) {
|
||||
is HttpRequest -> handleRequest(ctx, msg)
|
||||
is HttpContent -> {
|
||||
if(cacheRequestInProgress) {
|
||||
if(msg is LastHttpContent) {
|
||||
super.channelRead(ctx, LastCacheContent(msg.content().retain()))
|
||||
cacheRequestInProgress = false
|
||||
} else {
|
||||
super.channelRead(ctx, CacheContent(msg.content().retain()))
|
||||
}
|
||||
msg.release()
|
||||
} else {
|
||||
super.channelRead(ctx, msg)
|
||||
}
|
||||
}
|
||||
else -> super.channelRead(ctx, msg)
|
||||
}
|
||||
}
|
||||
|
||||
override fun channelReadComplete(ctx: ChannelHandlerContext) {
|
||||
super.channelReadComplete(ctx)
|
||||
if(cacheRequestInProgress) {
|
||||
ctx.read()
|
||||
}
|
||||
}
|
||||
|
||||
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise?) {
|
||||
if (msg is CacheMessage) {
|
||||
try {
|
||||
when (msg) {
|
||||
is CachePutResponse -> {
|
||||
pipelinedRequests -= 1
|
||||
ctx.fireUserEventTriggered(RequestCompletedEvent())
|
||||
val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.CREATED)
|
||||
val keyBytes = msg.key.toByteArray(Charsets.UTF_8)
|
||||
response.headers().apply {
|
||||
@@ -92,16 +121,18 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
|
||||
val buf = ctx.alloc().buffer(keyBytes.size).apply {
|
||||
writeBytes(keyBytes)
|
||||
}
|
||||
ctx.writeAndFlush(DefaultLastHttpContent(buf))
|
||||
ctx.writeAndFlush(DefaultLastHttpContent(buf)).also {
|
||||
requestCompleted(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
is CacheValueNotFoundResponse -> {
|
||||
pipelinedRequests -= 1
|
||||
ctx.fireUserEventTriggered(RequestCompletedEvent())
|
||||
val response = DefaultFullHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND)
|
||||
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
|
||||
setKeepAliveHeader(response.headers())
|
||||
ctx.writeAndFlush(response)
|
||||
ctx.writeAndFlush(response).also {
|
||||
requestCompleted(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
is CacheValueFoundResponse -> {
|
||||
@@ -118,9 +149,9 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
|
||||
}
|
||||
|
||||
is LastCacheContent -> {
|
||||
pipelinedRequests -= 1
|
||||
ctx.fireUserEventTriggered(RequestCompletedEvent())
|
||||
ctx.writeAndFlush(DefaultLastHttpContent(msg.content()))
|
||||
ctx.writeAndFlush(DefaultLastHttpContent(msg.content())).also {
|
||||
requestCompleted(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
is CacheContent -> {
|
||||
@@ -140,9 +171,8 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
|
||||
resetRequestMetadata()
|
||||
}
|
||||
} else if(msg is LastHttpContent) {
|
||||
pipelinedRequests -= 1
|
||||
ctx.fireUserEventTriggered(RequestCompletedEvent())
|
||||
ctx.write(msg, promise)
|
||||
requestCompleted(ctx)
|
||||
} else super.write(ctx, msg, promise)
|
||||
}
|
||||
|
||||
@@ -153,15 +183,12 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
|
||||
if (method === HttpMethod.GET) {
|
||||
val path = Path.of(msg.uri()).normalize()
|
||||
if (path.startsWith(serverPrefix)) {
|
||||
cacheRequestInProgress = true
|
||||
val relativePath = serverPrefix.relativize(path)
|
||||
val key = relativePath.toString()
|
||||
if(pipelinedRequests > 0) {
|
||||
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler())
|
||||
}
|
||||
val key : String = relativePath.toString()
|
||||
newRequest()
|
||||
val cacheHandler = cacheHandlerSupplier()
|
||||
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, CacheContentHandler(cacheHandler))
|
||||
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler)
|
||||
pipelinedRequests += 1
|
||||
key.let(::CacheGetRequest)
|
||||
.let(ctx::fireChannelRead)
|
||||
?: ctx.channel().write(CacheValueNotFoundResponse())
|
||||
@@ -176,18 +203,15 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
|
||||
} else if (method === HttpMethod.PUT) {
|
||||
val path = Path.of(msg.uri()).normalize()
|
||||
if (path.startsWith(serverPrefix)) {
|
||||
cacheRequestInProgress = true
|
||||
val relativePath = serverPrefix.relativize(path)
|
||||
val key = relativePath.toString()
|
||||
log.debug(ctx) {
|
||||
"Added value for key '$key' to build cache"
|
||||
}
|
||||
if(pipelinedRequests > 0) {
|
||||
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler())
|
||||
}
|
||||
newRequest()
|
||||
val cacheHandler = cacheHandlerSupplier()
|
||||
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, CacheContentHandler(cacheHandler))
|
||||
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, cacheHandler)
|
||||
pipelinedRequests += 1
|
||||
|
||||
path.fileName?.toString()
|
||||
?.let {
|
||||
@@ -205,11 +229,8 @@ class ServerHandler(private val serverPrefix: Path, private val cacheHandlerSupp
|
||||
ctx.writeAndFlush(response)
|
||||
}
|
||||
} else if (method == HttpMethod.TRACE) {
|
||||
if(pipelinedRequests > 0) {
|
||||
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, ResponseCapHandler())
|
||||
}
|
||||
newRequest()
|
||||
ctx.pipeline().addBefore(ExceptionHandler.NAME, null, TraceHandler)
|
||||
pipelinedRequests += 1
|
||||
super.channelRead(ctx, msg)
|
||||
} else {
|
||||
log.warn(ctx) {
|
||||
|
@@ -94,6 +94,9 @@ class ThrottlingHandler(private val bucketManager : BucketManager,
|
||||
handleBuckets(buckets, ctx, msg, false)
|
||||
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
|
||||
} else {
|
||||
queuedContent?.let { qc ->
|
||||
qc.forEach { it.release() }
|
||||
}
|
||||
this.queuedContent = null
|
||||
sendThrottledResponse(ctx, waitDuration)
|
||||
}
|
||||
|
@@ -1,5 +1,14 @@
|
||||
package net.woggioni.rbcs.server.test.utils;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.security.KeyPair;
|
||||
import java.security.KeyPairGenerator;
|
||||
import java.security.PrivateKey;
|
||||
import java.security.SecureRandom;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Date;
|
||||
import org.bouncycastle.asn1.DERSequence;
|
||||
import org.bouncycastle.asn1.x500.X500Name;
|
||||
import org.bouncycastle.asn1.x509.BasicConstraints;
|
||||
@@ -15,16 +24,6 @@ import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
|
||||
import org.bouncycastle.operator.ContentSigner;
|
||||
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.security.KeyPair;
|
||||
import java.security.KeyPairGenerator;
|
||||
import java.security.PrivateKey;
|
||||
import java.security.SecureRandom;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Date;
|
||||
|
||||
public class CertificateUtils {
|
||||
|
||||
public record X509Credentials(
|
||||
|
@@ -154,7 +154,7 @@ class BasicAuthServerTest : AbstractBasicAuthServerTest() {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(6)
|
||||
@Order(8)
|
||||
fun getAsAThrottledUser() {
|
||||
val client: HttpClient = HttpClient.newHttpClient()
|
||||
|
||||
@@ -172,7 +172,7 @@ class BasicAuthServerTest : AbstractBasicAuthServerTest() {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(7)
|
||||
@Order(9)
|
||||
fun getAsAThrottledUser2() {
|
||||
val client: HttpClient = HttpClient.newHttpClient()
|
||||
|
||||
|
Reference in New Issue
Block a user