Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
222b475223
|
|||
ede515e2ca
|
|||
974fdb7a91
|
|||
a294229ff0
|
93
benchmark/rbcs-filesystem.yml
Normal file
93
benchmark/rbcs-filesystem.yml
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
apiVersion: v1
|
||||||
|
kind: ConfigMap
|
||||||
|
metadata:
|
||||||
|
name: rbcs-server
|
||||||
|
data:
|
||||||
|
rbcs-server.xml: |
|
||||||
|
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||||
|
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xmlns:rbcs="urn:net.woggioni.rbcs.server"
|
||||||
|
xmlns:rbcs-memcache="urn:net.woggioni.rbcs.server.memcache"
|
||||||
|
xs:schemaLocation="urn:net.woggioni.rbcs.server.memcache jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
|
||||||
|
>
|
||||||
|
<bind host="0.0.0.0" port="8080" incoming-connections-backlog-size="128"/>
|
||||||
|
<connection
|
||||||
|
max-request-size="0xd000000"
|
||||||
|
idle-timeout="PT15S"
|
||||||
|
read-idle-timeout="PT30S"
|
||||||
|
write-idle-timeout="PT30S"/>
|
||||||
|
<event-executor use-virtual-threads="true"/>
|
||||||
|
<cache xs:type="rbcs:fileSystemCacheType" max-age="P7D" enable-compression="false" path="/rbcs/cache"/>
|
||||||
|
</rbcs:server>
|
||||||
|
|
||||||
|
---
|
||||||
|
apiVersion: v1
|
||||||
|
kind: PersistentVolumeClaim
|
||||||
|
metadata:
|
||||||
|
name: rbcs-pvc
|
||||||
|
namespace: default
|
||||||
|
spec:
|
||||||
|
accessModes:
|
||||||
|
- ReadWriteOnce
|
||||||
|
storageClassName: local-path
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
storage: 16Gi
|
||||||
|
---
|
||||||
|
apiVersion: apps/v1
|
||||||
|
kind: Deployment
|
||||||
|
metadata:
|
||||||
|
name: rbcs-deployment
|
||||||
|
labels:
|
||||||
|
app: rbcs
|
||||||
|
spec:
|
||||||
|
replicas: 1
|
||||||
|
selector:
|
||||||
|
matchLabels:
|
||||||
|
app: rbcs
|
||||||
|
template:
|
||||||
|
metadata:
|
||||||
|
labels:
|
||||||
|
app: rbcs
|
||||||
|
spec:
|
||||||
|
containers:
|
||||||
|
- name: rbcs
|
||||||
|
image: gitea.woggioni.net/woggioni/rbcs:native
|
||||||
|
imagePullPolicy: Always
|
||||||
|
args: ['server', '-c', 'rbcs-server.xml']
|
||||||
|
ports:
|
||||||
|
- containerPort: 8080
|
||||||
|
volumeMounts:
|
||||||
|
- name: config-volume
|
||||||
|
mountPath: /rbcs/rbcs-server.xml
|
||||||
|
subPath: rbcs-server.xml
|
||||||
|
- name: cache-volume
|
||||||
|
mountPath: /rbcs/cache
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
memory: "0.25Gi"
|
||||||
|
cpu: "1"
|
||||||
|
limits:
|
||||||
|
memory: "0.25Gi"
|
||||||
|
cpu: "3.5"
|
||||||
|
volumes:
|
||||||
|
- name: config-volume
|
||||||
|
configMap:
|
||||||
|
name: rbcs-server
|
||||||
|
- name: cache-volume
|
||||||
|
persistentVolumeClaim:
|
||||||
|
claimName: rbcs-pvc
|
||||||
|
---
|
||||||
|
apiVersion: v1
|
||||||
|
kind: Service
|
||||||
|
metadata:
|
||||||
|
name: rbcs-service
|
||||||
|
spec:
|
||||||
|
type: LoadBalancer
|
||||||
|
ports:
|
||||||
|
- port: 8080
|
||||||
|
targetPort: 8080
|
||||||
|
protocol: TCP
|
||||||
|
selector:
|
||||||
|
app: rbcs
|
||||||
|
|
76
benchmark/rbcs-in-memory.yml
Normal file
76
benchmark/rbcs-in-memory.yml
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
apiVersion: v1
|
||||||
|
kind: ConfigMap
|
||||||
|
metadata:
|
||||||
|
name: rbcs-server
|
||||||
|
data:
|
||||||
|
rbcs-server.xml: |
|
||||||
|
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||||
|
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xmlns:rbcs="urn:net.woggioni.rbcs.server"
|
||||||
|
xmlns:rbcs-memcache="urn:net.woggioni.rbcs.server.memcache"
|
||||||
|
xs:schemaLocation="urn:net.woggioni.rbcs.server.memcache jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
|
||||||
|
>
|
||||||
|
<bind host="0.0.0.0" port="8080" incoming-connections-backlog-size="128"/>
|
||||||
|
<connection
|
||||||
|
max-request-size="0xd000000"
|
||||||
|
idle-timeout="PT15S"
|
||||||
|
read-idle-timeout="PT30S"
|
||||||
|
write-idle-timeout="PT30S"/>
|
||||||
|
<event-executor use-virtual-threads="true"/>
|
||||||
|
<cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" enable-compression="false" max-size="0xb0000000" />
|
||||||
|
</rbcs:server>
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
apiVersion: apps/v1
|
||||||
|
kind: Deployment
|
||||||
|
metadata:
|
||||||
|
name: rbcs-deployment
|
||||||
|
labels:
|
||||||
|
app: rbcs
|
||||||
|
spec:
|
||||||
|
replicas: 1
|
||||||
|
selector:
|
||||||
|
matchLabels:
|
||||||
|
app: rbcs
|
||||||
|
template:
|
||||||
|
metadata:
|
||||||
|
labels:
|
||||||
|
app: rbcs
|
||||||
|
spec:
|
||||||
|
containers:
|
||||||
|
- name: rbcs
|
||||||
|
image: gitea.woggioni.net/woggioni/rbcs:native
|
||||||
|
imagePullPolicy: Always
|
||||||
|
args: ['server', '-c', 'rbcs-server.xml']
|
||||||
|
ports:
|
||||||
|
- containerPort: 8080
|
||||||
|
volumeMounts:
|
||||||
|
- name: config-volume
|
||||||
|
mountPath: /rbcs/rbcs-server.xml
|
||||||
|
subPath: rbcs-server.xml
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
memory: "0.5Gi"
|
||||||
|
cpu: "1"
|
||||||
|
limits:
|
||||||
|
memory: "4Gi"
|
||||||
|
cpu: "3.5"
|
||||||
|
volumes:
|
||||||
|
- name: config-volume
|
||||||
|
configMap:
|
||||||
|
name: rbcs-server
|
||||||
|
---
|
||||||
|
apiVersion: v1
|
||||||
|
kind: Service
|
||||||
|
metadata:
|
||||||
|
name: rbcs-service
|
||||||
|
spec:
|
||||||
|
type: LoadBalancer
|
||||||
|
ports:
|
||||||
|
- port: 8080
|
||||||
|
targetPort: 8080
|
||||||
|
protocol: TCP
|
||||||
|
selector:
|
||||||
|
app: rbcs
|
||||||
|
|
117
benchmark/rbcs-memcache.yml
Normal file
117
benchmark/rbcs-memcache.yml
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
apiVersion: v1
|
||||||
|
kind: ConfigMap
|
||||||
|
metadata:
|
||||||
|
name: rbcs-server
|
||||||
|
data:
|
||||||
|
rbcs-server.xml: |
|
||||||
|
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
|
||||||
|
<rbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xmlns:rbcs="urn:net.woggioni.rbcs.server"
|
||||||
|
xmlns:rbcs-memcache="urn:net.woggioni.rbcs.server.memcache"
|
||||||
|
xs:schemaLocation="urn:net.woggioni.rbcs.server.memcache jpms://net.woggioni.rbcs.server.memcache/net/woggioni/rbcs/server/memcache/schema/rbcs-memcache.xsd urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs-server.xsd"
|
||||||
|
>
|
||||||
|
<bind host="0.0.0.0" port="8080" incoming-connections-backlog-size="128"/>
|
||||||
|
<connection
|
||||||
|
max-request-size="0xd000000"
|
||||||
|
idle-timeout="PT15S"
|
||||||
|
read-idle-timeout="PT30S"
|
||||||
|
write-idle-timeout="PT30S"/>
|
||||||
|
<event-executor use-virtual-threads="true"/>
|
||||||
|
<!--cache xs:type="rbcs:inMemoryCacheType" max-age="P7D" enable-compression="false" max-size="0x10000000" /-->
|
||||||
|
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" chunk-size="0x1000" digest="MD5">
|
||||||
|
<server host="memcached-service" port="11211" max-connections="256"/>
|
||||||
|
</cache>
|
||||||
|
</rbcs:server>
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
apiVersion: apps/v1
|
||||||
|
kind: Deployment
|
||||||
|
metadata:
|
||||||
|
name: rbcs-deployment
|
||||||
|
labels:
|
||||||
|
app: rbcs
|
||||||
|
spec:
|
||||||
|
replicas: 1
|
||||||
|
selector:
|
||||||
|
matchLabels:
|
||||||
|
app: rbcs
|
||||||
|
template:
|
||||||
|
metadata:
|
||||||
|
labels:
|
||||||
|
app: rbcs
|
||||||
|
spec:
|
||||||
|
containers:
|
||||||
|
- name: rbcs
|
||||||
|
image: gitea.woggioni.net/woggioni/rbcs:native
|
||||||
|
imagePullPolicy: Always
|
||||||
|
args: ['server', '-c', 'rbcs-server.xml']
|
||||||
|
ports:
|
||||||
|
- containerPort: 8080
|
||||||
|
volumeMounts:
|
||||||
|
- name: config-volume
|
||||||
|
mountPath: /rbcs/rbcs-server.xml
|
||||||
|
subPath: rbcs-server.xml
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
memory: "0.25Gi"
|
||||||
|
cpu: "1"
|
||||||
|
limits:
|
||||||
|
memory: "0.25Gi"
|
||||||
|
cpu: "1"
|
||||||
|
volumes:
|
||||||
|
- name: config-volume
|
||||||
|
configMap:
|
||||||
|
name: rbcs-server
|
||||||
|
---
|
||||||
|
apiVersion: v1
|
||||||
|
kind: Service
|
||||||
|
metadata:
|
||||||
|
name: rbcs-service
|
||||||
|
spec:
|
||||||
|
type: LoadBalancer
|
||||||
|
ports:
|
||||||
|
- port: 8080
|
||||||
|
targetPort: 8080
|
||||||
|
protocol: TCP
|
||||||
|
selector:
|
||||||
|
app: rbcs
|
||||||
|
---
|
||||||
|
apiVersion: apps/v1
|
||||||
|
kind: Deployment
|
||||||
|
metadata:
|
||||||
|
name: memcached-deployment
|
||||||
|
spec:
|
||||||
|
replicas: 1
|
||||||
|
selector:
|
||||||
|
matchLabels:
|
||||||
|
app: memcached
|
||||||
|
template:
|
||||||
|
metadata:
|
||||||
|
labels:
|
||||||
|
app: memcached
|
||||||
|
spec:
|
||||||
|
containers:
|
||||||
|
- name: memcached
|
||||||
|
image: memcached
|
||||||
|
args: ["-I", "128m", "-m", "4096"]
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
memory: "1Gi"
|
||||||
|
cpu: "500m" # 0.5 CPU
|
||||||
|
limits:
|
||||||
|
memory: "5Gi"
|
||||||
|
cpu: "500m" # 0.5 CP
|
||||||
|
---
|
||||||
|
apiVersion: v1
|
||||||
|
kind: Service
|
||||||
|
metadata:
|
||||||
|
name: memcached-service
|
||||||
|
spec:
|
||||||
|
type: ClusterIP # ClusterIP makes it accessible only within the cluster
|
||||||
|
ports:
|
||||||
|
- port: 11211 # Default memcached port
|
||||||
|
targetPort: 11211
|
||||||
|
protocol: TCP
|
||||||
|
selector:
|
||||||
|
app: memcached
|
@@ -1,10 +1,9 @@
|
|||||||
package net.woggioni.rbcs.api;
|
package net.woggioni.rbcs.api;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class CacheValueMetadata implements Serializable {
|
public class CacheValueMetadata implements Serializable {
|
||||||
|
@@ -1,16 +1,15 @@
|
|||||||
package net.woggioni.rbcs.api;
|
package net.woggioni.rbcs.api;
|
||||||
|
|
||||||
|
|
||||||
import lombok.EqualsAndHashCode;
|
|
||||||
import lombok.NonNull;
|
|
||||||
import lombok.Value;
|
|
||||||
|
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.security.cert.X509Certificate;
|
import java.security.cert.X509Certificate;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import lombok.EqualsAndHashCode;
|
||||||
|
import lombok.NonNull;
|
||||||
|
import lombok.Value;
|
||||||
|
|
||||||
@Value
|
@Value
|
||||||
public class Configuration {
|
public class Configuration {
|
||||||
|
@@ -1,2 +1,2 @@
|
|||||||
Args=-O3 -march=skylake --gc=serial --install-exit-handlers --initialize-at-run-time=io.netty --enable-url-protocols=jpms --initialize-at-build-time=net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory,net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory$JpmsHandler
|
Args=-O3 -march=x86-64-v2 --gc=serial --install-exit-handlers --initialize-at-run-time=io.netty --enable-url-protocols=jpms --initialize-at-build-time=net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory,net.woggioni.rbcs.common.RbcsUrlStreamHandlerFactory$JpmsHandler
|
||||||
#-H:TraceClassInitialization=io.netty.handler.ssl.BouncyCastleAlpnSslUtils
|
#-H:TraceClassInitialization=io.netty.handler.ssl.BouncyCastleAlpnSslUtils
|
@@ -6,7 +6,6 @@ import net.woggioni.rbcs.client.Configuration
|
|||||||
import net.woggioni.rbcs.common.createLogger
|
import net.woggioni.rbcs.common.createLogger
|
||||||
import net.woggioni.rbcs.common.debug
|
import net.woggioni.rbcs.common.debug
|
||||||
import picocli.CommandLine
|
import picocli.CommandLine
|
||||||
import java.lang.IllegalArgumentException
|
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
||||||
@CommandLine.Command(
|
@CommandLine.Command(
|
||||||
|
@@ -1,7 +1,6 @@
|
|||||||
package net.woggioni.rbcs.server.memcache
|
package net.woggioni.rbcs.server.memcache
|
||||||
|
|
||||||
import io.netty.channel.ChannelFactory
|
import io.netty.channel.ChannelFactory
|
||||||
import io.netty.channel.ChannelHandler
|
|
||||||
import io.netty.channel.EventLoopGroup
|
import io.netty.channel.EventLoopGroup
|
||||||
import io.netty.channel.pool.FixedChannelPool
|
import io.netty.channel.pool.FixedChannelPool
|
||||||
import io.netty.channel.socket.DatagramChannel
|
import io.netty.channel.socket.DatagramChannel
|
||||||
|
@@ -111,17 +111,17 @@ class MemcacheCacheHandler(
|
|||||||
}
|
}
|
||||||
if (responseSent) {
|
if (responseSent) {
|
||||||
acc.readBytes(outputStream, acc.readableBytes())
|
acc.readBytes(outputStream, acc.readableBytes())
|
||||||
if(acc.readableBytes() >= chunkSize) {
|
if (acc.readableBytes() >= chunkSize) {
|
||||||
flush(false)
|
flush(false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun flush(last : Boolean) {
|
private fun flush(last: Boolean) {
|
||||||
val toSend = extractChunk(chunk, ctx.alloc())
|
val toSend = extractChunk(chunk, ctx.alloc())
|
||||||
val msg = if(last) {
|
val msg = if (last) {
|
||||||
log.trace(ctx) {
|
log.trace(ctx) {
|
||||||
"Sending last chunk to client on channel"
|
"Sending last chunk to client"
|
||||||
}
|
}
|
||||||
LastCacheContent(toSend)
|
LastCacheContent(toSend)
|
||||||
} else {
|
} else {
|
||||||
@@ -148,14 +148,14 @@ class MemcacheCacheHandler(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private inner class InProgressPutRequest(
|
private inner class InProgressPutRequest(
|
||||||
private val ch : NettyChannel,
|
private val ch: NettyChannel,
|
||||||
metadata : CacheValueMetadata,
|
metadata: CacheValueMetadata,
|
||||||
val digest : ByteBuf,
|
val digest: ByteBuf,
|
||||||
val requestController: CompletableFuture<MemcacheRequestController>,
|
val requestController: CompletableFuture<MemcacheRequestController>,
|
||||||
private val alloc: ByteBufAllocator
|
private val alloc: ByteBufAllocator
|
||||||
) : InProgressRequest {
|
) : InProgressRequest {
|
||||||
private var totalSize = 0
|
private var totalSize = 0
|
||||||
private var tmpFile : FileChannel? = null
|
private var tmpFile: FileChannel? = null
|
||||||
private val accumulator = alloc.compositeBuffer()
|
private val accumulator = alloc.compositeBuffer()
|
||||||
private val stream = ByteBufOutputStream(accumulator).let {
|
private val stream = ByteBufOutputStream(accumulator).let {
|
||||||
if (compressionEnabled) {
|
if (compressionEnabled) {
|
||||||
@@ -182,7 +182,7 @@ class MemcacheCacheHandler(
|
|||||||
tmpFile?.let {
|
tmpFile?.let {
|
||||||
flushToDisk(it, accumulator)
|
flushToDisk(it, accumulator)
|
||||||
}
|
}
|
||||||
if(accumulator.readableBytes() > 0x100000) {
|
if (accumulator.readableBytes() > 0x100000) {
|
||||||
log.debug(ch) {
|
log.debug(ch) {
|
||||||
"Entry is too big, buffering it into a file"
|
"Entry is too big, buffering it into a file"
|
||||||
}
|
}
|
||||||
@@ -199,18 +199,18 @@ class MemcacheCacheHandler(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun flushToDisk(fc : FileChannel, buf : CompositeByteBuf) {
|
private fun flushToDisk(fc: FileChannel, buf: CompositeByteBuf) {
|
||||||
val chunk = extractChunk(buf, alloc)
|
val chunk = extractChunk(buf, alloc)
|
||||||
fc.write(chunk.nioBuffer())
|
fc.write(chunk.nioBuffer())
|
||||||
chunk.release()
|
chunk.release()
|
||||||
}
|
}
|
||||||
|
|
||||||
fun commit() : Pair<Int, ReadableByteChannel> {
|
fun commit(): Pair<Int, ReadableByteChannel> {
|
||||||
digest.release()
|
digest.release()
|
||||||
accumulator.retain()
|
accumulator.retain()
|
||||||
stream.close()
|
stream.close()
|
||||||
val fileChannel = tmpFile
|
val fileChannel = tmpFile
|
||||||
return if(fileChannel != null) {
|
return if (fileChannel != null) {
|
||||||
flushToDisk(fileChannel, accumulator)
|
flushToDisk(fileChannel, accumulator)
|
||||||
accumulator.release()
|
accumulator.release()
|
||||||
fileChannel.position(0)
|
fileChannel.position(0)
|
||||||
@@ -244,9 +244,69 @@ class MemcacheCacheHandler(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
|
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
|
||||||
|
log.debug(ctx) {
|
||||||
|
"Fetching ${msg.key} from memcache"
|
||||||
|
}
|
||||||
|
val key = ctx.alloc().buffer().also {
|
||||||
|
it.writeBytes(processCacheKey(msg.key, digestAlgorithm))
|
||||||
|
}
|
||||||
|
val responseHandler = object : MemcacheResponseHandler {
|
||||||
|
override fun responseReceived(response: BinaryMemcacheResponse) {
|
||||||
|
val status = response.status()
|
||||||
|
when (status) {
|
||||||
|
BinaryMemcacheResponseStatus.SUCCESS -> {
|
||||||
|
log.debug(ctx) {
|
||||||
|
"Cache hit for key ${msg.key} on memcache"
|
||||||
|
}
|
||||||
inProgressRequest = InProgressGetRequest(msg.key, ctx)
|
inProgressRequest = InProgressGetRequest(msg.key, ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
|
||||||
|
log.debug(ctx) {
|
||||||
|
"Cache miss for key ${msg.key} on memcache"
|
||||||
|
}
|
||||||
|
sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun contentReceived(content: MemcacheContent) {
|
||||||
|
log.trace(ctx) {
|
||||||
|
"${if (content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${
|
||||||
|
content.content().readableBytes()
|
||||||
|
} bytes received from memcache for key ${msg.key}"
|
||||||
|
}
|
||||||
|
(inProgressRequest as? InProgressGetRequest)?.let { inProgressGetRequest ->
|
||||||
|
inProgressGetRequest.write(content.content())
|
||||||
|
if (content is LastMemcacheContent) {
|
||||||
|
inProgressRequest = null
|
||||||
|
inProgressGetRequest.commit()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun exceptionCaught(ex: Throwable) {
|
||||||
|
(inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
|
||||||
|
inProgressGetRequest?.let {
|
||||||
|
inProgressRequest = null
|
||||||
|
it.rollback()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
client.sendRequest(key.retainedDuplicate(), responseHandler).thenAccept { requestHandle ->
|
||||||
|
log.trace(ctx) {
|
||||||
|
"Sending GET request for key ${msg.key} to memcache"
|
||||||
|
}
|
||||||
|
val request = DefaultBinaryMemcacheRequest(key).apply {
|
||||||
|
setOpcode(BinaryMemcacheOpcodes.GET)
|
||||||
|
}
|
||||||
|
requestHandle.sendRequest(request)
|
||||||
|
requestHandle.sendContent(LastMemcacheContent.EMPTY_LAST_CONTENT)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
|
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
|
||||||
val key = ctx.alloc().buffer().also {
|
val key = ctx.alloc().buffer().also {
|
||||||
it.writeBytes(processCacheKey(msg.key, digestAlgorithm))
|
it.writeBytes(processCacheKey(msg.key, digestAlgorithm))
|
||||||
@@ -261,6 +321,7 @@ class MemcacheCacheHandler(
|
|||||||
}
|
}
|
||||||
sendMessageAndFlush(ctx, CachePutResponse(msg.key))
|
sendMessageAndFlush(ctx, CachePutResponse(msg.key))
|
||||||
}
|
}
|
||||||
|
|
||||||
else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status))
|
else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -282,13 +343,14 @@ class MemcacheCacheHandler(
|
|||||||
|
|
||||||
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
|
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
|
||||||
val request = inProgressRequest
|
val request = inProgressRequest
|
||||||
when(request) {
|
when (request) {
|
||||||
is InProgressPutRequest -> {
|
is InProgressPutRequest -> {
|
||||||
log.trace(ctx) {
|
log.trace(ctx) {
|
||||||
"Received chunk of ${msg.content().readableBytes()} bytes for memcache"
|
"Received chunk of ${msg.content().readableBytes()} bytes for memcache"
|
||||||
}
|
}
|
||||||
request.write(msg.content())
|
request.write(msg.content())
|
||||||
}
|
}
|
||||||
|
|
||||||
is InProgressGetRequest -> {
|
is InProgressGetRequest -> {
|
||||||
msg.release()
|
msg.release()
|
||||||
}
|
}
|
||||||
@@ -297,7 +359,7 @@ class MemcacheCacheHandler(
|
|||||||
|
|
||||||
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
|
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
|
||||||
val request = inProgressRequest
|
val request = inProgressRequest
|
||||||
when(request) {
|
when (request) {
|
||||||
is InProgressPutRequest -> {
|
is InProgressPutRequest -> {
|
||||||
inProgressRequest = null
|
inProgressRequest = null
|
||||||
log.trace(ctx) {
|
log.trace(ctx) {
|
||||||
@@ -314,7 +376,7 @@ class MemcacheCacheHandler(
|
|||||||
"Trying to send SET request to memcache"
|
"Trying to send SET request to memcache"
|
||||||
}
|
}
|
||||||
request.requestController.whenComplete { requestController, ex ->
|
request.requestController.whenComplete { requestController, ex ->
|
||||||
if(ex == null) {
|
if (ex == null) {
|
||||||
log.trace(ctx) {
|
log.trace(ctx) {
|
||||||
"Sending SET request to memcache"
|
"Sending SET request to memcache"
|
||||||
}
|
}
|
||||||
@@ -332,7 +394,7 @@ class MemcacheCacheHandler(
|
|||||||
while (true) {
|
while (true) {
|
||||||
val read = source.read(bb)
|
val read = source.read(bb)
|
||||||
bb.limit()
|
bb.limit()
|
||||||
if(read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) {
|
if (read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
val chunk = ctx.alloc().buffer(chunkSize)
|
val chunk = ctx.alloc().buffer(chunkSize)
|
||||||
@@ -342,7 +404,7 @@ class MemcacheCacheHandler(
|
|||||||
log.trace(ctx) {
|
log.trace(ctx) {
|
||||||
"Sending ${chunk.readableBytes()} bytes chunk to memcache"
|
"Sending ${chunk.readableBytes()} bytes chunk to memcache"
|
||||||
}
|
}
|
||||||
if(read < 0) {
|
if (read < 0) {
|
||||||
requestController.sendContent(DefaultLastMemcacheContent(chunk))
|
requestController.sendContent(DefaultLastMemcacheContent(chunk))
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
@@ -355,73 +417,12 @@ class MemcacheCacheHandler(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
is InProgressGetRequest -> {
|
|
||||||
log.debug(ctx) {
|
|
||||||
"Fetching ${request.key} from memcache"
|
|
||||||
}
|
|
||||||
val key = ctx.alloc().buffer().also {
|
|
||||||
it.writeBytes(processCacheKey(request.key, digestAlgorithm))
|
|
||||||
}
|
|
||||||
val responseHandler = object : MemcacheResponseHandler {
|
|
||||||
override fun responseReceived(response: BinaryMemcacheResponse) {
|
|
||||||
val status = response.status()
|
|
||||||
when (status) {
|
|
||||||
BinaryMemcacheResponseStatus.SUCCESS -> {
|
|
||||||
log.debug(ctx) {
|
|
||||||
"Cache hit for key ${request.key} on memcache"
|
|
||||||
}
|
|
||||||
inProgressRequest = InProgressGetRequest(request.key, ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
|
|
||||||
log.debug(ctx) {
|
|
||||||
"Cache miss for key ${request.key} on memcache"
|
|
||||||
}
|
|
||||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun contentReceived(content: MemcacheContent) {
|
|
||||||
log.trace(ctx) {
|
|
||||||
"${if(content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${content.content().readableBytes()} bytes received from memcache for key ${request.key}"
|
|
||||||
}
|
|
||||||
(inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
|
|
||||||
inProgressGetRequest?.write(content.content())
|
|
||||||
if (content is LastMemcacheContent) {
|
|
||||||
inProgressRequest = null
|
|
||||||
inProgressGetRequest?.commit()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun exceptionCaught(ex: Throwable) {
|
|
||||||
(inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
|
|
||||||
inProgressGetRequest?.let {
|
|
||||||
inProgressRequest = null
|
|
||||||
it.rollback()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
client.sendRequest(key.retainedDuplicate(), responseHandler).thenAccept { requestHandle ->
|
|
||||||
log.trace(ctx) {
|
|
||||||
"Sending GET request for key ${request.key} to memcache"
|
|
||||||
}
|
|
||||||
val request = DefaultBinaryMemcacheRequest(key).apply {
|
|
||||||
setOpcode(BinaryMemcacheOpcodes.GET)
|
|
||||||
}
|
|
||||||
requestHandle.sendRequest(request)
|
|
||||||
requestHandle.sendContent(LastMemcacheContent.EMPTY_LAST_CONTENT)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||||
val request = inProgressRequest
|
val request = inProgressRequest
|
||||||
when(request) {
|
when (request) {
|
||||||
is InProgressPutRequest -> {
|
is InProgressPutRequest -> {
|
||||||
inProgressRequest = null
|
inProgressRequest = null
|
||||||
request.requestController.thenAccept { controller ->
|
request.requestController.thenAccept { controller ->
|
||||||
@@ -429,6 +430,7 @@ class MemcacheCacheHandler(
|
|||||||
}
|
}
|
||||||
request.rollback()
|
request.rollback()
|
||||||
}
|
}
|
||||||
|
|
||||||
is InProgressGetRequest -> {
|
is InProgressGetRequest -> {
|
||||||
inProgressRequest = null
|
inProgressRequest = null
|
||||||
request.rollback()
|
request.rollback()
|
||||||
|
@@ -4,7 +4,6 @@ import io.netty.channel.ChannelFactory
|
|||||||
import io.netty.channel.EventLoopGroup
|
import io.netty.channel.EventLoopGroup
|
||||||
import io.netty.channel.socket.DatagramChannel
|
import io.netty.channel.socket.DatagramChannel
|
||||||
import io.netty.channel.socket.SocketChannel
|
import io.netty.channel.socket.SocketChannel
|
||||||
import io.netty.util.concurrent.Future
|
|
||||||
import net.woggioni.rbcs.api.CacheHandlerFactory
|
import net.woggioni.rbcs.api.CacheHandlerFactory
|
||||||
import net.woggioni.rbcs.api.Configuration
|
import net.woggioni.rbcs.api.Configuration
|
||||||
import net.woggioni.rbcs.common.RBCS
|
import net.woggioni.rbcs.common.RBCS
|
||||||
|
@@ -2,19 +2,11 @@ package net.woggioni.rbcs.server.cache
|
|||||||
|
|
||||||
import io.netty.buffer.ByteBuf
|
import io.netty.buffer.ByteBuf
|
||||||
import io.netty.channel.ChannelHandlerContext
|
import io.netty.channel.ChannelHandlerContext
|
||||||
import io.netty.channel.SimpleChannelInboundHandler
|
|
||||||
import net.woggioni.rbcs.api.CacheHandler
|
import net.woggioni.rbcs.api.CacheHandler
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage
|
import net.woggioni.rbcs.api.message.CacheMessage
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheContent
|
import net.woggioni.rbcs.api.message.CacheMessage.*
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheGetRequest
|
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.CachePutRequest
|
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.CachePutResponse
|
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueFoundResponse
|
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse
|
|
||||||
import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent
|
|
||||||
import net.woggioni.rbcs.common.ByteBufOutputStream
|
import net.woggioni.rbcs.common.ByteBufOutputStream
|
||||||
import net.woggioni.rbcs.common.RBCS.processCacheKey
|
import net.woggioni.rbcs.common.RBCS.processCacheKey
|
||||||
import net.woggioni.rbcs.common.trace
|
|
||||||
import java.util.zip.Deflater
|
import java.util.zip.Deflater
|
||||||
import java.util.zip.DeflaterOutputStream
|
import java.util.zip.DeflaterOutputStream
|
||||||
import java.util.zip.InflaterOutputStream
|
import java.util.zip.InflaterOutputStream
|
||||||
@@ -43,19 +35,15 @@ class InMemoryCacheHandler(
|
|||||||
|
|
||||||
private inner class InProgressPlainPutRequest(ctx: ChannelHandlerContext, override val request: CachePutRequest) :
|
private inner class InProgressPlainPutRequest(ctx: ChannelHandlerContext, override val request: CachePutRequest) :
|
||||||
InProgressPutRequest {
|
InProgressPutRequest {
|
||||||
override val buf = ctx.alloc().compositeBuffer()
|
override val buf = ctx.alloc().compositeHeapBuffer()
|
||||||
|
|
||||||
private val stream = ByteBufOutputStream(buf).let {
|
|
||||||
if (compressionEnabled) {
|
|
||||||
DeflaterOutputStream(it, Deflater(compressionLevel))
|
|
||||||
} else {
|
|
||||||
it
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun append(buf: ByteBuf) {
|
override fun append(buf: ByteBuf) {
|
||||||
|
if(buf.isDirect) {
|
||||||
|
this.buf.writeBytes(buf)
|
||||||
|
} else {
|
||||||
this.buf.addComponent(true, buf.retain())
|
this.buf.addComponent(true, buf.retain())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
buf.release()
|
buf.release()
|
||||||
|
@@ -1,5 +1,14 @@
|
|||||||
package net.woggioni.rbcs.server.test.utils;
|
package net.woggioni.rbcs.server.test.utils;
|
||||||
|
|
||||||
|
import java.math.BigInteger;
|
||||||
|
import java.security.KeyPair;
|
||||||
|
import java.security.KeyPairGenerator;
|
||||||
|
import java.security.PrivateKey;
|
||||||
|
import java.security.SecureRandom;
|
||||||
|
import java.security.cert.X509Certificate;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
|
import java.util.Date;
|
||||||
import org.bouncycastle.asn1.DERSequence;
|
import org.bouncycastle.asn1.DERSequence;
|
||||||
import org.bouncycastle.asn1.x500.X500Name;
|
import org.bouncycastle.asn1.x500.X500Name;
|
||||||
import org.bouncycastle.asn1.x509.BasicConstraints;
|
import org.bouncycastle.asn1.x509.BasicConstraints;
|
||||||
@@ -15,16 +24,6 @@ import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
|
|||||||
import org.bouncycastle.operator.ContentSigner;
|
import org.bouncycastle.operator.ContentSigner;
|
||||||
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
|
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
|
||||||
|
|
||||||
import java.math.BigInteger;
|
|
||||||
import java.security.KeyPair;
|
|
||||||
import java.security.KeyPairGenerator;
|
|
||||||
import java.security.PrivateKey;
|
|
||||||
import java.security.SecureRandom;
|
|
||||||
import java.security.cert.X509Certificate;
|
|
||||||
import java.time.Instant;
|
|
||||||
import java.time.temporal.ChronoUnit;
|
|
||||||
import java.util.Date;
|
|
||||||
|
|
||||||
public class CertificateUtils {
|
public class CertificateUtils {
|
||||||
|
|
||||||
public record X509Credentials(
|
public record X509Credentials(
|
||||||
|
Reference in New Issue
Block a user