Compare commits

...

5 Commits

26 changed files with 413 additions and 144 deletions

View File

@@ -27,6 +27,7 @@ public class Configuration {
Cache cache;
Authentication authentication;
Tls tls;
boolean useNativeTransport;
@Value
public static class EventExecutor {
@@ -151,7 +152,8 @@ public class Configuration {
Map<String, Group> groups,
Cache cache,
Authentication authentication,
Tls tls
Tls tls,
boolean useNativeTransport
) {
return new Configuration(
host,
@@ -164,7 +166,8 @@ public class Configuration {
groups,
cache,
authentication,
tls
tls,
useNativeTransport
);
}
}

View File

@@ -32,6 +32,13 @@ configurations {
canBeResolved = true
visible = true
}
nativeLibraries {
transitive = false
canBeConsumed = false
canBeResolved = true
visible = false
}
}
envelopeJar {
@@ -53,6 +60,8 @@ dependencies {
// runtimeOnly catalog.slf4j.jdk14
runtimeOnly catalog.logback.classic
// runtimeOnly catalog.slf4j.simple
nativeLibraries group: 'io.netty', name: 'netty-transport-native-epoll', version: catalog.versions.netty.get(), classifier: 'linux-x86_64'
}
Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named('envelopeJar', EnvelopeJarTask.class) {
@@ -67,6 +76,9 @@ Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named('envelopeJar', E
// systemProperties['org.slf4j.simpleLogger.log.com.google.code.yanf4j'] = 'warn'
// systemProperties['org.slf4j.simpleLogger.log.net.rubyeye.xmemcached'] = 'warn'
// systemProperties['org.slf4j.simpleLogger.dateTimeFormat'] = 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ'
from {
configurations.nativeLibraries.collect { it.isDirectory() ? it : zipTree(it) }
}
}
tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfigurationTask) {

View File

@@ -5,6 +5,7 @@ import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand
import net.woggioni.gbcs.cli.impl.commands.ClientCommand
import net.woggioni.gbcs.cli.impl.commands.GetCommand
import net.woggioni.gbcs.cli.impl.commands.HealthCheckCommand
import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand
import net.woggioni.gbcs.cli.impl.commands.PutCommand
import net.woggioni.gbcs.cli.impl.commands.ServerCommand
@@ -24,8 +25,12 @@ class GradleBuildCacheServerCli : GbcsCommand() {
companion object {
@JvmStatic
fun main(vararg args: String) {
Thread.currentThread().contextClassLoader = GradleBuildCacheServerCli::class.java.classLoader
GbcsUrlStreamHandlerFactory.install()
val currentClassLoader = GradleBuildCacheServerCli::class.java.classLoader
Thread.currentThread().contextClassLoader = currentClassLoader
if(currentClassLoader.javaClass.name == "net.woggioni.envelope.loader.ModuleClassLoader") {
//We're running in an envelope jar and custom URL protocols won't work
GbcsUrlStreamHandlerFactory.install()
}
val log = contextLogger()
val app = Application.builder("gbcs")
.configurationDirectoryEnvVar("GBCS_CONFIGURATION_DIR")
@@ -44,6 +49,7 @@ class GradleBuildCacheServerCli : GbcsCommand() {
addSubcommand(BenchmarkCommand())
addSubcommand(PutCommand())
addSubcommand(GetCommand())
addSubcommand(HealthCheckCommand())
})
System.exit(commandLine.execute(*args))
}

View File

@@ -46,90 +46,97 @@ class BenchmarkCommand : GbcsCommand() {
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
val client = GradleBuildCacheClient(profile)
GradleBuildCacheClient(profile).use { client ->
val entryGenerator = sequence {
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
while (true) {
val key = JWO.bytesToHex(random.nextBytes(16))
val content = random.nextInt().toByte()
val value = ByteArray(size, { _ -> content })
yield(key to value)
val entryGenerator = sequence {
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
while (true) {
val key = JWO.bytesToHex(random.nextBytes(16))
val content = random.nextInt().toByte()
val value = ByteArray(size, { _ -> content })
yield(key to value)
}
}
}
log.info {
"Starting insertion"
}
val entries = let {
val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now()
val semaphore = Semaphore(profile.maxConnections * 3)
val iterator = entryGenerator.take(numberOfEntries).iterator()
while(completionCounter.get() < numberOfEntries) {
if(iterator.hasNext()) {
val entry = iterator.next()
semaphore.acquire()
val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { result, ex ->
if (ex != null) {
log.error(ex.message, ex)
} else {
completionQueue.put(result)
log.info {
"Starting insertion"
}
val entries = let {
val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now()
val semaphore = Semaphore(profile.maxConnections * 3)
val iterator = entryGenerator.take(numberOfEntries).iterator()
while (completionCounter.get() < numberOfEntries) {
if (iterator.hasNext()) {
val entry = iterator.next()
semaphore.acquire()
val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { result, ex ->
if (ex != null) {
log.error(ex.message, ex)
} else {
completionQueue.put(result)
}
semaphore.release()
completionCounter.incrementAndGet()
}
semaphore.release()
completionCounter.incrementAndGet()
} else {
Thread.sleep(0)
}
}
}
val inserted = completionQueue.toList()
val end = Instant.now()
val inserted = completionQueue.toList()
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
}
inserted
}
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
"Inserted ${entries.size} entries"
}
inserted
}
log.info {
"Inserted ${entries.size} entries"
}
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) {
val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now()
entries.forEach { entry ->
semaphore.acquire()
val future = client.get(entry.first).thenApply {
if (it == null) {
log.error {
"Missing entry for key '${entry.first}'"
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) {
val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now()
val it = entries.iterator()
while (completionCounter.get() < entries.size) {
if (it.hasNext()) {
val entry = it.next()
val future = client.get(entry.first).thenApply {
if (it == null) {
log.error {
"Missing entry for key '${entry.first}'"
}
} else if (!entry.second.contentEquals(it)) {
log.error {
"Retrieved a value different from what was inserted for key '${entry.first}'"
}
}
}
} else if (!entry.second.contentEquals(it)) {
log.error {
"Retrieved a value different from what was inserted for key '${entry.first}'"
future.whenComplete { _, _ ->
completionCounter.incrementAndGet()
semaphore.release()
}
} else {
Thread.sleep(0)
}
}
future.whenComplete { _, _ ->
completionCounter.incrementAndGet()
semaphore.release()
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
"Retrieval rate: $opsPerSecond ops/s"
}
} else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")
}
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
"Retrieval rate: $opsPerSecond ops/s"
}
} else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")
}
}
}

View File

@@ -0,0 +1,45 @@
package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.contextLogger
import picocli.CommandLine
import java.security.SecureRandom
import kotlin.random.Random
@CommandLine.Command(
name = "health",
description = ["Check server health"],
showDefaultValues = true
)
class HealthCheckCommand : GbcsCommand() {
private val log = contextLogger()
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
GradleBuildCacheClient(profile).use { client ->
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
val nonce = ByteArray(0xa0)
random.nextBytes(nonce)
client.healthCheck(nonce).thenApply { value ->
if(value == null) {
throw IllegalStateException("Empty response from server")
}
for(i in 0 until nonce.size) {
for(j in value.size - nonce.size until nonce.size) {
if(nonce[i] != value[j]) {
throw IllegalStateException("Server nonce does not match")
}
}
}
}.get()
}
}
}

View File

@@ -2,6 +2,7 @@ package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.converters.DurationConverter
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.debug
import net.woggioni.gbcs.common.info
@@ -13,6 +14,7 @@ import picocli.CommandLine
import java.io.ByteArrayOutputStream
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration
@CommandLine.Command(
name = "server",
@@ -35,6 +37,14 @@ class ServerCommand(app : Application) : GbcsCommand() {
}
}
@CommandLine.Option(
names = ["-t", "--timeout"],
description = ["Exit after the specified time"],
paramLabel = "TIMEOUT",
converter = [DurationConverter::class]
)
private var timeout: Duration? = null
@CommandLine.Option(
names = ["-c", "--config-file"],
description = ["Read the application configuration from this file"],
@@ -42,10 +52,6 @@ class ServerCommand(app : Application) : GbcsCommand() {
)
private var configurationFile: Path = findConfigurationFile(app, "gbcs-server.xml")
val configuration : Configuration by lazy {
GradleBuildCacheServer.loadConfiguration(configurationFile)
}
override fun run() {
if (!Files.exists(configurationFile)) {
Files.createDirectories(configurationFile.parent)
@@ -61,7 +67,11 @@ class ServerCommand(app : Application) : GbcsCommand() {
}
}
val server = GradleBuildCacheServer(configuration)
server.run().use {
server.run().use { server ->
timeout?.let {
Thread.sleep(it)
server.shutdown()
}
}
}
}

View File

@@ -0,0 +1,11 @@
package net.woggioni.gbcs.cli.impl.converters
import picocli.CommandLine
import java.time.Duration
class DurationConverter : CommandLine.ITypeConverter<Duration> {
override fun convert(value: String): Duration {
return Duration.parse(value)
}
}

View File

@@ -15,6 +15,4 @@
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -213,6 +213,25 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
}
}
fun healthCheck(nonce: ByteArray): CompletableFuture<ByteArray?> {
return executeWithRetry {
sendRequest(profile.serverURI, HttpMethod.TRACE, nonce)
}.thenApply {
val status = it.status()
if (it.status() != HttpResponseStatus.OK) {
throw HttpException(status)
} else {
it.content()
}
}.thenApply { maybeByteBuf ->
maybeByteBuf?.let {
val result = ByteArray(it.readableBytes())
it.getBytes(0, result)
result
}
}
}
fun get(key: String): CompletableFuture<ByteArray?> {
return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)

View File

@@ -0,0 +1,19 @@
package net.woggioni.gbcs.common
import io.netty.buffer.ByteBuf
import java.io.InputStream
import java.io.OutputStream
class ByteBufOutputStream(private val buf : ByteBuf) : OutputStream() {
override fun write(b: Int) {
buf.writeByte(b)
}
override fun write(b: ByteArray, off: Int, len: Int) {
buf.writeBytes(b, off, len)
}
override fun close() {
buf.release()
}
}

View File

@@ -0,0 +1,7 @@
package net.woggioni.gbcs.common
class ResourceNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
}
class ModuleNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
}

View File

@@ -36,13 +36,17 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
private class JpmsHandler : URLStreamHandler() {
override fun openConnection(u: URL): URLConnection {
val moduleName = u.host
val thisModule = javaClass.module
val sourceModule = Optional.ofNullable(thisModule)
.map { obj: Module -> obj.layer }
.flatMap { layer: ModuleLayer ->
val moduleName = u.host
layer.findModule(moduleName)
}.orElse(thisModule)
val sourceModule =
thisModule
?.let(Module::getLayer)
?.let { layer: ModuleLayer ->
layer.findModule(moduleName).orElse(null)
} ?: if(thisModule.layer == null) {
thisModule
} else throw ModuleNotFoundException("Module '$moduleName' not found")
return JpmsResourceURLConnection(u, sourceModule)
}
}
@@ -53,7 +57,9 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
@Throws(IOException::class)
override fun getInputStream(): InputStream {
return module.getResourceAsStream(getURL().path)
val resource = getURL().path
return module.getResourceAsStream(resource)
?: throw ResourceNotFoundException("Resource '$resource' not found in module '${module.name}'")
}
}

View File

@@ -24,6 +24,7 @@ import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.ByteBufOutputStream
import net.woggioni.gbcs.common.GBCS.digest
import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.common.contextLogger
@@ -114,13 +115,14 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline()
.addLast("handler", object : SimpleChannelInboundHandler<FullBinaryMemcacheResponse>() {
.addLast("client-handler", object : SimpleChannelInboundHandler<FullBinaryMemcacheResponse>() {
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: FullBinaryMemcacheResponse
) {
pipeline.removeLast()
pool.release(channel)
msg.touch("The method's caller must remember to release this")
response.complete(msg.retain())
}
@@ -135,6 +137,7 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
response.completeExceptionally(ex)
}
})
request.touch()
channel.writeAndFlush(request)
} else {
response.completeExceptionally(channelFuture.cause())
@@ -161,28 +164,35 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
}
}
return sendRequest(request).thenApply { response ->
when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> {
val compressionMode = cfg.compressionMode
val content = response.content()
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> {
GZIPInputStream(ByteBufInputStream(content))
}
try {
when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> {
val compressionMode = cfg.compressionMode
val content = response.content().retain()
content.touch()
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> {
GZIPInputStream(ByteBufInputStream(content))
}
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
InflaterInputStream(ByteBufInputStream(content))
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
InflaterInputStream(ByteBufInputStream(content))
}
}
}
} else {
ByteBufInputStream(content)
}.let(Channels::newChannel)
} else {
ByteBufInputStream(content)
}.let(Channels::newChannel)
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
null
}
else -> throw MemcacheException(status)
}
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
null
}
else -> throw MemcacheException(status)
} finally {
response.release()
}
}
}
@@ -197,16 +207,18 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
extras.writeInt(0)
extras.writeInt(encodeExpiry(expiry))
val compressionMode = cfg.compressionMode
content.retain()
val payload = if (compressionMode != null) {
val inputStream = ByteBufInputStream(Unpooled.wrappedBuffer(content))
val baos = ByteArrayOutputStream()
val inputStream = ByteBufInputStream(content)
val buf = content.alloc().buffer()
buf.retain()
val outputStream = when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> {
GZIPOutputStream(baos)
GZIPOutputStream(ByteBufOutputStream(buf))
}
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
DeflaterOutputStream(baos, Deflater(Deflater.DEFAULT_COMPRESSION, false))
DeflaterOutputStream(ByteBufOutputStream(buf), Deflater(Deflater.DEFAULT_COMPRESSION, false))
}
}
inputStream.use { i ->
@@ -214,7 +226,7 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
JWO.copy(i, o)
}
}
Unpooled.wrappedBuffer(baos.toByteArray())
buf
} else {
content
}
@@ -224,9 +236,13 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
}
}
return sendRequest(request).thenApply { response ->
when(val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> null
else -> throw MemcacheException(status)
try {
when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> null
else -> throw MemcacheException(status)
}
} finally {
response.release()
}
}
}

