Compare commits

...

6 Commits

Author SHA1 Message Date
66ab357d91 added epoll native transport for server 2025-02-05 21:01:49 +08:00
53b24e3d54 improved benchmark accuracy 2025-02-05 19:10:25 +08:00
7d0f24fa58 fixed memory leak in InMemoryCache 2025-02-05 19:09:51 +08:00
1b6cf1bd96 fixed memory leak in memcached plugin 2025-02-05 14:41:11 +08:00
4180df2352 added healthcheck command to client 2025-02-05 00:02:17 +08:00
c2e388b931 switched to ZGC in docker image
All checks were successful
CI / build (push) Successful in 3m28s
2025-02-04 22:46:34 +08:00
27 changed files with 415 additions and 146 deletions

View File

@@ -5,7 +5,7 @@ WORKDIR /home/luser
FROM base-release AS release FROM base-release AS release
ADD gbcs-cli-envelope-*.jar gbcs.jar ADD gbcs-cli-envelope-*.jar gbcs.jar
ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"] ENTRYPOINT ["java", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/gbcs.jar", "server"]
FROM base-release AS release-memcache FROM base-release AS release-memcache
ADD --chown=luser:luser gbcs-cli-envelope-*.jar gbcs.jar ADD --chown=luser:luser gbcs-cli-envelope-*.jar gbcs.jar
@@ -13,4 +13,4 @@ RUN mkdir plugins
WORKDIR /home/luser/plugins WORKDIR /home/luser/plugins
RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/gbcs-server-memcache*.tar RUN --mount=type=bind,source=.,target=/build/distributions tar -xf /build/distributions/gbcs-server-memcache*.tar
WORKDIR /home/luser WORKDIR /home/luser
ENTRYPOINT ["java", "-jar", "/home/luser/gbcs.jar", "server"] ENTRYPOINT ["java", "-XX:+UseZGC", "-XX:+ZGenerational", "-jar", "/home/luser/gbcs.jar", "server"]

View File

