Compare commits

...

8 Commits

Author SHA1 Message Date
19965dd3b0 replaced inMemory cache with caffeine for performance benchmarks 2025-04-15 01:51:03 +08:00
ae8817ad2a updated benchmarks 2025-03-24 15:01:56 +08:00
69f215e68f tuned GC parameters in Docker images
All checks were successful
CI / build (push) Successful in 16m35s
2025-03-24 14:42:04 +08:00
222b475223 ensured in-memory-cache is allocated to heap memory
All checks were successful
CI / build (push) Successful in 12m29s
2025-03-11 12:29:43 +08:00
ede515e2ca rebuild native image with wider ISA compatibility
All checks were successful
CI / build (push) Successful in 14m13s
2025-03-10 22:28:55 +08:00
974fdb7a91 added k8s benchmark files
All checks were successful
CI / build (push) Successful in 15m39s
2025-03-10 10:09:30 +08:00
a294229ff0 fixed memory leak in MemcachedCacheHandler 2025-03-09 22:16:03 +08:00
9600dd7e4f solved issue with ignored HttpContent and HttpCacheContent messages in the Netty pipeline
All checks were successful
CI / build (push) Successful in 12m48s
2025-03-09 13:57:52 +08:00
28 changed files with 664 additions and 362 deletions

View File

@@ -0,0 +1,94 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: rbcs-server
data:
rbcs-server.xml: |
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
xmlns:rbcs-memcache="urn:net.woggioni.rbcs.server.memcache"
xs:schemaLocation="urn:net.woggioni.rbcs.server.memcache jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
>
<bind host="0.0.0.0" port="8080" incoming-connections-backlog-size="128"/>
<connection
max-request-size="0xd000000"
idle-timeout="PT15S"
read-idle-timeout="PT30S"
write-idle-timeout="PT30S"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="rbcs:fileSystemCacheType" max-age="P7D" enable-compression="false" path="/home/luser/cache" digest="SHA-224"/>
</rbcs:server>
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: rbcs-pvc
namespace: default
spec:
accessModes:
- ReadWriteOnce
storageClassName: local-path
resources:
requests:
storage: 16Gi
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: rbcs-deployment
labels:
app: rbcs
spec:
replicas: 1
selector:
matchLabels:
app: rbcs
template:
metadata:
labels:
app: rbcs
spec:
containers:
- name: rbcs
image: gitea.woggioni.net/woggioni/rbcs:memcache
imagePullPolicy: Always
command: ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=75","-jar", "/home/luser/rbcs.jar"]
args: ['server', '-c', 'rbcs-server.xml']
ports:
- containerPort: 8080
volumeMounts:
- name: config-volume
mountPath: /home/luser/rbcs-server.xml
subPath: rbcs-server.xml
- name: cache-volume
mountPath: /home/luser/cache
resources:
requests:
memory: "0.25Gi"
cpu: "1"
limits:
memory: "0.5Gi"
cpu: "1"
volumes:
- name: config-volume
configMap:
name: rbcs-server
- name: cache-volume
persistentVolumeClaim:
claimName: rbcs-pvc
---
apiVersion: v1
kind: Service
metadata:
name: rbcs-service
spec:
type: LoadBalancer
ports:
- port: 8080
targetPort: 8080
protocol: TCP
selector:
app: rbcs

View File

@@ -0,0 +1,77 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: rbcs-server
data:
rbcs-server.xml: |
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
xmlns:rbcs-memcache="urn:net.woggioni.rbcs.server.memcache"
xs:schemaLocation="urn:net.woggioni.rbcs.server.memcache jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
>
<bind host="0.0.0.0" port="8080" incoming-connections-backlog-size="128"/>
<connection
max-request-size="0xd000000"
idle-timeout="PT15S"
read-idle-timeout="PT30S"
write-idle-timeout="PT30S"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" enable-compression="false" max-size="0x40000000" digest="SHA-224"/>
</rbcs:server>
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: rbcs-deployment
labels:
app: rbcs
spec:
replicas: 1
selector:
matchLabels:
app: rbcs
template:
metadata:
labels:
app: rbcs
spec:
containers:
- name: rbcs
image: gitea.woggioni.net/woggioni/rbcs:memcache
imagePullPolicy: Always
command: ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=75","-jar", "/home/luser/rbcs.jar"]
args: ['server', '-c', 'rbcs-server.xml']
ports:
- containerPort: 8080
volumeMounts:
- name: config-volume
mountPath: /home/luser/rbcs-server.xml
subPath: rbcs-server.xml
resources:
requests:
memory: "0.5Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "1"
volumes:
- name: config-volume
configMap:
name: rbcs-server
---
apiVersion: v1
kind: Service
metadata:
name: rbcs-service
spec:
type: LoadBalancer
ports:
- port: 8080
targetPort: 8080
protocol: TCP
selector:
app: rbcs

118
benchmark/rbcs-memcache.yml Normal file
View File

@@ -0,0 +1,118 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: rbcs-server
data:
rbcs-server.xml: |
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
xmlns:rbcs-memcache="urn:net.woggioni.rbcs.server.memcache"
xs:schemaLocation="urn:net.woggioni.rbcs.server.memcache jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
>
<bind host="0.0.0.0" port="8080" incoming-connections-backlog-size="128"/>
<connection
max-request-size="0xd000000"
idle-timeout="PT15S"
read-idle-timeout="PT30S"
write-idle-timeout="PT30S"/>
<event-executor use-virtual-threads="true"/>
<!--cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" enable-compression="false" max-size="0x10000000" /-->
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" chunk-size="0x1000" digest="SHA-224">
<server host="memcached-service" port="11211" max-connections="256"/>
</cache>
</rbcs:server>
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: rbcs-deployment
labels:
app: rbcs
spec:
replicas: 1
selector:
matchLabels:
app: rbcs
template:
metadata:
labels:
app: rbcs
spec:
containers:
- name: rbcs
image: gitea.woggioni.net/woggioni/rbcs:memcache
imagePullPolicy: Always
command: ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=75","-jar", "/home/luser/rbcs.jar"]
args: ['server', '-c', 'rbcs-server.xml']
ports:
- containerPort: 8080
volumeMounts:
- name: config-volume
mountPath: /home/luser/rbcs-server.xml
subPath: rbcs-server.xml
resources:
requests:
memory: "0.5Gi"
cpu: "1"
limits:
memory: "0.5Gi"
cpu: "3.5"
volumes:
- name: config-volume
configMap:
name: rbcs-server
---
apiVersion: v1
kind: Service
metadata:
name: rbcs-service
spec:
type: LoadBalancer
ports:
- port: 8080
targetPort: 8080
protocol: TCP
selector:
app: rbcs
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: memcached-deployment
spec:
replicas: 1
selector:
matchLabels:
app: memcached
template:
metadata:
labels:
app: memcached
spec:
containers:
- name: memcached
image: memcached
args: ["-I", "128m", "-m", "4096", "-t", "1"]
resources:
requests:
memory: "1Gi"
cpu: "500m" # 0.5 CPU
limits:
memory: "5Gi"
cpu: "500m" # 0.5 CP
---
apiVersion: v1
kind: Service
metadata:
name: memcached-service
spec:
type: ClusterIP # ClusterIP makes it accessible only within the cluster
ports:
- port: 11211 # Default memcached port
targetPort: 11211
protocol: TCP
selector:
app: memcached