View File

@@ -18,6 +18,7 @@ module net.woggioni.gbcs.server {
requires net.woggioni.jwo;
requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.api;
requires io.netty.transport.classes.epoll;
exports net.woggioni.gbcs.server;

View File

@@ -10,6 +10,9 @@ import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPromise
import io.netty.channel.MultithreadEventLoopGroup
import io.netty.channel.epoll.EpollEventLoopGroup
import io.netty.channel.epoll.EpollServerSocketChannel
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.compression.CompressionOptions
@@ -49,6 +52,7 @@ import net.woggioni.gbcs.server.exception.ExceptionHandler
import net.woggioni.gbcs.server.handler.ServerHandler
import net.woggioni.gbcs.server.throttling.ThrottlingHandler
import net.woggioni.jwo.JWO
import net.woggioni.jwo.OS
import net.woggioni.jwo.Tuple2
import java.io.OutputStream
import java.net.InetSocketAddress
@@ -399,10 +403,23 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
}
fun run(): ServerHandle {
// Create the multithreaded event loops for the server
val bossGroup = NioEventLoopGroup(1)
val serverSocketChannel = NioServerSocketChannel::class.java
val workerGroup = NioEventLoopGroup(0)
val bossGroup : MultithreadEventLoopGroup
val workerGroup : MultithreadEventLoopGroup
val serverSocketChannel : Class<*>
if(cfg.isUseNativeTransport) {
if(OS.isLinux) {
bossGroup = EpollEventLoopGroup(1)
serverSocketChannel = EpollServerSocketChannel::class.java
workerGroup = EpollEventLoopGroup(0)
} else {
throw java.lang.IllegalArgumentException("Native transport is not supported on ${OS.current.name}")
}
} else {
bossGroup = NioEventLoopGroup(1)
serverSocketChannel = NioServerSocketChannel::class.java
workerGroup = NioEventLoopGroup(0)
}
val eventExecutorGroup = run {
val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) {
Thread.ofVirtual().factory()

View File

@@ -1,12 +1,12 @@
package net.woggioni.gbcs.server.cache
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.ByteBufOutputStream
import net.woggioni.gbcs.common.GBCS.digestString
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.jwo.JWO
import java.io.ByteArrayOutputStream
import java.nio.channels.Channels
import java.security.MessageDigest
import java.time.Duration
@@ -14,6 +14,7 @@ import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.atomic.AtomicLong
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
@@ -21,11 +22,18 @@ import java.util.zip.InflaterInputStream
class InMemoryCache(
val maxAge: Duration,
val maxSize: Long,
val digestAlgorithm: String?,
val compressionEnabled: Boolean,
val compressionLevel: Int
) : Cache {
companion object {
@JvmStatic
private val log = contextLogger()
}
private val size = AtomicLong()
private val map = ConcurrentHashMap<String, ByteBuf>()
private class RemovalQueueElement(val key: String, val value : ByteBuf, val expiry : Instant) : Comparable<RemovalQueueElement> {
@@ -38,9 +46,17 @@ class InMemoryCache(
private val garbageCollector = Thread {
while(true) {
val el = removalQueue.take()
val buf = el.value
val now = Instant.now()
if(now > el.expiry) {
map.remove(el.key, el.value)
val removed = map.remove(el.key, buf)
if(removed) {
updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
}
//Decrease the reference count for removalQueue
buf.release()
} else {
removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
@@ -50,6 +66,28 @@ class InMemoryCache(
start()
}
private fun removeEldest() : Long {
while(true) {
val el = removalQueue.take()
val buf = el.value
val removed = map.remove(el.key, buf)
//Decrease the reference count for removalQueue
buf.release()
if(removed) {
val newSize = updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
return newSize
}
}
}
private fun updateSizeAfterRemoval(removed: ByteBuf) : Long {
return size.updateAndGet { currentSize : Long ->
currentSize - removed.readableBytes()
}
}
override fun close() {
running = false
garbageCollector.join()
@@ -64,11 +102,13 @@ class InMemoryCache(
).let { digest ->
map[digest]
?.let { value ->
val copy = value.retainedDuplicate()
copy.touch("This has to be released by the caller of the cache")
if (compressionEnabled) {
val inflater = Inflater()
Channels.newChannel(InflaterInputStream(ByteBufInputStream(value), inflater))
Channels.newChannel(InflaterInputStream(ByteBufInputStream(copy), inflater))
} else {
Channels.newChannel(ByteBufInputStream(value))
Channels.newChannel(ByteBufInputStream(copy))
}
}
}.let {
@@ -81,18 +121,29 @@ class InMemoryCache(
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
content.retain()
val value = if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
val baos = ByteArrayOutputStream()
DeflaterOutputStream(baos, deflater).use { stream ->
JWO.copy(ByteBufInputStream(content), stream)
val buf = content.alloc().buffer()
buf.retain()
DeflaterOutputStream(ByteBufOutputStream(buf), deflater).use { outputStream ->
ByteBufInputStream(content).use { inputStream ->
JWO.copy(inputStream, outputStream)
}
}
Unpooled.wrappedBuffer(baos.toByteArray())
buf
} else {
content
}
map[digest] = value
removalQueue.put(RemovalQueueElement(digest, value, Instant.now().plus(maxAge)))
val old = map.put(digest, value)
val delta = value.readableBytes() - (old?.readableBytes() ?: 0)
var newSize = size.updateAndGet { currentSize : Long ->
currentSize + delta
}
removalQueue.put(RemovalQueueElement(digest, value.retain(), Instant.now().plus(maxAge)))
while(newSize > maxSize) {
newSize = removeEldest()
}
}.let {
CompletableFuture.completedFuture<Void>(null)
}

View File

@@ -6,12 +6,14 @@ import java.time.Duration
data class InMemoryCacheConfiguration(
val maxAge: Duration,
val maxSize: Long,
val digestAlgorithm : String?,
val compressionEnabled: Boolean,
val compressionLevel: Int,
) : Configuration.Cache {
override fun materialize() = InMemoryCache(
maxAge,
maxSize,
digestAlgorithm,
compressionEnabled,
compressionLevel

View File

@@ -21,6 +21,9 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse)
?: Duration.ofDays(1)
val maxSize = el.renderAttribute("max-size")
?.let(java.lang.Long::decode)
?: 0x1000000
val enableCompression = el.renderAttribute("enable-compression")
?.let(String::toBoolean)
?: true
@@ -31,6 +34,7 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
return InMemoryCacheConfiguration(
maxAge,
maxSize,
digestAlgorithm,
enableCompression,
compressionLevel
@@ -43,6 +47,7 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
val prefix = doc.lookupPrefix(GBCS.GBCS_NAMESPACE_URI)
attr("xs:type", "${prefix}:inMemoryCacheType", GBCS.XML_SCHEMA_NAMESPACE_URI)
attr("max-age", maxAge.toString())
attr("max-size", maxSize.toString())
digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm)
}

View File

@@ -44,6 +44,7 @@ object Parser {
val serverPath = root.renderAttribute("path")
var incomingConnectionsBacklogSize = 1024
var authentication: Authentication? = null
var useNativeTransport = false
for (child in root.asIterable()) {
val tagName = child.localName
when (tagName) {
@@ -136,7 +137,7 @@ object Parser {
}
"event-executor" -> {
val useVirtualThread = root.renderAttribute("use-virtual-threads")
val useVirtualThread = child.renderAttribute("use-virtual-threads")
?.let(String::toBoolean) ?: true
eventExecutor = Configuration.EventExecutor(useVirtualThread)
}
@@ -180,6 +181,10 @@ object Parser {
}
tls = Tls(keyStore, trustStore)
}
"transport" -> {
useNativeTransport = child.renderAttribute("use-native-transport")
?.let(String::toBoolean) ?: false
}
}
}
return Configuration.of(
@@ -194,6 +199,7 @@ object Parser {
cache!!,
authentication,
tls,
useNativeTransport
)
}

View File

@@ -44,8 +44,12 @@ object Serializer {
attr("max-request-size", connection.maxRequestSize.toString())
}
}
node("transport") {
attr("use-native-transport", conf.isUseNativeTransport.toString())
}
node("event-executor") {
attr("use-virtual-threads", conf.eventExecutor.isUseVirtualThreads.toString())
attr("use-virtual-threads", conf.eventExecutor.isUseVirtualThreads.toString())
}
val cache = conf.cache
val serializer : CacheProvider<Configuration.Cache> =

View File

@@ -57,16 +57,29 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
val content : Any = when (channel) {
is FileChannel -> DefaultFileRegion(channel, 0, channel.size())
else -> ChunkedNioStream(channel)
}
if (keepAlive) {
ctx.write(content)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
when (channel) {
is FileChannel -> {
val content = DefaultFileRegion(channel, 0, channel.size())
if (keepAlive) {
ctx.write(content)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
}
else -> {
val content = ChunkedNioStream(channel)
if (keepAlive) {
ctx.write(content).addListener {
content.close()
}
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
}
}
} else {
log.debug(ctx) {
@@ -94,7 +107,7 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
log.debug(ctx) {
"Added value for key '$key' to build cache"
}
cache.put(key, msg.content().retain()).thenRun {
cache.put(key, msg.content()).thenRun {
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())

View File

@@ -9,6 +9,7 @@
<xs:sequence minOccurs="0">
<xs:element name="bind" type="gbcs:bindType" maxOccurs="1"/>
<xs:element name="connection" type="gbcs:connectionType" minOccurs="0" maxOccurs="1"/>
<xs:element name="transport" type="gbcs:transportType" minOccurs="0" maxOccurs="1"/>
<xs:element name="event-executor" type="gbcs:eventExecutorType" minOccurs="0" maxOccurs="1"/>
<xs:element name="cache" type="gbcs:cacheType" maxOccurs="1"/>
<xs:element name="authorization" type="gbcs:authorizationType" minOccurs="0">
@@ -42,6 +43,10 @@
<xs:attribute name="max-request-size" type="xs:unsignedInt" use="optional" default="67108864"/>
</xs:complexType>
<xs:complexType name="transportType">
<xs:attribute name="use-native-transport" type="xs:boolean" use="optional" default="false"/>
</xs:complexType>
<xs:complexType name="eventExecutorType">
<xs:attribute name="use-virtual-threads" type="xs:boolean" use="optional" default="true"/>
</xs:complexType>
@@ -52,6 +57,7 @@
<xs:complexContent>
<xs:extension base="gbcs:cacheType">
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="max-size" type="xs:token" default="0x1000000"/>
<xs:attribute name="digest" type="xs:token" default="MD5"/>
<xs:attribute name="enable-compression" type="xs:boolean" default="true"/>
<xs:attribute name="compression-level" type="xs:byte" default="-1"/>

View File

@@ -55,6 +55,7 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
),
Configuration.BasicAuthentication(),
null,
false,
)
Xml.write(Serializer.serialize(cfg), System.out)
}

View File

@@ -171,7 +171,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
Configuration.Tls(
Configuration.KeyStore(this.serverKeyStoreFile, null, SERVER_CERTIFICATE_ENTRY, PASSWORD),
Configuration.TrustStore(this.trustStoreFile, null, false, false),
)
),
false
)
Xml.write(Serializer.serialize(cfg), System.out)
}

View File

@@ -51,10 +51,12 @@ class NoAuthServerTest : AbstractServerTest() {
maxAge = Duration.ofSeconds(3600 * 24),
compressionEnabled = true,
digestAlgorithm = "MD5",
compressionLevel = Deflater.DEFAULT_COMPRESSION
compressionLevel = Deflater.DEFAULT_COMPRESSION,
maxSize = 0x1000000
),
null,
null,
false,
)
Xml.write(Serializer.serialize(cfg), System.out)
}

View File

@@ -10,6 +10,7 @@
write-idle-timeout="PT11M"
idle-timeout="PT30M"
max-request-size="101325"/>
<transport use-native-transport="true"/>
<event-executor use-virtual-threads="false"/>
<cache xs:type="gbcs:fileSystemCacheType" path="/tmp/gbcs" max-age="P7D"/>
<authentication>