@@ -27,6 +27,7 @@ public class Configuration {
Cache cache; Cache cache;
Authentication authentication; Authentication authentication;
Tls tls; Tls tls;
boolean useNativeTransport;
@Value @Value
public static class EventExecutor { public static class EventExecutor {
@@ -151,7 +152,8 @@ public class Configuration {
Map<String, Group> groups, Map<String, Group> groups,
Cache cache, Cache cache,
Authentication authentication, Authentication authentication,
Tls tls Tls tls,
boolean useNativeTransport
) { ) {
return new Configuration( return new Configuration(
host, host,
@@ -164,7 +166,8 @@ public class Configuration {
groups, groups,
cache, cache,
authentication, authentication,
tls tls,
useNativeTransport
); );
} }
} }

View File

@@ -32,6 +32,13 @@ configurations {
canBeResolved = true canBeResolved = true
visible = true visible = true
} }
nativeLibraries {
transitive = false
canBeConsumed = false
canBeResolved = true
visible = false
}
} }
envelopeJar { envelopeJar {
@@ -53,6 +60,8 @@ dependencies {
// runtimeOnly catalog.slf4j.jdk14 // runtimeOnly catalog.slf4j.jdk14
runtimeOnly catalog.logback.classic runtimeOnly catalog.logback.classic
// runtimeOnly catalog.slf4j.simple // runtimeOnly catalog.slf4j.simple
nativeLibraries group: 'io.netty', name: 'netty-transport-native-epoll', version: catalog.versions.netty.get(), classifier: 'linux-x86_64'
} }
Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named('envelopeJar', EnvelopeJarTask.class) { Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named('envelopeJar', EnvelopeJarTask.class) {
@@ -67,6 +76,9 @@ Provider<EnvelopeJarTask> envelopeJarTaskProvider = tasks.named('envelopeJar', E
// systemProperties['org.slf4j.simpleLogger.log.com.google.code.yanf4j'] = 'warn' // systemProperties['org.slf4j.simpleLogger.log.com.google.code.yanf4j'] = 'warn'
// systemProperties['org.slf4j.simpleLogger.log.net.rubyeye.xmemcached'] = 'warn' // systemProperties['org.slf4j.simpleLogger.log.net.rubyeye.xmemcached'] = 'warn'
// systemProperties['org.slf4j.simpleLogger.dateTimeFormat'] = 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ' // systemProperties['org.slf4j.simpleLogger.dateTimeFormat'] = 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ'
from {
configurations.nativeLibraries.collect { it.isDirectory() ? it : zipTree(it) }
}
} }
tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfigurationTask) { tasks.named(NativeImagePlugin.CONFIGURE_NATIVE_IMAGE_TASK_NAME, NativeImageConfigurationTask) {

View File

@@ -5,6 +5,7 @@ import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand
import net.woggioni.gbcs.cli.impl.commands.ClientCommand import net.woggioni.gbcs.cli.impl.commands.ClientCommand
import net.woggioni.gbcs.cli.impl.commands.GetCommand import net.woggioni.gbcs.cli.impl.commands.GetCommand
import net.woggioni.gbcs.cli.impl.commands.HealthCheckCommand
import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand
import net.woggioni.gbcs.cli.impl.commands.PutCommand import net.woggioni.gbcs.cli.impl.commands.PutCommand
import net.woggioni.gbcs.cli.impl.commands.ServerCommand import net.woggioni.gbcs.cli.impl.commands.ServerCommand
@@ -24,8 +25,12 @@ class GradleBuildCacheServerCli : GbcsCommand() {
companion object { companion object {
@JvmStatic @JvmStatic
fun main(vararg args: String) { fun main(vararg args: String) {
Thread.currentThread().contextClassLoader = GradleBuildCacheServerCli::class.java.classLoader val currentClassLoader = GradleBuildCacheServerCli::class.java.classLoader
Thread.currentThread().contextClassLoader = currentClassLoader
if(currentClassLoader.javaClass.name == "net.woggioni.envelope.loader.ModuleClassLoader") {
//We're running in an envelope jar and custom URL protocols won't work
GbcsUrlStreamHandlerFactory.install() GbcsUrlStreamHandlerFactory.install()
}
val log = contextLogger() val log = contextLogger()
val app = Application.builder("gbcs") val app = Application.builder("gbcs")
.configurationDirectoryEnvVar("GBCS_CONFIGURATION_DIR") .configurationDirectoryEnvVar("GBCS_CONFIGURATION_DIR")
@@ -44,6 +49,7 @@ class GradleBuildCacheServerCli : GbcsCommand() {
addSubcommand(BenchmarkCommand()) addSubcommand(BenchmarkCommand())
addSubcommand(PutCommand()) addSubcommand(PutCommand())
addSubcommand(GetCommand()) addSubcommand(GetCommand())
addSubcommand(HealthCheckCommand())
}) })
System.exit(commandLine.execute(*args)) System.exit(commandLine.execute(*args))
} }

View File

@@ -46,7 +46,7 @@ class BenchmarkCommand : GbcsCommand() {
clientCommand.configuration.profiles[profileName] clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration") ?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
} }
val client = GradleBuildCacheClient(profile) GradleBuildCacheClient(profile).use { client ->
val entryGenerator = sequence { val entryGenerator = sequence {
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong()) val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
@@ -81,6 +81,8 @@ class BenchmarkCommand : GbcsCommand() {
semaphore.release() semaphore.release()
completionCounter.incrementAndGet() completionCounter.incrementAndGet()
} }
} else {
Thread.sleep(0)
} }
} }
@@ -103,9 +105,10 @@ class BenchmarkCommand : GbcsCommand() {
val completionCounter = AtomicLong(0) val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3) val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now() val start = Instant.now()
entries.forEach { entry -> val it = entries.iterator()
semaphore.acquire() while (completionCounter.get() < entries.size) {
if (it.hasNext()) {
val entry = it.next()
val future = client.get(entry.first).thenApply { val future = client.get(entry.first).thenApply {
if (it == null) { if (it == null) {
log.error { log.error {
@@ -121,6 +124,9 @@ class BenchmarkCommand : GbcsCommand() {
completionCounter.incrementAndGet() completionCounter.incrementAndGet()
semaphore.release() semaphore.release()
} }
} else {
Thread.sleep(0)
}
} }
val end = Instant.now() val end = Instant.now()
log.info { log.info {
@@ -133,3 +139,4 @@ class BenchmarkCommand : GbcsCommand() {
} }
} }
} }
}

View File

@@ -0,0 +1,45 @@
package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.contextLogger
import picocli.CommandLine
import java.security.SecureRandom
import kotlin.random.Random
@CommandLine.Command(
name = "health",
description = ["Check server health"],
showDefaultValues = true
)
class HealthCheckCommand : GbcsCommand() {
private val log = contextLogger()
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
GradleBuildCacheClient(profile).use { client ->
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
val nonce = ByteArray(0xa0)
random.nextBytes(nonce)
client.healthCheck(nonce).thenApply { value ->
if(value == null) {
throw IllegalStateException("Empty response from server")
}
for(i in 0 until nonce.size) {
for(j in value.size - nonce.size until nonce.size) {
if(nonce[i] != value[j]) {
throw IllegalStateException("Server nonce does not match")
}
}
}
}.get()
}
}
}

View File