View File

@@ -16,22 +16,22 @@ All test were executed under the following conditions:
| Cache backend | CPU | CPU quota | Memory quota (GB) | Request size (b) | Client connections | PUT (req/s) | GET (req/s) | | Cache backend | CPU | CPU quota | Memory quota (GB) | Request size (b) | Client connections | PUT (req/s) | GET (req/s) |
|----------------|---------------------|-----------|-------------------|------------------|--------------------|-------------|-------------| |----------------|---------------------|-----------|-------------------|------------------|--------------------|-------------|-------------|
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 128 | 10 | 3691 | 4037 | | in-memory | Intel Celeron J3455 | 1.00 | 4 | 128 | 10 | 7867 | 13762 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 128 | 100 | 6881 | 7483 | | in-memory | Intel Celeron J3455 | 1.00 | 4 | 128 | 100 | 7728 | 14180 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 512 | 10 | 3790 | 4069 | | in-memory | Intel Celeron J3455 | 1.00 | 4 | 512 | 10 | 7964 | 10992 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 512 | 100 | 6716 | 7408 | | in-memory | Intel Celeron J3455 | 1.00 | 4 | 512 | 100 | 8415 | 12478 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 4096 | 10 | 3399 | 1974 | | in-memory | Intel Celeron J3455 | 1.00 | 4 | 4096 | 10 | 4268 | 5395 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 4096 | 100 | 5341 | 6402 | | in-memory | Intel Celeron J3455 | 1.00 | 4 | 4096 | 100 | 5585 | 8259 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 65536 | 10 | 1099 | 1116 | | in-memory | Intel Celeron J3455 | 1.00 | 4 | 65536 | 10 | 1063 | 1185 |
| in-memory | Intel Celeron J3455 | 1.00 | 4 | 65536 | 100 | 1379 | 1703 | | in-memory | Intel Celeron J3455 | 1.00 | 4 | 65536 | 100 | 1522 | 1366 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 128 | 10 | 4443 | 5170 | | in-memory | Intel Celeron J3455 | 3.50 | 4 | 128 | 10 | 11271 | 14092 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 128 | 100 | 12813 | 13568 | | in-memory | Intel Celeron J3455 | 3.50 | 4 | 128 | 100 | 16064 | 24201 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 512 | 10 | 4450 | 4383 | | in-memory | Intel Celeron J3455 | 3.50 | 4 | 512 | 10 | 11504 | 13077 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 512 | 100 | 12212 | 13586 | | in-memory | Intel Celeron J3455 | 3.50 | 4 | 512 | 100 | 17379 | 22094 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 4096 | 10 | 3441 | 3012 | | in-memory | Intel Celeron J3455 | 3.50 | 4 | 4096 | 10 | 9151 | 9489 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 4096 | 100 | 8982 | 10452 | | in-memory | Intel Celeron J3455 | 3.50 | 4 | 4096 | 100 | 13194 | 18268 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 65536 | 10 | 1391 | 1167 | | in-memory | Intel Celeron J3455 | 3.50 | 4 | 65536 | 10 | 1590 | 1174 |
| in-memory | Intel Celeron J3455 | 3.50 | 4 | 65536 | 100 | 1303 | 1151 | | in-memory | Intel Celeron J3455 | 3.50 | 4 | 65536 | 100 | 1539 | 1561 |
### Filesystem cache backend ### Filesystem cache backend
@@ -42,23 +42,22 @@ TLS: disabled
| Cache backend | CPU | CPU quota | Memory quota (GB) | Request size (b) | Client connections | PUT (req/s) | GET (req/s) | | Cache backend | CPU | CPU quota | Memory quota (GB) | Request size (b) | Client connections | PUT (req/s) | GET (req/s) |
|---------------|---------------------|-----------|-------------------|------------------|--------------------|-------------|-------------| |---------------|---------------------|-----------|-------------------|------------------|--------------------|-------------|-------------|
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 128 | 10 | 1208 | 2048 | | filesystem | Intel Celeron J3455 | 1.00 | 0.5 | 128 | 10 | 1478 | 5771 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 128 | 100 | 1304 | 2394 | | filesystem | Intel Celeron J3455 | 1.00 | 0.5 | 128 | 100 | 3166 | 8070 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 512 | 10 | 1408 | 2157 | | filesystem | Intel Celeron J3455 | 1.00 | 0.5 | 512 | 10 | 1717 | 5895 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 512 | 100 | 1282 | 1888 | | filesystem | Intel Celeron J3455 | 1.00 | 0.5 | 512 | 100 | 1125 | 6564 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 4096 | 10 | 1291 | 1256 | | filesystem | Intel Celeron J3455 | 1.00 | 0.5 | 4096 | 10 | 819 | 2509 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 4096 | 100 | 1170 | 1423 | | filesystem | Intel Celeron J3455 | 1.00 | 0.5 | 4096 | 100 | 1136 | 2365 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 65536 | 10 | 313 | 606 | | filesystem | Intel Celeron J3455 | 1.00 | 0.5 | 65536 | 10 | 584 | 632 |
| filesystem | Intel Celeron J3455 | 1.00 | 0.25 | 65536 | 100 | 298 | 609 | | filesystem | Intel Celeron J3455 | 1.00 | 0.5 | 65536 | 100 | 529 | 635 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 128 | 10 | 2195 | 3477 | | filesystem | Intel Celeron J3455 | 3.50 | 0.5 | 128 | 10 | 1227 | 3342 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 128 | 100 | 2480 | 6207 | | filesystem | Intel Celeron J3455 | 3.50 | 0.5 | 128 | 100 | 1156 | 4035 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 512 | 10 | 2164 | 3413 | | filesystem | Intel Celeron J3455 | 3.50 | 0.5 | 512 | 10 | 979 | 3294 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 512 | 100 | 2842 | 6218 | | filesystem | Intel Celeron J3455 | 3.50 | 0.5 | 512 | 100 | 1217 | 3888 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 4096 | 10 | 1302 | 2591 | | filesystem | Intel Celeron J3455 | 3.50 | 0.5 | 4096 | 10 | 535 | 1805 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 4096 | 100 | 2270 | 3045 | | filesystem | Intel Celeron J3455 | 3.50 | 0.5 | 4096 | 100 | 555 | 1910 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 65536 | 10 | 375 | 394 | | filesystem | Intel Celeron J3455 | 3.50 | 0.5 | 65536 | 10 | 301 | 494 |
| filesystem | Intel Celeron J3455 | 3.50 | 0.25 | 65536 | 100 | 364 | 462 | | filesystem | Intel Celeron J3455 | 3.50 | 0.5 | 65536 | 100 | 353 | 595 |
### Memcache cache backend ### Memcache cache backend
@@ -69,19 +68,19 @@ TLS: disabled
| Cache backend | CPU | CPU quota | Memory quota (GB) | Request size (b) | Client connections | PUT (req/s) | GET (req/s) | | Cache backend | CPU | CPU quota | Memory quota (GB) | Request size (b) | Client connections | PUT (req/s) | GET (req/s) |
|---------------|---------------------|-----------|-------------------|------------------|--------------------|-------------|-------------| |---------------|---------------------|-----------|-------------------|------------------|--------------------|-------------|-------------|
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 128 | 10 | 2505 | 2578 | | memcache | Intel Celeron J3455 | 1.00 | 0.25 | 128 | 10 | 3380 | 6083 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 128 | 100 | 3582 | 3935 | | memcache | Intel Celeron J3455 | 1.00 | 0.25 | 128 | 100 | 3323 | 4998 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 512 | 10 | 2495 | 2784 | | memcache | Intel Celeron J3455 | 1.00 | 0.25 | 512 | 10 | 3924 | 6086 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 512 | 100 | 3565 | 3883 | | memcache | Intel Celeron J3455 | 1.00 | 0.25 | 512 | 100 | 3440 | 5049 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 4096 | 10 | 2174 | 2505 | | memcache | Intel Celeron J3455 | 1.00 | 0.25 | 4096 | 10 | 3347 | 5255 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 4096 | 100 | 2937 | 3563 | | memcache | Intel Celeron J3455 | 1.00 | 0.25 | 4096 | 100 | 3685 | 4693 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 65536 | 10 | 648 | 1074 | | memcache | Intel Celeron J3455 | 1.00 | 0.25 | 65536 | 10 | 1304 | 1343 |
| memcache | Intel Celeron J3455 | 1.00 | 0.25 | 65536 | 100 | 724 | 1548 | | memcache | Intel Celeron J3455 | 1.00 | 0.25 | 65536 | 100 | 1481 | 1541 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 128 | 10 | 2362 | 2927 | | memcache | Intel Celeron J3455 | 3.50 | 0.25 | 128 | 10 | 4667 | 7984 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 128 | 100 | 5491 | 6531 | | memcache | Intel Celeron J3455 | 3.50 | 0.25 | 128 | 100 | 4044 | 8358 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 512 | 10 | 2125 | 2807 | | memcache | Intel Celeron J3455 | 3.50 | 0.25 | 512 | 10 | 4177 | 7828 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 512 | 100 | 5173 | 6242 | | memcache | Intel Celeron J3455 | 3.50 | 0.25 | 512 | 100 | 4079 | 8794 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 4096 | 10 | 1720 | 2397 | | memcache | Intel Celeron J3455 | 3.50 | 0.25 | 4096 | 10 | 4588 | 6869 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 4096 | 100 | 3871 | 5859 | | memcache | Intel Celeron J3455 | 3.50 | 0.25 | 4096 | 100 | 5343 | 7797 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 65536 | 10 | 616 | 1016 | | memcache | Intel Celeron J3455 | 3.50 | 0.25 | 65536 | 10 | 1624 | 1317 |
| memcache | Intel Celeron J3455 | 3.50 | 0.25 | 65536 | 100 | 820 | 1677 | | memcache | Intel Celeron J3455 | 3.50 | 0.25 | 65536 | 100 | 1633 | 1317 |

