diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheValueMetadata.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheValueMetadata.java index dce5407..129b3e0 100644 --- a/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheValueMetadata.java +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/CacheValueMetadata.java @@ -1,10 +1,9 @@ package net.woggioni.rbcs.api; +import java.io.Serializable; import lombok.Getter; import lombok.RequiredArgsConstructor; -import java.io.Serializable; - @Getter @RequiredArgsConstructor public class CacheValueMetadata implements Serializable { diff --git a/rbcs-api/src/main/java/net/woggioni/rbcs/api/Configuration.java b/rbcs-api/src/main/java/net/woggioni/rbcs/api/Configuration.java index 7ef2dba..25ff212 100644 --- a/rbcs-api/src/main/java/net/woggioni/rbcs/api/Configuration.java +++ b/rbcs-api/src/main/java/net/woggioni/rbcs/api/Configuration.java @@ -1,16 +1,15 @@ package net.woggioni.rbcs.api; -import lombok.EqualsAndHashCode; -import lombok.NonNull; -import lombok.Value; - import java.nio.file.Path; import java.security.cert.X509Certificate; import java.time.Duration; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import lombok.EqualsAndHashCode; +import lombok.NonNull; +import lombok.Value; @Value public class Configuration { diff --git a/rbcs-cli/src/main/kotlin/net/woggioni/rbcs/cli/impl/commands/ClientCommand.kt b/rbcs-cli/src/main/kotlin/net/woggioni/rbcs/cli/impl/commands/ClientCommand.kt index c5589c1..a43ee55 100644 --- a/rbcs-cli/src/main/kotlin/net/woggioni/rbcs/cli/impl/commands/ClientCommand.kt +++ b/rbcs-cli/src/main/kotlin/net/woggioni/rbcs/cli/impl/commands/ClientCommand.kt @@ -6,7 +6,6 @@ import net.woggioni.rbcs.client.Configuration import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.debug import picocli.CommandLine -import java.lang.IllegalArgumentException import java.nio.file.Path @CommandLine.Command( diff --git a/rbcs-client/src/test/kotlin/net/woggioni/rbcs/client/RetryTest.kt b/rbcs-client/src/test/kotlin/net/woggioni/rbcs/client/RetryTest.kt index f7075cc..36b4413 100644 --- a/rbcs-client/src/test/kotlin/net/woggioni/rbcs/client/RetryTest.kt +++ b/rbcs-client/src/test/kotlin/net/woggioni/rbcs/client/RetryTest.kt @@ -129,7 +129,7 @@ class RetryTest { previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6 val actualTimestamp = timestamp val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp - Assertions.assertTrue(err < 0.1) + Assertions.assertTrue(err < 0.5) } if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) { /* diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt index f5842da..de1b604 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheConfiguration.kt @@ -1,7 +1,6 @@ package net.woggioni.rbcs.server.memcache import io.netty.channel.ChannelFactory -import io.netty.channel.ChannelHandler import io.netty.channel.EventLoopGroup import io.netty.channel.pool.FixedChannelPool import io.netty.channel.socket.DatagramChannel diff --git a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt index faf5c03..05df4e5 100644 --- a/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt +++ b/rbcs-server-memcache/src/main/kotlin/net/woggioni/rbcs/server/memcache/MemcacheCacheHandler.kt @@ -111,17 +111,17 @@ class MemcacheCacheHandler( } if (responseSent) { acc.readBytes(outputStream, acc.readableBytes()) - if(acc.readableBytes() >= chunkSize) { + if (acc.readableBytes() >= chunkSize) { flush(false) } } } - private fun flush(last : Boolean) { + private fun flush(last: Boolean) { val toSend = extractChunk(chunk, ctx.alloc()) - val msg = if(last) { + val msg = if (last) { log.trace(ctx) { - "Sending last chunk to client on channel" + "Sending last chunk to client" } LastCacheContent(toSend) } else { @@ -148,14 +148,14 @@ class MemcacheCacheHandler( } private inner class InProgressPutRequest( - private val ch : NettyChannel, - metadata : CacheValueMetadata, - val digest : ByteBuf, + private val ch: NettyChannel, + metadata: CacheValueMetadata, + val digest: ByteBuf, val requestController: CompletableFuture, private val alloc: ByteBufAllocator ) : InProgressRequest { private var totalSize = 0 - private var tmpFile : FileChannel? = null + private var tmpFile: FileChannel? = null private val accumulator = alloc.compositeBuffer() private val stream = ByteBufOutputStream(accumulator).let { if (compressionEnabled) { @@ -182,7 +182,7 @@ class MemcacheCacheHandler( tmpFile?.let { flushToDisk(it, accumulator) } - if(accumulator.readableBytes() > 0x100000) { + if (accumulator.readableBytes() > 0x100000) { log.debug(ch) { "Entry is too big, buffering it into a file" } @@ -199,18 +199,18 @@ class MemcacheCacheHandler( } } - private fun flushToDisk(fc : FileChannel, buf : CompositeByteBuf) { + private fun flushToDisk(fc: FileChannel, buf: CompositeByteBuf) { val chunk = extractChunk(buf, alloc) fc.write(chunk.nioBuffer()) chunk.release() } - fun commit() : Pair { + fun commit(): Pair { digest.release() accumulator.retain() stream.close() val fileChannel = tmpFile - return if(fileChannel != null) { + return if (fileChannel != null) { flushToDisk(fileChannel, accumulator) accumulator.release() fileChannel.position(0) @@ -244,7 +244,67 @@ class MemcacheCacheHandler( } private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) { - inProgressRequest = InProgressGetRequest(msg.key, ctx) + log.debug(ctx) { + "Fetching ${msg.key} from memcache" + } + val key = ctx.alloc().buffer().also { + it.writeBytes(processCacheKey(msg.key, digestAlgorithm)) + } + val responseHandler = object : MemcacheResponseHandler { + override fun responseReceived(response: BinaryMemcacheResponse) { + val status = response.status() + when (status) { + BinaryMemcacheResponseStatus.SUCCESS -> { + log.debug(ctx) { + "Cache hit for key ${msg.key} on memcache" + } + inProgressRequest = InProgressGetRequest(msg.key, ctx) + } + + BinaryMemcacheResponseStatus.KEY_ENOENT -> { + log.debug(ctx) { + "Cache miss for key ${msg.key} on memcache" + } + sendMessageAndFlush(ctx, CacheValueNotFoundResponse()) + } + } + } + + override fun contentReceived(content: MemcacheContent) { + log.trace(ctx) { + "${if (content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${ + content.content().readableBytes() + } bytes received from memcache for key ${msg.key}" + } + (inProgressRequest as? InProgressGetRequest)?.let { inProgressGetRequest -> + inProgressGetRequest.write(content.content()) + if (content is LastMemcacheContent) { + inProgressRequest = null + inProgressGetRequest.commit() + } + } + } + + override fun exceptionCaught(ex: Throwable) { + (inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest -> + inProgressGetRequest?.let { + inProgressRequest = null + it.rollback() + } + } + this@MemcacheCacheHandler.exceptionCaught(ctx, ex) + } + } + client.sendRequest(key.retainedDuplicate(), responseHandler).thenAccept { requestHandle -> + log.trace(ctx) { + "Sending GET request for key ${msg.key} to memcache" + } + val request = DefaultBinaryMemcacheRequest(key).apply { + setOpcode(BinaryMemcacheOpcodes.GET) + } + requestHandle.sendRequest(request) + requestHandle.sendContent(LastMemcacheContent.EMPTY_LAST_CONTENT) + } } private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { @@ -261,6 +321,7 @@ class MemcacheCacheHandler( } sendMessageAndFlush(ctx, CachePutResponse(msg.key)) } + else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status)) } } @@ -282,13 +343,14 @@ class MemcacheCacheHandler( private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { val request = inProgressRequest - when(request) { + when (request) { is InProgressPutRequest -> { log.trace(ctx) { "Received chunk of ${msg.content().readableBytes()} bytes for memcache" } request.write(msg.content()) } + is InProgressGetRequest -> { msg.release() } @@ -297,7 +359,7 @@ class MemcacheCacheHandler( private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) { val request = inProgressRequest - when(request) { + when (request) { is InProgressPutRequest -> { inProgressRequest = null log.trace(ctx) { @@ -314,7 +376,7 @@ class MemcacheCacheHandler( "Trying to send SET request to memcache" } request.requestController.whenComplete { requestController, ex -> - if(ex == null) { + if (ex == null) { log.trace(ctx) { "Sending SET request to memcache" } @@ -332,7 +394,7 @@ class MemcacheCacheHandler( while (true) { val read = source.read(bb) bb.limit() - if(read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) { + if (read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) { continue } val chunk = ctx.alloc().buffer(chunkSize) @@ -342,7 +404,7 @@ class MemcacheCacheHandler( log.trace(ctx) { "Sending ${chunk.readableBytes()} bytes chunk to memcache" } - if(read < 0) { + if (read < 0) { requestController.sendContent(DefaultLastMemcacheContent(chunk)) break } else { @@ -355,73 +417,12 @@ class MemcacheCacheHandler( } } } - is InProgressGetRequest -> { - log.debug(ctx) { - "Fetching ${request.key} from memcache" - } - val key = ctx.alloc().buffer().also { - it.writeBytes(processCacheKey(request.key, digestAlgorithm)) - } - val responseHandler = object : MemcacheResponseHandler { - override fun responseReceived(response: BinaryMemcacheResponse) { - val status = response.status() - when (status) { - BinaryMemcacheResponseStatus.SUCCESS -> { - log.debug(ctx) { - "Cache hit for key ${request.key} on memcache" - } - inProgressRequest = InProgressGetRequest(request.key, ctx) - } - - BinaryMemcacheResponseStatus.KEY_ENOENT -> { - log.debug(ctx) { - "Cache miss for key ${request.key} on memcache" - } - sendMessageAndFlush(ctx, CacheValueNotFoundResponse()) - } - } - } - - override fun contentReceived(content: MemcacheContent) { - log.trace(ctx) { - "${if(content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${content.content().readableBytes()} bytes received from memcache for key ${request.key}" - } - (inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest -> - inProgressGetRequest?.write(content.content()) - if (content is LastMemcacheContent) { - inProgressRequest = null - inProgressGetRequest?.commit() - } - } - } - - override fun exceptionCaught(ex: Throwable) { - (inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest -> - inProgressGetRequest?.let { - inProgressRequest = null - it.rollback() - } - } - this@MemcacheCacheHandler.exceptionCaught(ctx, ex) - } - } - client.sendRequest(key.retainedDuplicate(), responseHandler).thenAccept { requestHandle -> - log.trace(ctx) { - "Sending GET request for key ${request.key} to memcache" - } - val request = DefaultBinaryMemcacheRequest(key).apply { - setOpcode(BinaryMemcacheOpcodes.GET) - } - requestHandle.sendRequest(request) - requestHandle.sendContent(LastMemcacheContent.EMPTY_LAST_CONTENT) - } - } } } override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { val request = inProgressRequest - when(request) { + when (request) { is InProgressPutRequest -> { inProgressRequest = null request.requestController.thenAccept { controller -> @@ -429,6 +430,7 @@ class MemcacheCacheHandler( } request.rollback() } + is InProgressGetRequest -> { inProgressRequest = null request.rollback() diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheConfiguration.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheConfiguration.kt index 52ddafa..11d3d8d 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheConfiguration.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheConfiguration.kt @@ -4,7 +4,6 @@ import io.netty.channel.ChannelFactory import io.netty.channel.EventLoopGroup import io.netty.channel.socket.DatagramChannel import io.netty.channel.socket.SocketChannel -import io.netty.util.concurrent.Future import net.woggioni.rbcs.api.CacheHandlerFactory import net.woggioni.rbcs.api.Configuration import net.woggioni.rbcs.common.RBCS diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt index f8dcb94..821df02 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCacheHandler.kt @@ -2,7 +2,6 @@ package net.woggioni.rbcs.server.cache import io.netty.buffer.ByteBuf import io.netty.channel.ChannelHandlerContext -import io.netty.channel.SimpleChannelInboundHandler import net.woggioni.rbcs.api.CacheHandler import net.woggioni.rbcs.api.message.CacheMessage import net.woggioni.rbcs.api.message.CacheMessage.CacheContent @@ -14,7 +13,6 @@ import net.woggioni.rbcs.api.message.CacheMessage.CacheValueNotFoundResponse import net.woggioni.rbcs.api.message.CacheMessage.LastCacheContent import net.woggioni.rbcs.common.ByteBufOutputStream import net.woggioni.rbcs.common.RBCS.processCacheKey -import net.woggioni.rbcs.common.trace import java.util.zip.Deflater import java.util.zip.DeflaterOutputStream import java.util.zip.InflaterOutputStream diff --git a/rbcs-server/src/test/java/net/woggioni/rbcs/server/test/utils/CertificateUtils.java b/rbcs-server/src/test/java/net/woggioni/rbcs/server/test/utils/CertificateUtils.java index 09a7a93..16452c0 100644 --- a/rbcs-server/src/test/java/net/woggioni/rbcs/server/test/utils/CertificateUtils.java +++ b/rbcs-server/src/test/java/net/woggioni/rbcs/server/test/utils/CertificateUtils.java @@ -1,5 +1,14 @@ package net.woggioni.rbcs.server.test.utils; +import java.math.BigInteger; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.PrivateKey; +import java.security.SecureRandom; +import java.security.cert.X509Certificate; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Date; import org.bouncycastle.asn1.DERSequence; import org.bouncycastle.asn1.x500.X500Name; import org.bouncycastle.asn1.x509.BasicConstraints; @@ -15,16 +24,6 @@ import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder; import org.bouncycastle.operator.ContentSigner; import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; -import java.math.BigInteger; -import java.security.KeyPair; -import java.security.KeyPairGenerator; -import java.security.PrivateKey; -import java.security.SecureRandom; -import java.security.cert.X509Certificate; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.Date; - public class CertificateUtils { public record X509Credentials(