@@ -2,6 +2,7 @@ package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.api.Configuration import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.converters.DurationConverter
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.debug import net.woggioni.gbcs.common.debug
import net.woggioni.gbcs.common.info import net.woggioni.gbcs.common.info
@@ -13,6 +14,7 @@ import picocli.CommandLine
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.time.Duration
@CommandLine.Command( @CommandLine.Command(
name = "server", name = "server",
@@ -35,6 +37,14 @@ class ServerCommand(app : Application) : GbcsCommand() {
} }
} }
@CommandLine.Option(
names = ["-t", "--timeout"],
description = ["Exit after the specified time"],
paramLabel = "TIMEOUT",
converter = [DurationConverter::class]
)
private var timeout: Duration? = null
@CommandLine.Option( @CommandLine.Option(
names = ["-c", "--config-file"], names = ["-c", "--config-file"],
description = ["Read the application configuration from this file"], description = ["Read the application configuration from this file"],
@@ -42,10 +52,6 @@ class ServerCommand(app : Application) : GbcsCommand() {
) )
private var configurationFile: Path = findConfigurationFile(app, "gbcs-server.xml") private var configurationFile: Path = findConfigurationFile(app, "gbcs-server.xml")
val configuration : Configuration by lazy {
GradleBuildCacheServer.loadConfiguration(configurationFile)
}
override fun run() { override fun run() {
if (!Files.exists(configurationFile)) { if (!Files.exists(configurationFile)) {
Files.createDirectories(configurationFile.parent) Files.createDirectories(configurationFile.parent)
@@ -61,7 +67,11 @@ class ServerCommand(app : Application) : GbcsCommand() {
} }
} }
val server = GradleBuildCacheServer(configuration) val server = GradleBuildCacheServer(configuration)
server.run().use { server.run().use { server ->
timeout?.let {
Thread.sleep(it)
server.shutdown()
}
} }
} }
} }

View File

@@ -0,0 +1,11 @@
package net.woggioni.gbcs.cli.impl.converters
import picocli.CommandLine
import java.time.Duration
class DurationConverter : CommandLine.ITypeConverter<Duration> {
override fun convert(value: String): Duration {
return Duration.parse(value)
}
}

View File

@@ -15,6 +15,4 @@
<root level="info"> <root level="info">
<appender-ref ref="console"/> <appender-ref ref="console"/>
</root> </root>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration> </configuration>

View File

@@ -213,6 +213,25 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
} }
fun healthCheck(nonce: ByteArray): CompletableFuture<ByteArray?> {
return executeWithRetry {
sendRequest(profile.serverURI, HttpMethod.TRACE, nonce)
}.thenApply {
val status = it.status()
if (it.status() != HttpResponseStatus.OK) {
throw HttpException(status)
} else {
it.content()
}
}.thenApply { maybeByteBuf ->
maybeByteBuf?.let {
val result = ByteArray(it.readableBytes())
it.getBytes(0, result)
result
}
}
}
fun get(key: String): CompletableFuture<ByteArray?> { fun get(key: String): CompletableFuture<ByteArray?> {
return executeWithRetry { return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null) sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)

View File

@@ -0,0 +1,19 @@
package net.woggioni.gbcs.common
import io.netty.buffer.ByteBuf
import java.io.InputStream
import java.io.OutputStream
class ByteBufOutputStream(private val buf : ByteBuf) : OutputStream() {
override fun write(b: Int) {
buf.writeByte(b)
}
override fun write(b: ByteArray, off: Int, len: Int) {
buf.writeBytes(b, off, len)
}
override fun close() {
buf.release()
}
}

View File

@@ -0,0 +1,7 @@
package net.woggioni.gbcs.common
class ResourceNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
}
class ModuleNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
}

View File

@@ -36,13 +36,17 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
private class JpmsHandler : URLStreamHandler() { private class JpmsHandler : URLStreamHandler() {
override fun openConnection(u: URL): URLConnection { override fun openConnection(u: URL): URLConnection {
val thisModule = javaClass.module
val sourceModule = Optional.ofNullable(thisModule)
.map { obj: Module -> obj.layer }
.flatMap { layer: ModuleLayer ->
val moduleName = u.host val moduleName = u.host
layer.findModule(moduleName) val thisModule = javaClass.module
}.orElse(thisModule) val sourceModule =
thisModule
?.let(Module::getLayer)
?.let { layer: ModuleLayer ->
layer.findModule(moduleName).orElse(null)
} ?: if(thisModule.layer == null) {
thisModule
} else throw ModuleNotFoundException("Module '$moduleName' not found")
return JpmsResourceURLConnection(u, sourceModule) return JpmsResourceURLConnection(u, sourceModule)
} }
} }
@@ -53,7 +57,9 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
@Throws(IOException::class) @Throws(IOException::class)
override fun getInputStream(): InputStream { override fun getInputStream(): InputStream {
return module.getResourceAsStream(getURL().path) val resource = getURL().path
return module.getResourceAsStream(resource)
?: throw ResourceNotFoundException("Resource '$resource' not found in module '${module.name}'")
} }
} }