View File

@@ -5,7 +5,7 @@ WORKDIR /home/luser
FROM base-release AS release-vanilla FROM base-release AS release-vanilla
ADD rbcs-cli-envelope-*.jar rbcs.jar ADD rbcs-cli-envelope-*.jar rbcs.jar
ENTRYPOINT ["java", "-XX:+UseSerialGC", "-XX:GCTimeRatio=24", "-jar", "/home/luser/rbcs.jar", "server"] ENTRYPOINT ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=70", "-XX:GCTimeRatio=24", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar"]
FROM base-release AS release-memcache FROM base-release AS release-memcache
ADD --chown=luser:luser rbcs-cli-envelope-*.jar rbcs.jar ADD --chown=luser:luser rbcs-cli-envelope-*.jar rbcs.jar
@@ -14,10 +14,10 @@ WORKDIR /home/luser/plugins
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-server-memcache*.tar RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/rbcs-server-memcache*.tar
WORKDIR /home/luser WORKDIR /home/luser
ADD logback.xml . ADD logback.xml .
ENTRYPOINT ["java", "-Dlogback.configurationFile=logback.xml", "-XX:+UseSerialGC", "-XX:GCTimeRatio=24", "-jar", "/home/luser/rbcs.jar", "server"] ENTRYPOINT ["java", "-Dlogback.configurationFile=logback.xml", "-XX:MaxRAMPercentage=70", "-XX:GCTimeRatio=24", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/rbcs.jar"]
FROM scratch AS release-native FROM scratch AS release-native
ADD rbcs-cli.upx /rbcs/rbcs-cli ADD rbcs-cli.upx /rbcs/rbcs-cli
ENV RBCS_CONFIGURATION_DIR="/rbcs" ENV RBCS_CONFIGURATION_DIR="/rbcs"
WORKDIR /rbcs WORKDIR /rbcs
ENTRYPOINT ["/rbcs/rbcs-cli"] ENTRYPOINT ["/rbcs/rbcs-cli", "-XX:MaximumHeapSizePercent=70"]

View File

@@ -2,7 +2,7 @@ org.gradle.configuration-cache=false
org.gradle.parallel=true org.gradle.parallel=true
org.gradle.caching=true org.gradle.caching=true
rbcs.version = 0.2.0 rbcs.version = 0.2.1
lys.version = 2025.03.08 lys.version = 2025.03.08

View File

@@ -16,7 +16,7 @@ public abstract class CacheHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) {
if(!requestFinished && msg instanceof CacheMessage) { if(!requestFinished && msg instanceof CacheMessage) {
if(msg instanceof CacheMessage.LastCacheContent || msg instanceof CacheMessage.CacheGetRequest) requestFinished = true; if(msg instanceof CacheMessage.LastCacheContent) requestFinished = true;
try { try {
channelRead0(ctx, (CacheMessage) msg); channelRead0(ctx, (CacheMessage) msg);
} finally { } finally {

View File

@@ -1,10 +1,9 @@
package net.woggioni.rbcs.api; package net.woggioni.rbcs.api;
import java.io.Serializable;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import java.io.Serializable;
@Getter @Getter
@RequiredArgsConstructor @RequiredArgsConstructor
public class CacheValueMetadata implements Serializable { public class CacheValueMetadata implements Serializable {

View File

@@ -1,16 +1,15 @@
package net.woggioni.rbcs.api; package net.woggioni.rbcs.api;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.Value;
import java.nio.file.Path; import java.nio.file.Path;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.time.Duration; import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.Value;
@Value @Value
public class Configuration { public class Configuration {

View File

@@ -105,6 +105,7 @@ tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfi
systemProperty('io.netty.leakDetectionLevel', 'DISABLED') systemProperty('io.netty.leakDetectionLevel', 'DISABLED')
modularity.inferModulePath = false modularity.inferModulePath = false
enabled = true enabled = true
systemProperty('gradle.tmp.dir', temporaryDir.toString())
} }
nativeImage { nativeImage {

View File

@@ -1,2 +1,2 @@
Args=-O3 -march=skylake --gc=serial --install-exit-handlers --initialize-at-run-time=io.netty --enable-url-protocols=jpms --initialize-at-build-time=net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory,net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory$JpmsHandler Args=-O3 -march=x86-64-v2 --gc=serial --install-exit-handlers --initialize-at-run-time=io.netty --enable-url-protocols=jpms --initialize-at-build-time=net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory,net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory$JpmsHandler
#-H:TraceClassInitialization=io.netty.handler.ssl.BouncyCastleAlpnSslUtils #-H:TraceClassInitialization=io.netty.handler.ssl.BouncyCastleAlpnSslUtils

View File

@@ -43,6 +43,46 @@
{ {
"name":"com.aayushatharva.brotli4j.Brotli4jLoader" "name":"com.aayushatharva.brotli4j.Brotli4jLoader"
}, },
{
"name":"com.github.benmanes.caffeine.cache.BLCHeader$DrainStatusRef",
"fields":[{"name":"drainStatus"}]
},
{
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueColdProducerFields",
"fields":[{"name":"producerLimit"}]
},
{
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueConsumerFields",
"fields":[{"name":"consumerIndex"}]
},
{
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueProducerFields",
"fields":[{"name":"producerIndex"}]
},
{
"name":"com.github.benmanes.caffeine.cache.BoundedLocalCache",
"fields":[{"name":"refreshes"}]
},
{
"name":"com.github.benmanes.caffeine.cache.PS",
"fields":[{"name":"key"}, {"name":"value"}]
},
{
"name":"com.github.benmanes.caffeine.cache.PSW",
"fields":[{"name":"writeTime"}]
},
{
"name":"com.github.benmanes.caffeine.cache.PSWMS",
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{
"name":"com.github.benmanes.caffeine.cache.SSMSW",
"fields":[{"name":"FACTORY"}]
},
{
"name":"com.github.benmanes.caffeine.cache.StripedBuffer",
"fields":[{"name":"tableBusy"}]
},
{ {
"name":"com.github.luben.zstd.Zstd" "name":"com.github.luben.zstd.Zstd"
}, },
@@ -588,6 +628,9 @@
"name":"net.woggioni.rbcs.server.exception.ExceptionHandler", "name":"net.woggioni.rbcs.server.exception.ExceptionHandler",
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }] "methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
}, },
{
"name":"net.woggioni.rbcs.server.handler.BlackHoleRequestHandler"
},
{ {
"name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler", "name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler",
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }] "methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
@@ -637,6 +680,10 @@
"name":"sun.security.provider.DSA$SHA256withDSA", "name":"sun.security.provider.DSA$SHA256withDSA",
"methods":[{"name":"<init>","parameterTypes":[] }] "methods":[{"name":"<init>","parameterTypes":[] }]
}, },
{
"name":"sun.security.provider.JavaKeyStore$JKS",
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{ {
"name":"sun.security.provider.MD5", "name":"sun.security.provider.MD5",
"methods":[{"name":"<init>","parameterTypes":[] }] "methods":[{"name":"<init>","parameterTypes":[] }]
@@ -725,14 +772,6 @@
"name":"sun.security.x509.CertificatePoliciesExtension", "name":"sun.security.x509.CertificatePoliciesExtension",
"methods":[{"name":"<init>","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] "methods":[{"name":"<init>","parameterTypes":["java.lang.Boolean","java.lang.Object"] }]
}, },
{
"name":"sun.security.x509.ExtendedKeyUsageExtension",
"methods":[{"name":"<init>","parameterTypes":["java.lang.Boolean","java.lang.Object"] }]
},
{
"name":"sun.security.x509.IssuerAlternativeNameExtension",
"methods":[{"name":"<init>","parameterTypes":["java.lang.Boolean","java.lang.Object"] }]
},
{ {
"name":"sun.security.x509.KeyUsageExtension", "name":"sun.security.x509.KeyUsageExtension",
"methods":[{"name":"<init>","parameterTypes":["java.lang.Boolean","java.lang.Object"] }] "methods":[{"name":"<init>","parameterTypes":["java.lang.Boolean","java.lang.Object"] }]

View File

@@ -179,6 +179,8 @@ object GraalNativeImageConfiguration {
} catch (ee : ExecutionException) { } catch (ee : ExecutionException) {
} }
} }
RemoteBuildCacheServerCli.main("--help") System.setProperty("net.woggioni.rbcs.conf.dir", System.getProperty("gradle.tmp.dir"))
RemoteBuildCacheServerCli.createCommandLine().execute("--version")
RemoteBuildCacheServerCli.createCommandLine().execute("server", "-t", "PT10S")
} }
} }

View File

@@ -26,8 +26,8 @@ class RemoteBuildCacheServerCli : RbcsCommand() {
private fun setPropertyIfNotPresent(key: String, value: String) { private fun setPropertyIfNotPresent(key: String, value: String) {
System.getProperty(key) ?: System.setProperty(key, value) System.getProperty(key) ?: System.setProperty(key, value)
} }
@JvmStatic
fun main(vararg args: String) { fun createCommandLine() : CommandLine {
setPropertyIfNotPresent("logback.configurationFile", "net/woggioni/rbcs/cli/logback.xml") setPropertyIfNotPresent("logback.configurationFile", "net/woggioni/rbcs/cli/logback.xml")
setPropertyIfNotPresent("io.netty.leakDetectionLevel", "DISABLED") setPropertyIfNotPresent("io.netty.leakDetectionLevel", "DISABLED")
val currentClassLoader = RemoteBuildCacheServerCli::class.java.classLoader val currentClassLoader = RemoteBuildCacheServerCli::class.java.classLoader
@@ -56,7 +56,12 @@ class RemoteBuildCacheServerCli : RbcsCommand() {
addSubcommand(GetCommand()) addSubcommand(GetCommand())
addSubcommand(HealthCheckCommand()) addSubcommand(HealthCheckCommand())
}) })
System.exit(commandLine.execute(*args)) return commandLine
}
@JvmStatic
fun main(vararg args: String) {
System.exit(createCommandLine().execute(*args))
} }
} }

