implemented memcached client with Netty
All checks were successful
CI / build (push) Successful in 1m46s

This commit is contained in:
2025-02-04 21:56:41 +08:00
parent 89153b60f8
commit 6c62ac85c0
53 changed files with 590 additions and 293 deletions

View File

@@ -19,7 +19,7 @@ dependencies {
testImplementation catalog.bcprov.jdk18on
testImplementation catalog.bcpkix.jdk18on
testRuntimeOnly project(":gbcs-server-memcached")
testRuntimeOnly project(":gbcs-server-memcache")
}
test {

View File

@@ -400,9 +400,9 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
fun run(): ServerHandle {
// Create the multithreaded event loops for the server
val bossGroup = NioEventLoopGroup(0)
val bossGroup = NioEventLoopGroup(1)
val serverSocketChannel = NioServerSocketChannel::class.java
val workerGroup = bossGroup
val workerGroup = NioEventLoopGroup(0)
val eventExecutorGroup = run {
val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) {
Thread.ofVirtual().factory()

View File

@@ -1,8 +1,11 @@
package net.woggioni.gbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.GBCS.digestString
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.jwo.JWO
import net.woggioni.jwo.LockFile
import java.nio.channels.Channels
import java.nio.channels.FileChannel
@@ -14,6 +17,7 @@ import java.nio.file.attribute.BasicFileAttributes
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
@@ -28,7 +32,10 @@ class FileSystemCache(
val compressionLevel: Int
) : Cache {
private val log = contextLogger()
private companion object {
@JvmStatic
private val log = contextLogger()
}
init {
Files.createDirectories(root)
@@ -62,10 +69,12 @@ class FileSystemCache(
}
}.also {
gc()
}.let {
CompletableFuture.completedFuture(it)
}
}
override fun put(key: String, content: ByteArray) {
override fun put(key: String, content: ByteBuf): CompletableFuture<Void> {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
@@ -82,7 +91,7 @@ class FileSystemCache(
it
}
}.use {
it.write(content)
JWO.copy(ByteBufInputStream(content), it)
}
Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE)
} catch (t: Throwable) {
@@ -92,6 +101,7 @@ class FileSystemCache(
}.also {
gc()
}
return CompletableFuture.completedFuture(null)
}
private fun gc() {

View File

@@ -1,13 +1,17 @@
package net.woggioni.gbcs.server.cache
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.common.ByteBufInputStream
import net.woggioni.gbcs.common.GBCS.digestString
import java.io.ByteArrayInputStream
import net.woggioni.jwo.JWO
import java.io.ByteArrayOutputStream
import java.nio.channels.Channels
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.zip.Deflater
@@ -22,9 +26,9 @@ class InMemoryCache(
val compressionLevel: Int
) : Cache {
private val map = ConcurrentHashMap<String, ByteArray>()
private val map = ConcurrentHashMap<String, ByteBuf>()
private class RemovalQueueElement(val key: String, val value : ByteArray, val expiry : Instant) : Comparable<RemovalQueueElement> {
private class RemovalQueueElement(val key: String, val value : ByteBuf, val expiry : Instant) : Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
}
@@ -62,14 +66,16 @@ class InMemoryCache(
?.let { value ->
if (compressionEnabled) {
val inflater = Inflater()
Channels.newChannel(InflaterInputStream(ByteArrayInputStream(value), inflater))
Channels.newChannel(InflaterInputStream(ByteBufInputStream(value), inflater))
} else {
Channels.newChannel(ByteArrayInputStream(value))
Channels.newChannel(ByteBufInputStream(value))
}
}
}.let {
CompletableFuture.completedFuture(it)
}
override fun put(key: String, content: ByteArray) {
override fun put(key: String, content: ByteBuf) =
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
@@ -79,14 +85,15 @@ class InMemoryCache(
val deflater = Deflater(compressionLevel)
val baos = ByteArrayOutputStream()
DeflaterOutputStream(baos, deflater).use { stream ->
stream.write(content)
JWO.copy(ByteBufInputStream(content), stream)
}
baos.toByteArray()
Unpooled.wrappedBuffer(baos.toByteArray())
} else {
content
}
map[digest] = value
removalQueue.put(RemovalQueueElement(digest, value, Instant.now().plus(maxAge)))
}.let {
CompletableFuture.completedFuture<Void>(null)
}
}
}

View File

@@ -6,7 +6,6 @@ import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import org.w3c.dom.Element
import java.nio.file.Path
import java.time.Duration
import java.util.zip.Deflater

View File

@@ -265,7 +265,8 @@ object Parser {
}.map { el ->
val groupName = el.renderAttribute("name") ?: throw ConfigurationException("Group name is required")
var roles = emptySet<Role>()
var quota: Configuration.Quota? = null
var userQuota: Configuration.Quota? = null
var groupQuota: Configuration.Quota? = null
for (child in el.asIterable()) {
when (child.localName) {
"users" -> {
@@ -279,12 +280,15 @@ object Parser {
"roles" -> {
roles = parseRoles(child)
}
"quota" -> {
quota = parseQuota(child)
"group-quota" -> {
userQuota = parseQuota(child)
}
"user-quota" -> {
groupQuota = parseQuota(child)
}
}
}
groupName to Group(groupName, roles, quota)
groupName to Group(groupName, roles, userQuota, groupQuota)
}.toMap()
val users = knownUsersMap.map { (name, user) ->
name to User(name, user.password, userGroups[name]?.mapNotNull { groups[it] }?.toSet() ?: emptySet(), user.quota)

View File

@@ -8,8 +8,14 @@ import org.w3c.dom.Document
object Serializer {
fun serialize(conf : Configuration) : Document {
private fun Xml.serializeQuota(quota : Configuration.Quota) {
attr("calls", quota.calls.toString())
attr("period", quota.period.toString())
attr("max-available-calls", quota.maxAvailableCalls.toString())
attr("initial-available-calls", quota.initialAvailableCalls.toString())
}
fun serialize(conf : Configuration) : Document {
val schemaLocations = CacheSerializers.index.values.asSequence().map {
it.xmlNamespace to it.xmlSchemaLocation
}.toMap()
@@ -56,10 +62,7 @@ object Serializer {
}
user.quota?.let { quota ->
node("quota") {
attr("calls", quota.calls.toString())
attr("period", quota.period.toString())
attr("max-available-calls", quota.maxAvailableCalls.toString())
attr("initial-available-calls", quota.initialAvailableCalls.toString())
serializeQuota(quota)
}
}
}
@@ -70,10 +73,7 @@ object Serializer {
anonymousUser.quota?.let { quota ->
node("anonymous") {
node("quota") {
attr("calls", quota.calls.toString())
attr("period", quota.period.toString())
attr("max-available-calls", quota.maxAvailableCalls.toString())
attr("initial-available-calls", quota.initialAvailableCalls.toString())
serializeQuota(quota)
}
}
}
@@ -113,12 +113,14 @@ object Serializer {
}
}
}
group.quota?.let { quota ->
node("quota") {
attr("calls", quota.calls.toString())
attr("period", quota.period.toString())
attr("max-available-calls", quota.maxAvailableCalls.toString())
attr("initial-available-calls", quota.initialAvailableCalls.toString())
group.userQuota?.let { quota ->
node("user-quota") {
serializeQuota(quota)
}
}
group.groupQuota?.let { quota ->
node("group-quota") {
serializeQuota(quota)
}
}
}

View File

@@ -17,7 +17,6 @@ import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioStream
import net.woggioni.gbcs.api.Cache
import net.woggioni.gbcs.api.exception.CacheException
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.server.debug
import net.woggioni.gbcs.server.warn
@@ -43,49 +42,41 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
return
}
if (serverPrefix == prefix) {
try {
cache.get(key)
} catch(ex : Throwable) {
throw CacheException("Error accessing the cache backend", ex)
}?.let { channel ->
log.debug(ctx) {
"Cache hit for key '$key'"
}
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
when (channel) {
is FileChannel -> {
if (keepAlive) {
ctx.write(DefaultFileRegion(channel, 0, channel.size()))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(DefaultFileRegion(channel, 0, channel.size()))
.addListener(ChannelFutureListener.CLOSE)
}
cache.get(key).thenApply { channel ->
if(channel != null) {
log.debug(ctx) {
"Cache hit for key '$key'"
}
else -> {
ctx.write(ChunkedNioStream(channel)).addListener { evt ->
channel.close()
}
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
val content : Any = when (channel) {
is FileChannel -> DefaultFileRegion(channel, 0, channel.size())
else -> ChunkedNioStream(channel)
}
if (keepAlive) {
ctx.write(content)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
} else {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
} ?: let {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
}.whenComplete { _, ex -> ex?.let(ctx::fireExceptionCaught) }
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
@@ -103,26 +94,16 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
log.debug(ctx) {
"Added value for key '$key' to build cache"
}
val bodyBytes = msg.content().run {
if (isDirect) {
ByteArray(readableBytes()).also {
readBytes(it)
}
} else {
array()
}
cache.put(key, msg.content().retain()).thenRun {
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
}.whenComplete { _, ex ->
ctx.fireExceptionCaught(ex)
}
try {
cache.put(key, bodyBytes)
} catch(ex : Throwable) {
throw CacheException("Error accessing the cache backend", ex)
}
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"

View File

@@ -8,7 +8,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function
class BucketManager private constructor(
private val bucketsByUser: Map<Configuration.User, Bucket> = HashMap(),
private val bucketsByUser: Map<Configuration.User, List<Bucket>> = HashMap(),
private val bucketsByGroup: Map<Configuration.Group, Bucket> = HashMap(),
loader: Function<InetSocketAddress, Bucket>?
) {
@@ -43,22 +43,27 @@ class BucketManager private constructor(
companion object {
fun from(cfg : Configuration) : BucketManager {
val bucketsByUser = cfg.users.values.asSequence().filter {
it.quota != null
}.map { user ->
val quota = user.quota
val bucket = Bucket.local(
quota.maxAvailableCalls,
quota.calls,
quota.period,
quota.initialAvailableCalls
)
user to bucket
val bucketsByUser = cfg.users.values.asSequence().map { user ->
val buckets = (
user.quota
?.let { quota ->
sequenceOf(quota)
} ?: user.groups.asSequence()
.mapNotNull(Configuration.Group::getUserQuota)
).map { quota ->
Bucket.local(
quota.maxAvailableCalls,
quota.calls,
quota.period,
quota.initialAvailableCalls
)
}.toList()
user to buckets
}.toMap()
val bucketsByGroup = cfg.groups.values.asSequence().filter {
it.quota != null
it.groupQuota != null
}.map { group ->
val quota = group.quota
val quota = group.groupQuota
val bucket = Bucket.local(
quota.maxAvailableCalls,
quota.calls,

View File

@@ -42,7 +42,7 @@ class ThrottlingHandler(cfg: Configuration) :
val buckets = mutableListOf<Bucket>()
val user = ctx.channel().attr(GradleBuildCacheServer.userAttribute).get()
if (user != null) {
bucketManager.getBucketByUser(user)?.let(buckets::add)
bucketManager.getBucketByUser(user)?.let(buckets::addAll)
}
val groups = ctx.channel().attr(GradleBuildCacheServer.groupAttribute).get() ?: emptySet()
if (groups.isNotEmpty()) {

View File

@@ -146,7 +146,8 @@
</xs:unique>
</xs:element>
<xs:element name="roles" type="gbcs:rolesType" maxOccurs="1" minOccurs="0"/>
<xs:element name="quota" type="gbcs:quotaType" minOccurs="0" maxOccurs="1"/>
<xs:element name="user-quota" type="gbcs:quotaType" minOccurs="0" maxOccurs="1"/>
<xs:element name="group-quota" type="gbcs:quotaType" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
<xs:attribute name="name" type="xs:token"/>
</xs:complexType>

View File

@@ -24,8 +24,8 @@ abstract class AbstractBasicAuthServerTest : AbstractServerTest() {
protected val random = Random(101325)
protected val keyValuePair = newEntry(random)
protected val serverPath = "gbcs"
protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null)
protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null)
protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null, null)
protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null, null)
abstract protected val users : List<Configuration.User>

View File

@@ -1,7 +1,7 @@
package net.woggioni.gbcs.server.test
import net.woggioni.gbcs.server.GradleBuildCacheServer
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.server.GradleBuildCacheServer
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.MethodOrderer

View File

@@ -4,7 +4,6 @@ import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.api.Role
import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.server.cache.FileSystemCacheConfiguration
import net.woggioni.gbcs.server.cache.InMemoryCacheConfiguration
import net.woggioni.gbcs.server.configuration.Serializer
import net.woggioni.gbcs.server.test.utils.CertificateUtils
import net.woggioni.gbcs.server.test.utils.CertificateUtils.X509Credentials
@@ -46,8 +45,8 @@ abstract class AbstractTlsServerTest : AbstractServerTest() {
private lateinit var trustStore: KeyStore
protected lateinit var ca: X509Credentials
protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null)
protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null)
protected val readersGroup = Configuration.Group("readers", setOf(Role.Reader), null, null)
protected val writersGroup = Configuration.Group("writers", setOf(Role.Writer), null, null)
protected val random = Random(101325)
protected val keyValuePair = newEntry(random)
private val serverPath : String? = null

View File

@@ -3,7 +3,6 @@ package net.woggioni.gbcs.server.test
import io.netty.handler.codec.http.HttpResponseStatus
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.server.cache.FileSystemCacheConfiguration
import net.woggioni.gbcs.server.cache.InMemoryCacheConfiguration
import net.woggioni.gbcs.server.configuration.Serializer
import net.woggioni.gbcs.server.test.utils.NetworkUtils

View File

@@ -7,7 +7,6 @@ import org.bouncycastle.asn1.x500.X500Name
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Order
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.provider.ArgumentsSource
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

View File

@@ -1,8 +1,8 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcached jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd"
xmlns:gbcs-memcache="urn:net.woggioni.gbcs.server.memcache"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcache jpms://net.woggioni.gbcs.server.memcache/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd"
>
<bind host="0.0.0.0" port="8443" incoming-connections-backlog-size="4096"/>
<connection
@@ -13,7 +13,7 @@
read-timeout="PT5M"
write-timeout="PT5M"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="16777216" compression-mode="zip">
<cache xs:type="gbcs-memcache:memcacheCacheType" max-age="P7D" max-size="16777216" compression-mode="deflate">
<server host="memcached" port="11211"/>
</cache>
<authorization>

View File

@@ -1,8 +1,8 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server"
xmlns:gbcs-memcached="urn:net.woggioni.gbcs.server.memcached"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcached jpms://net.woggioni.gbcs.server.memcached/net/woggioni/gbcs/server/memcached/schema/gbcs-memcached.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd">
xmlns:gbcs-memcache="urn:net.woggioni.gbcs.server.memcache"
xs:schemaLocation="urn:net.woggioni.gbcs.server.memcache jpms://net.woggioni.gbcs.server.memcache/net/woggioni/gbcs/server/memcache/schema/gbcs-memcache.xsd urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd">
<bind host="127.0.0.1" port="11443" incoming-connections-backlog-size="50"/>
<connection
write-timeout="PT25M"
@@ -12,8 +12,8 @@
idle-timeout="PT30M"
max-request-size="101325"/>
<event-executor use-virtual-threads="false"/>
<cache xs:type="gbcs-memcached:memcachedCacheType" max-age="P7D" max-size="101325" digest="SHA-256">
<server host="127.0.0.1" port="11211"/>
<cache xs:type="gbcs-memcache:memcacheCacheType" max-age="P7D" max-size="101325" digest="SHA-256">
<server host="127.0.0.1" port="11211" max-connections="10" connection-timeout="PT20S"/>
</cache>
<authentication>
<none/>

View File

@@ -32,7 +32,8 @@
<roles>
<reader/>
</roles>
<quota calls="10" period="PT1S"/>
<user-quota calls="30" period="PT1M"/>
<group-quota calls="10" period="PT1S"/>
</group>
<group name="writers">
<users>
@@ -50,7 +51,7 @@
<reader/>
<writer/>
</roles>
<quota calls="1000" period="P1D"/>
<group-quota calls="1000" period="P1D"/>
</group>
</groups>
</authorization>