View File

@@ -24,6 +24,7 @@ import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheRequest
import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse import io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse
import io.netty.util.concurrent.GenericFutureListener import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.common.ByteBufInputStream import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.ByteBufOutputStream
import net.woggioni.gbcs.common.GBCS.digest import net.woggioni.gbcs.common.GBCS.digest
import net.woggioni.gbcs.common.HostAndPort import net.woggioni.gbcs.common.HostAndPort
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
@@ -114,13 +115,14 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
val channel = channelFuture.now val channel = channelFuture.now
val pipeline = channel.pipeline() val pipeline = channel.pipeline()
channel.pipeline() channel.pipeline()
.addLast("handler", object : SimpleChannelInboundHandler<FullBinaryMemcacheResponse>() { .addLast("client-handler", object : SimpleChannelInboundHandler<FullBinaryMemcacheResponse>() {
override fun channelRead0( override fun channelRead0(
ctx: ChannelHandlerContext, ctx: ChannelHandlerContext,
msg: FullBinaryMemcacheResponse msg: FullBinaryMemcacheResponse
) { ) {
pipeline.removeLast() pipeline.removeLast()
pool.release(channel) pool.release(channel)
msg.touch("The method's caller must remember to release this")
response.complete(msg.retain()) response.complete(msg.retain())
} }
@@ -135,6 +137,7 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
response.completeExceptionally(ex) response.completeExceptionally(ex)
} }
}) })
request.touch()
channel.writeAndFlush(request) channel.writeAndFlush(request)
} else { } else {
response.completeExceptionally(channelFuture.cause()) response.completeExceptionally(channelFuture.cause())
@@ -161,10 +164,12 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
} }
} }
return sendRequest(request).thenApply { response -> return sendRequest(request).thenApply { response ->
try {
when (val status = response.status()) { when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> { BinaryMemcacheResponseStatus.SUCCESS -> {
val compressionMode = cfg.compressionMode val compressionMode = cfg.compressionMode
val content = response.content() val content = response.content().retain()
content.touch()
if (compressionMode != null) { if (compressionMode != null) {
when (compressionMode) { when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> { MemcacheCacheConfiguration.CompressionMode.GZIP -> {
@@ -179,11 +184,16 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
ByteBufInputStream(content) ByteBufInputStream(content)
}.let(Channels::newChannel) }.let(Channels::newChannel)
} }
BinaryMemcacheResponseStatus.KEY_ENOENT -> { BinaryMemcacheResponseStatus.KEY_ENOENT -> {
null null
} }
else -> throw MemcacheException(status) else -> throw MemcacheException(status)
} }
} finally {
response.release()
}
} }
} }
@@ -197,16 +207,18 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
extras.writeInt(0) extras.writeInt(0)
extras.writeInt(encodeExpiry(expiry)) extras.writeInt(encodeExpiry(expiry))
val compressionMode = cfg.compressionMode val compressionMode = cfg.compressionMode
content.retain()
val payload = if (compressionMode != null) { val payload = if (compressionMode != null) {
val inputStream = ByteBufInputStream(Unpooled.wrappedBuffer(content)) val inputStream = ByteBufInputStream(content)
val baos = ByteArrayOutputStream() val buf = content.alloc().buffer()
buf.retain()
val outputStream = when (compressionMode) { val outputStream = when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> { MemcacheCacheConfiguration.CompressionMode.GZIP -> {
GZIPOutputStream(baos) GZIPOutputStream(ByteBufOutputStream(buf))
} }
MemcacheCacheConfiguration.CompressionMode.DEFLATE -> { MemcacheCacheConfiguration.CompressionMode.DEFLATE -> {
DeflaterOutputStream(baos, Deflater(Deflater.DEFAULT_COMPRESSION, false)) DeflaterOutputStream(ByteBufOutputStream(buf), Deflater(Deflater.DEFAULT_COMPRESSION, false))
} }
} }
inputStream.use { i -> inputStream.use { i ->
@@ -214,7 +226,7 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
JWO.copy(i, o) JWO.copy(i, o)
} }
} }
Unpooled.wrappedBuffer(baos.toByteArray()) buf
} else { } else {
content content
} }
@@ -224,10 +236,14 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
} }
} }
return sendRequest(request).thenApply { response -> return sendRequest(request).thenApply { response ->
try {
when (val status = response.status()) { when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> null BinaryMemcacheResponseStatus.SUCCESS -> null
else -> throw MemcacheException(status) else -> throw MemcacheException(status)
} }
} finally {
response.release()
}
} }
} }