View File

@@ -6,7 +6,6 @@ import net.woggioni.rbcs.client.Configuration
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import net.woggioni.rbcs.common.debug import net.woggioni.rbcs.common.debug
import picocli.CommandLine import picocli.CommandLine
import java.lang.IllegalArgumentException
import java.nio.file.Path import java.nio.file.Path
@CommandLine.Command( @CommandLine.Command(

View File

@@ -1,7 +1,6 @@
package net.woggioni.rbcs.server.memcache package net.woggioni.rbcs.server.memcache
import io.netty.channel.ChannelFactory import io.netty.channel.ChannelFactory
import io.netty.channel.ChannelHandler
import io.netty.channel.EventLoopGroup import io.netty.channel.EventLoopGroup
import io.netty.channel.pool.FixedChannelPool import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.DatagramChannel import io.netty.channel.socket.DatagramChannel

View File

@@ -69,10 +69,14 @@ class MemcacheCacheHandler(
} }
} }
private interface InProgressRequest {
}
private inner class InProgressGetRequest( private inner class InProgressGetRequest(
private val key: String, val key: String,
private val ctx: ChannelHandlerContext private val ctx: ChannelHandlerContext
) { ) : InProgressRequest {
private val acc = ctx.alloc().compositeBuffer() private val acc = ctx.alloc().compositeBuffer()
private val chunk = ctx.alloc().compositeBuffer() private val chunk = ctx.alloc().compositeBuffer()
private val outputStream = ByteBufOutputStream(chunk).let { private val outputStream = ByteBufOutputStream(chunk).let {
@@ -107,17 +111,17 @@ class MemcacheCacheHandler(
} }
if (responseSent) { if (responseSent) {
acc.readBytes(outputStream, acc.readableBytes()) acc.readBytes(outputStream, acc.readableBytes())
if(acc.readableBytes() >= chunkSize) { if (acc.readableBytes() >= chunkSize) {
flush(false) flush(false)
} }
} }
} }
private fun flush(last : Boolean) { private fun flush(last: Boolean) {
val toSend = extractChunk(chunk, ctx.alloc()) val toSend = extractChunk(chunk, ctx.alloc())
val msg = if(last) { val msg = if (last) {
log.trace(ctx) { log.trace(ctx) {
"Sending last chunk to client on channel" "Sending last chunk to client"
} }
LastCacheContent(toSend) LastCacheContent(toSend)
} else { } else {
@@ -144,14 +148,14 @@ class MemcacheCacheHandler(
} }
private inner class InProgressPutRequest( private inner class InProgressPutRequest(
private val ch : NettyChannel, private val ch: NettyChannel,
metadata : CacheValueMetadata, metadata: CacheValueMetadata,
val digest : ByteBuf, val digest: ByteBuf,
val requestController: CompletableFuture<MemcacheRequestController>, val requestController: CompletableFuture<MemcacheRequestController>,
private val alloc: ByteBufAllocator private val alloc: ByteBufAllocator
) { ) : InProgressRequest {
private var totalSize = 0 private var totalSize = 0
private var tmpFile : FileChannel? = null private var tmpFile: FileChannel? = null
private val accumulator = alloc.compositeBuffer() private val accumulator = alloc.compositeBuffer()
private val stream = ByteBufOutputStream(accumulator).let { private val stream = ByteBufOutputStream(accumulator).let {
if (compressionEnabled) { if (compressionEnabled) {
@@ -178,7 +182,7 @@ class MemcacheCacheHandler(
tmpFile?.let { tmpFile?.let {
flushToDisk(it, accumulator) flushToDisk(it, accumulator)
} }
if(accumulator.readableBytes() > 0x100000) { if (accumulator.readableBytes() > 0x100000) {
log.debug(ch) { log.debug(ch) {
"Entry is too big, buffering it into a file" "Entry is too big, buffering it into a file"
} }
@@ -195,18 +199,18 @@ class MemcacheCacheHandler(
} }
} }
private fun flushToDisk(fc : FileChannel, buf : CompositeByteBuf) { private fun flushToDisk(fc: FileChannel, buf: CompositeByteBuf) {
val chunk = extractChunk(buf, alloc) val chunk = extractChunk(buf, alloc)
fc.write(chunk.nioBuffer()) fc.write(chunk.nioBuffer())
chunk.release() chunk.release()
} }
fun commit() : Pair<Int, ReadableByteChannel> { fun commit(): Pair<Int, ReadableByteChannel> {
digest.release() digest.release()
accumulator.retain() accumulator.retain()
stream.close() stream.close()
val fileChannel = tmpFile val fileChannel = tmpFile
return if(fileChannel != null) { return if (fileChannel != null) {
flushToDisk(fileChannel, accumulator) flushToDisk(fileChannel, accumulator)
accumulator.release() accumulator.release()
fileChannel.position(0) fileChannel.position(0)
@@ -227,8 +231,7 @@ class MemcacheCacheHandler(
} }
} }
private var inProgressPutRequest: InProgressPutRequest? = null private var inProgressRequest: InProgressRequest? = null
private var inProgressGetRequest: InProgressGetRequest? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) { override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
when (msg) { when (msg) {
@@ -255,7 +258,7 @@ class MemcacheCacheHandler(
log.debug(ctx) { log.debug(ctx) {
"Cache hit for key ${msg.key} on memcache" "Cache hit for key ${msg.key} on memcache"
} }
inProgressGetRequest = InProgressGetRequest(msg.key, ctx) inProgressRequest = InProgressGetRequest(msg.key, ctx)
} }
BinaryMemcacheResponseStatus.KEY_ENOENT -> { BinaryMemcacheResponseStatus.KEY_ENOENT -> {
@@ -269,18 +272,25 @@ class MemcacheCacheHandler(
override fun contentReceived(content: MemcacheContent) { override fun contentReceived(content: MemcacheContent) {
log.trace(ctx) { log.trace(ctx) {
"${if(content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${content.content().readableBytes()} bytes received from memcache for key ${msg.key}" "${if (content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${
content.content().readableBytes()
} bytes received from memcache for key ${msg.key}"
} }
inProgressGetRequest?.write(content.content()) (inProgressRequest as? InProgressGetRequest)?.let { inProgressGetRequest ->
if (content is LastMemcacheContent) { inProgressGetRequest.write(content.content())
inProgressGetRequest?.commit() if (content is LastMemcacheContent) {
inProgressRequest = null
inProgressGetRequest.commit()
}
} }
} }
override fun exceptionCaught(ex: Throwable) { override fun exceptionCaught(ex: Throwable) {
inProgressGetRequest?.let { (inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
inProgressGetRequest = null inProgressGetRequest?.let {
it.rollback() inProgressRequest = null
it.rollback()
}
} }
this@MemcacheCacheHandler.exceptionCaught(ctx, ex) this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
} }
@@ -311,6 +321,7 @@ class MemcacheCacheHandler(
} }
sendMessageAndFlush(ctx, CachePutResponse(msg.key)) sendMessageAndFlush(ctx, CachePutResponse(msg.key))
} }
else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status)) else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status))
} }
} }
@@ -327,89 +338,103 @@ class MemcacheCacheHandler(
this@MemcacheCacheHandler.exceptionCaught(ctx, ex) this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
} }
} }
inProgressPutRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc()) inProgressRequest = InProgressPutRequest(ctx.channel(), msg.metadata, key, requestController, ctx.alloc())
} }
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
inProgressPutRequest?.let { request -> val request = inProgressRequest
log.trace(ctx) { when (request) {
"Received chunk of ${msg.content().readableBytes()} bytes for memcache" is InProgressPutRequest -> {
log.trace(ctx) {
"Received chunk of ${msg.content().readableBytes()} bytes for memcache"
}
request.write(msg.content())
}
is InProgressGetRequest -> {
msg.release()
} }
request.write(msg.content())
} }
} }
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) { private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
inProgressPutRequest?.let { request -> val request = inProgressRequest
inProgressPutRequest = null when (request) {
log.trace(ctx) { is InProgressPutRequest -> {
"Received last chunk of ${msg.content().readableBytes()} bytes for memcache" inProgressRequest = null
} log.trace(ctx) {
request.write(msg.content()) "Received last chunk of ${msg.content().readableBytes()} bytes for memcache"
val key = request.digest.retainedDuplicate() }
val (payloadSize, payloadSource) = request.commit() request.write(msg.content())
val extras = ctx.alloc().buffer(8, 8) val key = request.digest.retainedDuplicate()
extras.writeInt(0) val (payloadSize, payloadSource) = request.commit()
extras.writeInt(encodeExpiry(maxAge)) val extras = ctx.alloc().buffer(8, 8)
val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize extras.writeInt(0)
log.trace(ctx) { extras.writeInt(encodeExpiry(maxAge))
"Trying to send SET request to memcache" val totalBodyLength = request.digest.readableBytes() + extras.readableBytes() + payloadSize
} log.trace(ctx) {
request.requestController.whenComplete { requestController, ex -> "Trying to send SET request to memcache"
if(ex == null) { }
log.trace(ctx) { request.requestController.whenComplete { requestController, ex ->
"Sending SET request to memcache" if (ex == null) {
} log.trace(ctx) {
requestController.sendRequest(DefaultBinaryMemcacheRequest().apply { "Sending SET request to memcache"
setOpcode(BinaryMemcacheOpcodes.SET) }
setKey(key) requestController.sendRequest(DefaultBinaryMemcacheRequest().apply {
setExtras(extras) setOpcode(BinaryMemcacheOpcodes.SET)
setTotalBodyLength(totalBodyLength) setKey(key)
}) setExtras(extras)
log.trace(ctx) { setTotalBodyLength(totalBodyLength)
"Sending request payload to memcache" })
} log.trace(ctx) {
payloadSource.use { source -> "Sending request payload to memcache"
val bb = ByteBuffer.allocate(chunkSize) }
while (true) { payloadSource.use { source ->
val read = source.read(bb) val bb = ByteBuffer.allocate(chunkSize)
bb.limit() while (true) {
if(read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) { val read = source.read(bb)
continue bb.limit()
} if (read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) {
val chunk = ctx.alloc().buffer(chunkSize) continue
bb.flip() }
chunk.writeBytes(bb) val chunk = ctx.alloc().buffer(chunkSize)
bb.clear() bb.flip()
log.trace(ctx) { chunk.writeBytes(bb)
"Sending ${chunk.readableBytes()} bytes chunk to memcache" bb.clear()
} log.trace(ctx) {
if(read < 0) { "Sending ${chunk.readableBytes()} bytes chunk to memcache"
requestController.sendContent(DefaultLastMemcacheContent(chunk)) }
break if (read < 0) {
} else { requestController.sendContent(DefaultLastMemcacheContent(chunk))
requestController.sendContent(DefaultMemcacheContent(chunk)) break
} else {
requestController.sendContent(DefaultMemcacheContent(chunk))
}
} }
} }
} else {
payloadSource.close()
} }
} else {
payloadSource.close()
} }
} }
} }
} }
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
inProgressGetRequest?.let { val request = inProgressRequest
inProgressGetRequest = null when (request) {
it.rollback() is InProgressPutRequest -> {
} inProgressRequest = null
inProgressPutRequest?.let { request.requestController.thenAccept { controller ->
inProgressPutRequest = null controller.exceptionCaught(cause)
it.requestController.thenAccept { controller -> }
controller.exceptionCaught(cause) request.rollback()
}
is InProgressGetRequest -> {
inProgressRequest = null
request.rollback()
} }
it.rollback()
} }
super.exceptionCaught(ctx, cause) super.exceptionCaught(ctx, cause)
} }

View File

@@ -12,6 +12,7 @@ dependencies {
implementation catalog.netty.handler implementation catalog.netty.handler
implementation catalog.netty.buffer implementation catalog.netty.buffer
implementation catalog.netty.transport implementation catalog.netty.transport
implementation("com.github.ben-manes.caffeine:caffeine:3.2.0")
api project(':rbcs-common') api project(':rbcs-common')
api project(':rbcs-api') api project(':rbcs-api')

View File

@@ -18,6 +18,7 @@ module net.woggioni.rbcs.server {
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires net.woggioni.rbcs.common; requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.api; requires net.woggioni.rbcs.api;
requires com.github.benmanes.caffeine;
exports net.woggioni.rbcs.server; exports net.woggioni.rbcs.server;

View File

@@ -54,6 +54,7 @@ import net.woggioni.rbcs.server.auth.RoleAuthorizer
import net.woggioni.rbcs.server.configuration.Parser import net.woggioni.rbcs.server.configuration.Parser
import net.woggioni.rbcs.server.configuration.Serializer import net.woggioni.rbcs.server.configuration.Serializer
import net.woggioni.rbcs.server.exception.ExceptionHandler import net.woggioni.rbcs.server.exception.ExceptionHandler
import net.woggioni.rbcs.server.handler.BlackHoleRequestHandler
import net.woggioni.rbcs.server.handler.MaxRequestSizeHandler import net.woggioni.rbcs.server.handler.MaxRequestSizeHandler
import net.woggioni.rbcs.server.handler.ServerHandler import net.woggioni.rbcs.server.handler.ServerHandler
import net.woggioni.rbcs.server.throttling.BucketManager import net.woggioni.rbcs.server.throttling.BucketManager
@@ -361,6 +362,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
} }
pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler) pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler)
pipeline.addLast(ExceptionHandler.NAME, ExceptionHandler) pipeline.addLast(ExceptionHandler.NAME, ExceptionHandler)
pipeline.addLast(BlackHoleRequestHandler.NAME, BlackHoleRequestHandler())
} }
override fun asyncClose() = cacheHandlerFactory.asyncClose() override fun asyncClose() = cacheHandlerFactory.asyncClose()