View File

@@ -18,6 +18,7 @@ module net.woggioni.gbcs.server {
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires net.woggioni.gbcs.common; requires net.woggioni.gbcs.common;
requires net.woggioni.gbcs.api; requires net.woggioni.gbcs.api;
requires io.netty.transport.classes.epoll;
exports net.woggioni.gbcs.server; exports net.woggioni.gbcs.server;

View File

@@ -10,6 +10,9 @@ import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelInitializer import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPromise import io.netty.channel.ChannelPromise
import io.netty.channel.MultithreadEventLoopGroup
import io.netty.channel.epoll.EpollEventLoopGroup
import io.netty.channel.epoll.EpollServerSocketChannel
import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.compression.CompressionOptions import io.netty.handler.codec.compression.CompressionOptions
@@ -49,6 +52,7 @@ import net.woggioni.gbcs.server.exception.ExceptionHandler
import net.woggioni.gbcs.server.handler.ServerHandler import net.woggioni.gbcs.server.handler.ServerHandler
import net.woggioni.gbcs.server.throttling.ThrottlingHandler import net.woggioni.gbcs.server.throttling.ThrottlingHandler
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import net.woggioni.jwo.OS
import net.woggioni.jwo.Tuple2 import net.woggioni.jwo.Tuple2
import java.io.OutputStream import java.io.OutputStream
import java.net.InetSocketAddress import java.net.InetSocketAddress
@@ -399,10 +403,23 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
} }
fun run(): ServerHandle { fun run(): ServerHandle {
// Create the multithreaded event loops for the server val bossGroup : MultithreadEventLoopGroup
val bossGroup = NioEventLoopGroup(1) val workerGroup : MultithreadEventLoopGroup
val serverSocketChannel = NioServerSocketChannel::class.java val serverSocketChannel : Class<*>
val workerGroup = NioEventLoopGroup(0) if(cfg.isUseNativeTransport) {
if(OS.isLinux) {
bossGroup = EpollEventLoopGroup(1)
serverSocketChannel = EpollServerSocketChannel::class.java
workerGroup = EpollEventLoopGroup(0)
} else {
throw java.lang.IllegalArgumentException("Native transport is not supported on ${OS.current.name}")
}
} else {
bossGroup = NioEventLoopGroup(1)
serverSocketChannel = NioServerSocketChannel::class.java
workerGroup = NioEventLoopGroup(0)
}
val eventExecutorGroup = run { val eventExecutorGroup = run {
val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) { val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) {
Thread.ofVirtual().factory() Thread.ofVirtual().factory()

View File

@@ -1,12 +1,12 @@
package net.woggioni.gbcs.server.cache package net.woggioni.gbcs.server.cache
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import net.woggioni.gbcs.api.Cache import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.ByteBufInputStream import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.ByteBufOutputStream
import net.woggioni.gbcs.common.GBCS.digestString import net.woggioni.gbcs.common.GBCS.digestString
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import java.io.ByteArrayOutputStream
import java.nio.channels.Channels import java.nio.channels.Channels
import java.security.MessageDigest import java.security.MessageDigest
import java.time.Duration import java.time.Duration
@@ -14,6 +14,7 @@ import java.time.Instant
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.atomic.AtomicLong
import java.util.zip.Deflater import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater import java.util.zip.Inflater
@@ -21,11 +22,18 @@ import java.util.zip.InflaterInputStream
class InMemoryCache( class InMemoryCache(
val maxAge: Duration, val maxAge: Duration,
val maxSize: Long,
val digestAlgorithm: String?, val digestAlgorithm: String?,
val compressionEnabled: Boolean, val compressionEnabled: Boolean,
val compressionLevel: Int val compressionLevel: Int
) : Cache { ) : Cache {
companion object {
@JvmStatic
private val log = contextLogger()
}
private val size = AtomicLong()
private val map = ConcurrentHashMap<String, ByteBuf>() private val map = ConcurrentHashMap<String, ByteBuf>()
private class RemovalQueueElement(val key: String, val value : ByteBuf, val expiry : Instant) : Comparable<RemovalQueueElement> { private class RemovalQueueElement(val key: String, val value : ByteBuf, val expiry : Instant) : Comparable<RemovalQueueElement> {
@@ -38,9 +46,17 @@ class InMemoryCache(
private val garbageCollector = Thread { private val garbageCollector = Thread {
while(true) { while(true) {
val el = removalQueue.take() val el = removalQueue.take()
val buf = el.value
val now = Instant.now() val now = Instant.now()
if(now > el.expiry) { if(now > el.expiry) {
map.remove(el.key, el.value) val removed = map.remove(el.key, buf)
if(removed) {
updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
}
//Decrease the reference count for removalQueue
buf.release()
} else { } else {
removalQueue.put(el) removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))) Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
@@ -50,6 +66,28 @@ class InMemoryCache(
start() start()
} }
private fun removeEldest() : Long {
while(true) {
val el = removalQueue.take()
val buf = el.value
val removed = map.remove(el.key, buf)
//Decrease the reference count for removalQueue
buf.release()
if(removed) {
val newSize = updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
return newSize
}
}
}
private fun updateSizeAfterRemoval(removed: ByteBuf) : Long {
return size.updateAndGet { currentSize : Long ->
currentSize - removed.readableBytes()
}
}
override fun close() { override fun close() {
running = false running = false
garbageCollector.join() garbageCollector.join()
@@ -64,11 +102,13 @@ class InMemoryCache(
).let { digest -> ).let { digest ->
map[digest] map[digest]
?.let { value -> ?.let { value ->
val copy = value.retainedDuplicate()
copy.touch("This has to be released by the caller of the cache")
if (compressionEnabled) { if (compressionEnabled) {
val inflater = Inflater() val inflater = Inflater()
Channels.newChannel(InflaterInputStream(ByteBufInputStream(value), inflater)) Channels.newChannel(InflaterInputStream(ByteBufInputStream(copy), inflater))
} else { } else {
Channels.newChannel(ByteBufInputStream(value)) Channels.newChannel(ByteBufInputStream(copy))
} }
} }
}.let { }.let {
@@ -81,18 +121,29 @@ class InMemoryCache(
?.let { md -> ?.let { md ->
digestString(key.toByteArray(), md) digestString(key.toByteArray(), md)
} ?: key).let { digest -> } ?: key).let { digest ->
content.retain()
val value = if (compressionEnabled) { val value = if (compressionEnabled) {
val deflater = Deflater(compressionLevel) val deflater = Deflater(compressionLevel)
val baos = ByteArrayOutputStream() val buf = content.alloc().buffer()
DeflaterOutputStream(baos, deflater).use { stream -> buf.retain()
JWO.copy(ByteBufInputStream(content), stream) DeflaterOutputStream(ByteBufOutputStream(buf), deflater).use { outputStream ->
ByteBufInputStream(content).use { inputStream ->
JWO.copy(inputStream, outputStream)
} }
Unpooled.wrappedBuffer(baos.toByteArray()) }
buf
} else { } else {
content content
} }
map[digest] = value val old = map.put(digest, value)
removalQueue.put(RemovalQueueElement(digest, value, Instant.now().plus(maxAge))) val delta = value.readableBytes() - (old?.readableBytes() ?: 0)
var newSize = size.updateAndGet { currentSize : Long ->
currentSize + delta
}
removalQueue.put(RemovalQueueElement(digest, value.retain(), Instant.now().plus(maxAge)))
while(newSize > maxSize) {
newSize = removeEldest()
}
}.let { }.let {
CompletableFuture.completedFuture<Void>(null) CompletableFuture.completedFuture<Void>(null)
} }

View File

@@ -6,12 +6,14 @@ import java.time.Duration
data class InMemoryCacheConfiguration( data class InMemoryCacheConfiguration(
val maxAge: Duration, val maxAge: Duration,
val maxSize: Long,
val digestAlgorithm : String?, val digestAlgorithm : String?,
val compressionEnabled: Boolean, val compressionEnabled: Boolean,
val compressionLevel: Int, val compressionLevel: Int,
) : Configuration.Cache { ) : Configuration.Cache {
override fun materialize() = InMemoryCache( override fun materialize() = InMemoryCache(
maxAge, maxAge,
maxSize,
digestAlgorithm, digestAlgorithm,
compressionEnabled, compressionEnabled,
compressionLevel compressionLevel

View File

@@ -21,6 +21,9 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
val maxAge = el.renderAttribute("max-age") val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse) ?.let(Duration::parse)
?: Duration.ofDays(1) ?: Duration.ofDays(1)
val maxSize = el.renderAttribute("max-size")
?.let(java.lang.Long::decode)
?: 0x1000000
val enableCompression = el.renderAttribute("enable-compression") val enableCompression = el.renderAttribute("enable-compression")
?.let(String::toBoolean) ?.let(String::toBoolean)
?: true ?: true
@@ -31,6 +34,7 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
return InMemoryCacheConfiguration( return InMemoryCacheConfiguration(
maxAge, maxAge,
maxSize,
digestAlgorithm, digestAlgorithm,
enableCompression, enableCompression,
compressionLevel compressionLevel
@@ -43,6 +47,7 @@ class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
val prefix = doc.lookupPrefix(GBCS.GBCS_NAMESPACE_URI) val prefix = doc.lookupPrefix(GBCS.GBCS_NAMESPACE_URI)
attr("xs:type", "${prefix}:inMemoryCacheType", GBCS.XML_SCHEMA_NAMESPACE_URI) attr("xs:type", "${prefix}:inMemoryCacheType", GBCS.XML_SCHEMA_NAMESPACE_URI)
attr("max-age", maxAge.toString()) attr("max-age", maxAge.toString())
attr("max-size", maxSize.toString())
digestAlgorithm?.let { digestAlgorithm -> digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm) attr("digest", digestAlgorithm)
} }

View File

@@ -44,6 +44,7 @@ object Parser {
val serverPath = root.renderAttribute("path") val serverPath = root.renderAttribute("path")
var incomingConnectionsBacklogSize = 1024 var incomingConnectionsBacklogSize = 1024
var authentication: Authentication? = null var authentication: Authentication? = null
var useNativeTransport = false
for (child in root.asIterable()) { for (child in root.asIterable()) {
val tagName = child.localName val tagName = child.localName
when (tagName) { when (tagName) {
@@ -136,7 +137,7 @@ object Parser {
} }
"event-executor" -> { "event-executor" -> {
val useVirtualThread = root.renderAttribute("use-virtual-threads") val useVirtualThread = child.renderAttribute("use-virtual-threads")
?.let(String::toBoolean) ?: true ?.let(String::toBoolean) ?: true
eventExecutor = Configuration.EventExecutor(useVirtualThread) eventExecutor = Configuration.EventExecutor(useVirtualThread)
} }
@@ -180,6 +181,10 @@ object Parser {
} }
tls = Tls(keyStore, trustStore) tls = Tls(keyStore, trustStore)
} }
"transport" -> {
useNativeTransport = child.renderAttribute("use-native-transport")
?.let(String::toBoolean) ?: false
}
} }
} }
return Configuration.of( return Configuration.of(
@@ -194,6 +199,7 @@ object Parser {
cache!!, cache!!,
authentication, authentication,
tls, tls,
useNativeTransport
) )
} }

View File

@@ -44,8 +44,12 @@ object Serializer {
attr("max-request-size", connection.maxRequestSize.toString()) attr("max-request-size", connection.maxRequestSize.toString())
} }
} }
node("transport") {
attr("use-native-transport", conf.isUseNativeTransport.toString())
}
node("event-executor") { node("event-executor") {
attr("use-virtual-threads", conf.eventExecutor.isUseVirtualThreads.toString()) attr("use-virtual-threads", conf.eventExecutor.isUseVirtualThreads.toString())
attr("use-virtual-threads", conf.eventExecutor.isUseVirtualThreads.toString())
} }
val cache = conf.cache val cache = conf.cache
val serializer : CacheProvider<Configuration.Cache> = val serializer : CacheProvider<Configuration.Cache> =

View File

@@ -57,10 +57,9 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED) response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
} }
ctx.write(response) ctx.write(response)
val content : Any = when (channel) { when (channel) {
is FileChannel -> DefaultFileRegion(channel, 0, channel.size()) is FileChannel -> {
else -> ChunkedNioStream(channel) val content = DefaultFileRegion(channel, 0, channel.size())
}
if (keepAlive) { if (keepAlive) {
ctx.write(content) ctx.write(content)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate()) ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
@@ -68,6 +67,20 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
ctx.writeAndFlush(content) ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE) .addListener(ChannelFutureListener.CLOSE)
} }
}
else -> {
val content = ChunkedNioStream(channel)
if (keepAlive) {
ctx.write(content).addListener {
content.close()
}
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
}
}
} else { } else {
log.debug(ctx) { log.debug(ctx) {
"Cache miss for key '$key'" "Cache miss for key '$key'"
@@ -94,7 +107,7 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
log.debug(ctx) { log.debug(ctx) {
"Added value for key '$key' to build cache" "Added value for key '$key' to build cache"
} }
cache.put(key, msg.content().retain()).thenRun { cache.put(key, msg.content()).thenRun {
val response = DefaultFullHttpResponse( val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED, msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray()) Unpooled.copiedBuffer(key.toByteArray())

View File

@@ -9,6 +9,7 @@
<xs:sequence minOccurs="0"> <xs:sequence minOccurs="0">
<xs:element name="bind" type="gbcs:bindType" maxOccurs="1"/> <xs:element name="bind" type="gbcs:bindType" maxOccurs="1"/>
<xs:element name="connection" type="gbcs:connectionType" minOccurs="0" maxOccurs="1"/> <xs:element name="connection" type="gbcs:connectionType" minOccurs="0" maxOccurs="1"/>
<xs:element name="transport" type="gbcs:transportType" minOccurs="0" maxOccurs="1"/>
<xs:element name="event-executor" type="gbcs:eventExecutorType" minOccurs="0" maxOccurs="1"/> <xs:element name="event-executor" type="gbcs:eventExecutorType" minOccurs="0" maxOccurs="1"/>
<xs:element name="cache" type="gbcs:cacheType" maxOccurs="1"/> <xs:element name="cache" type="gbcs:cacheType" maxOccurs="1"/>
<xs:element name="authorization" type="gbcs:authorizationType" minOccurs="0"> <xs:element name="authorization" type="gbcs:authorizationType" minOccurs="0">
@@ -42,6 +43,10 @@
<xs:attribute name="max-request-size" type="xs:unsignedInt" use="optional" default="67108864"/> <xs:attribute name="max-request-size" type="xs:unsignedInt" use="optional" default="67108864"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="transportType">
<xs:attribute name="use-native-transport" type="xs:boolean" use="optional" default="false"/>
</xs:complexType>
<xs:complexType name="eventExecutorType"> <xs:complexType name="eventExecutorType">
<xs:attribute name="use-virtual-threads" type="xs:boolean" use="optional" default="true"/> <xs:attribute name="use-virtual-threads" type="xs:boolean" use="optional" default="true"/>
</xs:complexType> </xs:complexType>
@@ -52,6 +57,7 @@
<xs:complexContent> <xs:complexContent>
<xs:extension base="gbcs:cacheType"> <xs:extension base="gbcs:cacheType">
<xs:attribute name="max-age" type="xs:duration" default="P1D"/> <xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="max-size" type="xs:token" default="0x1000000"/>
<xs:attribute name="digest" type="xs:token" default="MD5"/> <xs:attribute name="digest" type="xs:token" default="MD5"/>
<xs:attribute name="enable-compression" type="xs:boolean" default="true"/> <xs:attribute name="enable-compression" type="xs:boolean" default="true"/>
<xs:attribute name="compression-level" type="xs:byte" default="-1"/> <xs:attribute name="compression-level" type="xs:byte" default="-1"/>

View File

@@ -55,6 +55,7 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
), ),
Configuration.BasicAuthentication(), Configuration.BasicAuthentication(),
null, null,
false,
) )
Xml.write(Serializer.serialize(cfg), System.out) Xml.write(Serializer.serialize(cfg), System.out)
} }

View File

@@ -171,7 +171,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
Configuration.Tls( Configuration.Tls(
Configuration.KeyStore(this.serverKeyStoreFile, null, SERVER_CERTIFICATE_ENTRY, PASSWORD), Configuration.KeyStore(this.serverKeyStoreFile, null, SERVER_CERTIFICATE_ENTRY, PASSWORD),
Configuration.TrustStore(this.trustStoreFile, null, false, false), Configuration.TrustStore(this.trustStoreFile, null, false, false),
) ),
false
) )
Xml.write(Serializer.serialize(cfg), System.out) Xml.write(Serializer.serialize(cfg), System.out)
} }