View File

@@ -2,7 +2,6 @@ package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.LastHttpContent import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioFile import io.netty.handler.stream.ChunkedNioFile
import net.woggioni.rbcs.api.CacheHandler import net.woggioni.rbcs.api.CacheHandler
@@ -29,10 +28,16 @@ class FileSystemCacheHandler(
private val chunkSize: Int private val chunkSize: Int
) : CacheHandler() { ) : CacheHandler() {
private interface InProgressRequest{
}
private class InProgressGetRequest(val request : CacheGetRequest) : InProgressRequest
private inner class InProgressPutRequest( private inner class InProgressPutRequest(
val key : String, val key : String,
private val fileSink : FileSystemCache.FileSink private val fileSink : FileSystemCache.FileSink
) { ) : InProgressRequest {
private val stream = Channels.newOutputStream(fileSink.channel).let { private val stream = Channels.newOutputStream(fileSink.channel).let {
if (compressionEnabled) { if (compressionEnabled) {
@@ -56,7 +61,7 @@ class FileSystemCacheHandler(
} }
} }
private var inProgressPutRequest: InProgressPutRequest? = null private var inProgressRequest: InProgressRequest? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) { override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
when (msg) { when (msg) {
@@ -69,55 +74,64 @@ class FileSystemCacheHandler(
} }
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm))) inProgressRequest = InProgressGetRequest(msg)
cache.get(key)?.also { entryValue ->
sendMessageAndFlush(ctx, CacheValueFoundResponse(msg.key, entryValue.metadata))
entryValue.channel.let { channel ->
if(compressionEnabled) {
InflaterInputStream(Channels.newInputStream(channel)).use { stream ->
outerLoop@
while (true) {
val buf = ctx.alloc().heapBuffer(chunkSize)
while(buf.readableBytes() < chunkSize) {
val read = buf.writeBytes(stream, chunkSize)
if(read < 0) {
sendMessageAndFlush(ctx, LastCacheContent(buf))
break@outerLoop
}
}
sendMessageAndFlush(ctx, CacheContent(buf))
}
}
} else {
sendMessage(ctx, ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize))
sendMessageAndFlush(ctx, LastHttpContent.EMPTY_LAST_CONTENT)
}
}
} ?: sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
} }
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm))) val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm)))
val sink = cache.put(key, msg.metadata) val sink = cache.put(key, msg.metadata)
inProgressPutRequest = InProgressPutRequest(msg.key, sink) inProgressRequest = InProgressPutRequest(msg.key, sink)
} }
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
inProgressPutRequest!!.write(msg.content()) val request = inProgressRequest
if(request is InProgressPutRequest) {
request.write(msg.content())
}
} }
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) { private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
inProgressPutRequest?.let { request -> when(val request = inProgressRequest) {
inProgressPutRequest = null is InProgressPutRequest -> {
request.write(msg.content()) inProgressRequest = null
request.commit() request.write(msg.content())
sendMessageAndFlush(ctx, CachePutResponse(request.key)) request.commit()
sendMessageAndFlush(ctx, CachePutResponse(request.key))
}
is InProgressGetRequest -> {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(request.request.key, digestAlgorithm)))
cache.get(key)?.also { entryValue ->
sendMessageAndFlush(ctx, CacheValueFoundResponse(request.request.key, entryValue.metadata))
entryValue.channel.let { channel ->
if(compressionEnabled) {
InflaterInputStream(Channels.newInputStream(channel)).use { stream ->
outerLoop@
while (true) {
val buf = ctx.alloc().heapBuffer(chunkSize)
while(buf.readableBytes() < chunkSize) {
val read = buf.writeBytes(stream, chunkSize)
if(read < 0) {
sendMessageAndFlush(ctx, LastCacheContent(buf))
break@outerLoop
}
}
sendMessageAndFlush(ctx, CacheContent(buf))
}
}
} else {
sendMessage(ctx, ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize))
sendMessageAndFlush(ctx, LastHttpContent.EMPTY_LAST_CONTENT)
}
}
} ?: sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
}
} }
} }
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
inProgressPutRequest?.rollback() (inProgressRequest as? InProgressPutRequest)?.rollback()
super.exceptionCaught(ctx, cause) super.exceptionCaught(ctx, cause)
} }
} }

View File

@@ -1,16 +1,13 @@
package net.woggioni.rbcs.server.cache package net.woggioni.rbcs.server.cache
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import net.woggioni.rbcs.api.AsyncCloseable import net.woggioni.rbcs.api.AsyncCloseable
import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import java.time.Duration import java.time.Duration
import java.time.Instant
import java.util.PriorityQueue
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
private class CacheKey(private val value: ByteArray) { private class CacheKey(private val value: ByteArray) {
override fun equals(other: Any?) = if (other is CacheKey) { override fun equals(other: Any?) = if (other is CacheKey) {
@@ -26,114 +23,29 @@ class CacheEntry(
) )
class InMemoryCache( class InMemoryCache(
private val maxAge: Duration, maxAge: Duration,
private val maxSize: Long maxSize: Long
) : AsyncCloseable { ) : AsyncCloseable {
companion object { companion object {
private val log = createLogger<InMemoryCache>() private val log = createLogger<InMemoryCache>()
} }
private var mapSize : Long = 0 private val cache: Cache<CacheKey, CacheEntry> = Caffeine.newBuilder()
private val map = HashMap<CacheKey, CacheEntry>() .expireAfterWrite(maxAge)
private val lock = ReentrantReadWriteLock() .maximumSize(maxSize)
private val cond = lock.writeLock().newCondition() .build()
override fun asyncClose(): CompletableFuture<Void> = CompletableFuture.completedFuture(null)
private class RemovalQueueElement(val key: CacheKey, val value: CacheEntry, val expiry: Instant) : fun get(key: ByteArray) = cache.getIfPresent(CacheKey(key))?.run {
Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
}
private val removalQueue = PriorityQueue<RemovalQueueElement>()
@Volatile
private var running = true
private val closeFuture = object : CompletableFuture<Void>() {
init {
Thread.ofVirtual().name("in-memory-cache-gc").start {
try {
lock.writeLock().withLock {
while (running) {
val el = removalQueue.poll()
if(el == null) {
cond.await(1000, TimeUnit.MILLISECONDS)
continue
}
val value = el.value
val now = Instant.now()
if (now > el.expiry) {
val removed = map.remove(el.key, value)
if (removed) {
updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
}
} else {
removalQueue.offer(el)
val interval = minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
}
}
}
complete(null)
} catch (ex: Throwable) {
completeExceptionally(ex)
}
}
}
}
fun removeEldest(): Long {
while (true) {
val el = removalQueue.poll() ?: return mapSize
val value = el.value
val removed = map.remove(el.key, value)
if (removed) {
val newSize = updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
return newSize
}
}
}
private fun updateSizeAfterRemoval(removed: ByteBuf): Long {
mapSize -= removed.readableBytes()
return mapSize
}
override fun asyncClose() : CompletableFuture<Void> {
running = false
lock.writeLock().withLock {
cond.signal()
}
return closeFuture
}
fun get(key: ByteArray) = lock.readLock().withLock {
map[CacheKey(key)]?.run {
CacheEntry(metadata, content.retainedDuplicate()) CacheEntry(metadata, content.retainedDuplicate())
} }
}
fun put( fun put(
key: ByteArray, key: ByteArray,
value: CacheEntry, value: CacheEntry,
) { ) {
val cacheKey = CacheKey(key) val cacheKey = CacheKey(key)
lock.writeLock().withLock { cache.put(cacheKey, value)
val oldSize = map.put(cacheKey, value)?.let { old ->
val result = old.content.readableBytes()
old.content.release()
result
} ?: 0
val delta = value.content.readableBytes() - oldSize
mapSize += delta
removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
while (mapSize > maxSize) {
removeEldest()
}
}
} }
} }

View File

@@ -4,7 +4,6 @@ import io.netty.channel.ChannelFactory
import io.netty.channel.EventLoopGroup import io.netty.channel.EventLoopGroup
import io.netty.channel.socket.DatagramChannel import io.netty.channel.socket.DatagramChannel
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import io.netty.util.concurrent.Future
import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.CacheHandlerFactory
import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS import net.woggioni.rbcs.common.RBCS

View File

@@ -2,19 +2,11 @@ package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import net.woggioni.rbcs.api.CacheHandler import net.woggioni.rbcs.api.CacheHandler
import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent import net.woggioni.rbcs.api.message.CacheMessage.*
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutRequest
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
import net.woggioni.rbcs.common.ByteBufOutputStream import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.processCacheKey import net.woggioni.rbcs.common.RBCS.processCacheKey
import net.woggioni.rbcs.common.trace
import java.util.zip.Deflater import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterOutputStream import java.util.zip.InflaterOutputStream
@@ -26,7 +18,15 @@ class InMemoryCacheHandler(
private val compressionLevel: Int private val compressionLevel: Int
) : CacheHandler() { ) : CacheHandler() {
private interface InProgressPutRequest : AutoCloseable { private interface InProgressRequest : AutoCloseable {
}
private class InProgressGetRequest(val request : CacheGetRequest) : InProgressRequest {
override fun close() {
}
}
private interface InProgressPutRequest : InProgressRequest {
val request: CachePutRequest val request: CachePutRequest
val buf: ByteBuf val buf: ByteBuf
@@ -35,18 +35,14 @@ class InMemoryCacheHandler(
private inner class InProgressPlainPutRequest(ctx: ChannelHandlerContext, override val request: CachePutRequest) : private inner class InProgressPlainPutRequest(ctx: ChannelHandlerContext, override val request: CachePutRequest) :
InProgressPutRequest { InProgressPutRequest {
override val buf = ctx.alloc().compositeBuffer() override val buf = ctx.alloc().compositeHeapBuffer()
private val stream = ByteBufOutputStream(buf).let {
if (compressionEnabled) {
DeflaterOutputStream(it, Deflater(compressionLevel))
} else {
it
}
}
override fun append(buf: ByteBuf) { override fun append(buf: ByteBuf) {
this.buf.addComponent(true, buf.retain()) if(buf.isDirect) {
this.buf.writeBytes(buf)
} else {
this.buf.addComponent(true, buf.retain())
}
} }
override fun close() { override fun close() {
@@ -74,7 +70,7 @@ class InMemoryCacheHandler(
} }
} }
private var inProgressPutRequest: InProgressPutRequest? = null private var inProgressRequest: InProgressRequest? = null
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) { override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
when (msg) { when (msg) {
@@ -87,24 +83,11 @@ class InMemoryCacheHandler(
} }
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
cache.get(processCacheKey(msg.key, digestAlgorithm))?.let { value -> inProgressRequest = InProgressGetRequest(msg)
sendMessageAndFlush(ctx, CacheValueFoundResponse(msg.key, value.metadata))
if (compressionEnabled) {
val buf = ctx.alloc().heapBuffer()
InflaterOutputStream(ByteBufOutputStream(buf)).use {
value.content.readBytes(it, value.content.readableBytes())
value.content.release()
buf.retain()
}
sendMessage(ctx, LastCacheContent(buf))
} else {
sendMessage(ctx, LastCacheContent(value.content))
}
} ?: sendMessage(ctx, CacheValueNotFoundResponse())
} }
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
inProgressPutRequest = if(compressionEnabled) { inProgressRequest = if(compressionEnabled) {
InProgressCompressedPutRequest(ctx, msg) InProgressCompressedPutRequest(ctx, msg)
} else { } else {
InProgressPlainPutRequest(ctx, msg) InProgressPlainPutRequest(ctx, msg)
@@ -112,27 +95,46 @@ class InMemoryCacheHandler(
} }
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
inProgressPutRequest?.append(msg.content()) val req = inProgressRequest
if(req is InProgressPutRequest) {
req.append(msg.content())
}
} }
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) { private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
handleCacheContent(ctx, msg) handleCacheContent(ctx, msg)
inProgressPutRequest?.let { inProgressRequest -> when(val req = inProgressRequest) {
inProgressPutRequest = null is InProgressGetRequest -> {
val buf = inProgressRequest.buf cache.get(processCacheKey(req.request.key, digestAlgorithm))?.let { value ->
buf.retain() sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata))
inProgressRequest.close() if (compressionEnabled) {
val cacheKey = processCacheKey(inProgressRequest.request.key, digestAlgorithm) val buf = ctx.alloc().heapBuffer()
cache.put(cacheKey, CacheEntry(inProgressRequest.request.metadata, buf)) InflaterOutputStream(ByteBufOutputStream(buf)).use {
sendMessageAndFlush(ctx, CachePutResponse(inProgressRequest.request.key)) value.content.readBytes(it, value.content.readableBytes())
value.content.release()
buf.retain()
}
sendMessage(ctx, LastCacheContent(buf))
} else {
sendMessage(ctx, LastCacheContent(value.content))
}
} ?: sendMessage(ctx, CacheValueNotFoundResponse())
}
is InProgressPutRequest -> {
this.inProgressRequest = null
val buf = req.buf
buf.retain()
req.close()
val cacheKey = processCacheKey(req.request.key, digestAlgorithm)
cache.put(cacheKey, CacheEntry(req.request.metadata, buf))
sendMessageAndFlush(ctx, CachePutResponse(req.request.key))
}
} }
} }
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
inProgressPutRequest?.let { req -> inProgressRequest?.close()
req.buf.release() inProgressRequest = null
inProgressPutRequest = null
}
super.exceptionCaught(ctx, cause) super.exceptionCaught(ctx, cause)
} }
} }