View File

@@ -51,10 +51,12 @@ class NoAuthServerTest : AbstractServerTest() {
maxAge = Duration.ofSeconds(3600 * 24), maxAge = Duration.ofSeconds(3600 * 24),
compressionEnabled = true, compressionEnabled = true,
digestAlgorithm = "MD5", digestAlgorithm = "MD5",
compressionLevel = Deflater.DEFAULT_COMPRESSION compressionLevel = Deflater.DEFAULT_COMPRESSION,
maxSize = 0x1000000
), ),
null, null,
null, null,
false,
) )
Xml.write(Serializer.serialize(cfg), System.out) Xml.write(Serializer.serialize(cfg), System.out)
} }

View File

@@ -10,6 +10,7 @@
write-idle-timeout="PT11M" write-idle-timeout="PT11M"
idle-timeout="PT30M" idle-timeout="PT30M"
max-request-size="101325"/> max-request-size="101325"/>
<transport use-native-transport="true"/>
<event-executor use-virtual-threads="false"/> <event-executor use-virtual-threads="false"/>
<cache xs:type="gbcs:fileSystemCacheType" path="/tmp/gbcs" max-age="P7D"/> <cache xs:type="gbcs:fileSystemCacheType" path="/tmp/gbcs" max-age="P7D"/>
<authentication> <authentication>