View File

@@ -0,0 +1,13 @@
package net.woggioni.rbcs.server.handler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.HttpContent
class BlackHoleRequestHandler : SimpleChannelInboundHandler<HttpContent>() {
companion object {
val NAME = BlackHoleRequestHandler::class.java.name
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpContent) {
}
}

View File

@@ -94,6 +94,9 @@ class ThrottlingHandler(private val bucketManager : BucketManager,
handleBuckets(buckets, ctx, msg, false) handleBuckets(buckets, ctx, msg, false)
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS) }, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
} else { } else {
queuedContent?.let { qc ->
qc.forEach { it.release() }
}
this.queuedContent = null this.queuedContent = null
sendThrottledResponse(ctx, waitDuration) sendThrottledResponse(ctx, waitDuration)
} }

View File

@@ -1,5 +1,14 @@
package net.woggioni.rbcs.server.test.utils; package net.woggioni.rbcs.server.test.utils;
import java.math.BigInteger;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import org.bouncycastle.asn1.DERSequence; import org.bouncycastle.asn1.DERSequence;
import org.bouncycastle.asn1.x500.X500Name; import org.bouncycastle.asn1.x500.X500Name;
import org.bouncycastle.asn1.x509.BasicConstraints; import org.bouncycastle.asn1.x509.BasicConstraints;
@@ -15,16 +24,6 @@ import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
import org.bouncycastle.operator.ContentSigner; import org.bouncycastle.operator.ContentSigner;
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
import java.math.BigInteger;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
public class CertificateUtils { public class CertificateUtils {
public record X509Credentials( public record X509Credentials(

View File

@@ -154,7 +154,7 @@ class BasicAuthServerTest : AbstractBasicAuthServerTest() {
} }
@Test @Test
@Order(6) @Order(8)
fun getAsAThrottledUser() { fun getAsAThrottledUser() {
val client: HttpClient = HttpClient.newHttpClient() val client: HttpClient = HttpClient.newHttpClient()
@@ -172,7 +172,7 @@ class BasicAuthServerTest : AbstractBasicAuthServerTest() {
} }
@Test @Test
@Order(7) @Order(9)
fun getAsAThrottledUser2() { fun getAsAThrottledUser2() {
val client: HttpClient = HttpClient.newHttpClient() val client: HttpClient = HttpClient.newHttpClient()