added optional key prefix to memcache backend

This commit is contained in:
2025-06-13 17:45:15 +08:00
parent 0e92998f16
commit fd0bd1ee5f
10 changed files with 51 additions and 41 deletions

View File

@@ -86,6 +86,7 @@ object GraalNativeImageConfiguration {
4) 4)
), ),
Duration.ofSeconds(60), Duration.ofSeconds(60),
"someCustomPrefix",
"MD5", "MD5",
null, null,
1, 1,
@@ -116,23 +117,8 @@ object GraalNativeImageConfiguration {
null, null,
) )
MemcacheCacheConfiguration(
listOf(
MemcacheCacheConfiguration.Server(
HostAndPort("127.0.0.1", 11211),
1000,
4
)
),
Duration.ofSeconds(60),
"MD5",
null,
1,
)
val serverHandle = RemoteBuildCacheServer(serverConfiguration).run() val serverHandle = RemoteBuildCacheServer(serverConfiguration).run()
val clientProfile = ClientConfiguration.Profile( val clientProfile = ClientConfiguration.Profile(
URI.create("http://127.0.0.1:$serverPort/"), URI.create("http://127.0.0.1:$serverPort/"),
ClientConfiguration.Connection( ClientConfiguration.Connection(

View File

@@ -23,24 +23,24 @@ import net.woggioni.jwo.JWO
import net.woggioni.jwo.Tuple2 import net.woggioni.jwo.Tuple2
object RBCS { object RBCS {
fun String.toUrl() : URL = URL.of(URI(this), null) fun String.toUrl(): URL = URL.of(URI(this), null)
const val RBCS_NAMESPACE_URI: String = "urn:net.woggioni.rbcs.server" const val RBCS_NAMESPACE_URI: String = "urn:net.woggioni.rbcs.server"
const val RBCS_PREFIX: String = "rbcs" const val RBCS_PREFIX: String = "rbcs"
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance" const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
fun ByteArray.toInt(index : Int = 0) : Long { fun ByteArray.toInt(index: Int = 0): Long {
if(index + 4 > size) throw IllegalArgumentException("Not enough bytes to decode a 32 bits integer") if (index + 4 > size) throw IllegalArgumentException("Not enough bytes to decode a 32 bits integer")
var value : Long = 0 var value: Long = 0
for (b in index until index + 4) { for (b in index until index + 4) {
value = (value shl 8) + (get(b).toInt() and 0xFF) value = (value shl 8) + (get(b).toInt() and 0xFF)
} }
return value return value
} }
fun ByteArray.toLong(index : Int = 0) : Long { fun ByteArray.toLong(index: Int = 0): Long {
if(index + 8 > size) throw IllegalArgumentException("Not enough bytes to decode a 64 bits long integer") if (index + 8 > size) throw IllegalArgumentException("Not enough bytes to decode a 64 bits long integer")
var value : Long = 0 var value: Long = 0
for (b in index until index + 8) { for (b in index until index + 8) {
value = (value shl 8) + (get(b).toInt() and 0xFF) value = (value shl 8) + (get(b).toInt() and 0xFF)
} }
@@ -62,11 +62,18 @@ object RBCS {
return JWO.bytesToHex(digest(data, md)) return JWO.bytesToHex(digest(data, md))
} }
fun processCacheKey(key: String, digestAlgorithm: String?) = digestAlgorithm fun processCacheKey(key: String, keyPrefix: String?, digestAlgorithm: String?) : ByteArray {
val prefixedKey = if (keyPrefix == null) {
key
} else {
key + keyPrefix
}.toByteArray(Charsets.UTF_8)
return digestAlgorithm
?.let(MessageDigest::getInstance) ?.let(MessageDigest::getInstance)
?.let { md -> ?.let { md ->
digest(key.toByteArray(), md) digest(prefixedKey, md)
} ?: key.toByteArray(Charsets.UTF_8) } ?: prefixedKey
}
fun Long.toIntOrNull(): Int? { fun Long.toIntOrNull(): Int? {
return if (this >= Int.MIN_VALUE && this <= Int.MAX_VALUE) { return if (this >= Int.MIN_VALUE && this <= Int.MAX_VALUE) {

View File

@@ -20,6 +20,7 @@ import net.woggioni.rbcs.server.memcache.client.MemcacheClient
data class MemcacheCacheConfiguration( data class MemcacheCacheConfiguration(
val servers: List<Server>, val servers: List<Server>,
val maxAge: Duration = Duration.ofDays(1), val maxAge: Duration = Duration.ofDays(1),
val keyPrefix : String? = null,
val digestAlgorithm: String? = null, val digestAlgorithm: String? = null,
val compressionMode: CompressionMode? = null, val compressionMode: CompressionMode? = null,
val compressionLevel: Int, val compressionLevel: Int,
@@ -60,6 +61,7 @@ data class MemcacheCacheConfiguration(
socketChannelFactory, socketChannelFactory,
connectionPoolMap connectionPoolMap
), ),
keyPrefix,
digestAlgorithm, digestAlgorithm,
compressionMode != null, compressionMode != null,
compressionLevel, compressionLevel,

View File

@@ -53,6 +53,7 @@ import net.woggioni.rbcs.server.memcache.client.MemcacheResponseHandler
class MemcacheCacheHandler( class MemcacheCacheHandler(
private val client: MemcacheClient, private val client: MemcacheClient,
private val keyPrefix: String?,
private val digestAlgorithm: String?, private val digestAlgorithm: String?,
private val compressionEnabled: Boolean, private val compressionEnabled: Boolean,
private val compressionLevel: Int, private val compressionLevel: Int,
@@ -248,7 +249,7 @@ class MemcacheCacheHandler(
"Fetching ${msg.key} from memcache" "Fetching ${msg.key} from memcache"
} }
val key = ctx.alloc().buffer().also { val key = ctx.alloc().buffer().also {
it.writeBytes(processCacheKey(msg.key, digestAlgorithm)) it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm))
} }
val responseHandler = object : MemcacheResponseHandler { val responseHandler = object : MemcacheResponseHandler {
override fun responseReceived(response: BinaryMemcacheResponse) { override fun responseReceived(response: BinaryMemcacheResponse) {
@@ -309,7 +310,7 @@ class MemcacheCacheHandler(
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
val key = ctx.alloc().buffer().also { val key = ctx.alloc().buffer().also {
it.writeBytes(processCacheKey(msg.key, digestAlgorithm)) it.writeBytes(processCacheKey(msg.key, keyPrefix, digestAlgorithm))
} }
val responseHandler = object : MemcacheResponseHandler { val responseHandler = object : MemcacheResponseHandler {
override fun responseReceived(response: BinaryMemcacheResponse) { override fun responseReceived(response: BinaryMemcacheResponse) {

View File

@@ -38,6 +38,7 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE else -> MemcacheCacheConfiguration.CompressionMode.DEFLATE
} }
} }
val keyPrefix = el.renderAttribute("key-prefix")
val digestAlgorithm = el.renderAttribute("digest") val digestAlgorithm = el.renderAttribute("digest")
for (child in el.asIterable()) { for (child in el.asIterable()) {
when (child.nodeName) { when (child.nodeName) {
@@ -54,10 +55,10 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
} }
} }
} }
return MemcacheCacheConfiguration( return MemcacheCacheConfiguration(
servers, servers,
maxAge, maxAge,
keyPrefix,
digestAlgorithm, digestAlgorithm,
compressionMode, compressionMode,
compressionLevel compressionLevel
@@ -78,8 +79,12 @@ class MemcacheCacheProvider : CacheProvider<MemcacheCacheConfiguration> {
} }
attr("max-connections", server.maxConnections.toString()) attr("max-connections", server.maxConnections.toString())
} }
} }
attr("max-age", maxAge.toString()) attr("max-age", maxAge.toString())
keyPrefix?.let {
attr("key-prefix", it)
}
digestAlgorithm?.let { digestAlgorithm -> digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm) attr("digest", digestAlgorithm)
} }

View File

@@ -21,6 +21,14 @@
</xs:sequence> </xs:sequence>
<xs:attribute name="max-age" type="xs:duration" default="P1D"/> <xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000"/> <xs:attribute name="chunk-size" type="rbcs:byteSizeType" default="0x10000"/>
<xs:attribute name="key-prefix" type="xs:string" use="optional">
<xs:annotation>
<xs:documentation>
Prepend this string to all the keys inserted in memcache,
useful in case the caching backend is shared with other applications
</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="digest" type="xs:token"/> <xs:attribute name="digest" type="xs:token"/>
<xs:attribute name="compression-mode" type="rbcs-memcache:compressionType"/> <xs:attribute name="compression-mode" type="rbcs-memcache:compressionType"/>
<xs:attribute name="compression-level" type="rbcs:compressionLevelType" default="-1"/> <xs:attribute name="compression-level" type="rbcs:compressionLevelType" default="-1"/>

View File

@@ -79,7 +79,7 @@ class FileSystemCacheHandler(
} }
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm))) val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, null, digestAlgorithm)))
val sink = cache.put(key, msg.metadata) val sink = cache.put(key, msg.metadata)
inProgressRequest = InProgressPutRequest(msg.key, sink) inProgressRequest = InProgressPutRequest(msg.key, sink)
} }
@@ -100,7 +100,7 @@ class FileSystemCacheHandler(
sendMessageAndFlush(ctx, CachePutResponse(request.key)) sendMessageAndFlush(ctx, CachePutResponse(request.key))
} }
is InProgressGetRequest -> { is InProgressGetRequest -> {
val key = String(Base64.getUrlEncoder().encode(processCacheKey(request.request.key, digestAlgorithm))) val key = String(Base64.getUrlEncoder().encode(processCacheKey(request.request.key, null, digestAlgorithm)))
cache.get(key)?.also { entryValue -> cache.get(key)?.also { entryValue ->
sendMessageAndFlush(ctx, CacheValueFoundResponse(request.request.key, entryValue.metadata)) sendMessageAndFlush(ctx, CacheValueFoundResponse(request.request.key, entryValue.metadata))
entryValue.channel.let { channel -> entryValue.channel.let { channel ->

View File

@@ -27,7 +27,7 @@ class InMemoryCacheHandler(
private interface InProgressRequest : AutoCloseable { private interface InProgressRequest : AutoCloseable {
} }
private class InProgressGetRequest(val request : CacheGetRequest) : InProgressRequest { private class InProgressGetRequest(val request: CacheGetRequest) : InProgressRequest {
override fun close() { override fun close() {
} }
} }
@@ -44,7 +44,7 @@ class InMemoryCacheHandler(
override val buf = ctx.alloc().compositeHeapBuffer() override val buf = ctx.alloc().compositeHeapBuffer()
override fun append(buf: ByteBuf) { override fun append(buf: ByteBuf) {
if(buf.isDirect) { if (buf.isDirect) {
this.buf.writeBytes(buf) this.buf.writeBytes(buf)
} else { } else {
this.buf.addComponent(true, buf.retain()) this.buf.addComponent(true, buf.retain())
@@ -93,7 +93,7 @@ class InMemoryCacheHandler(
} }
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) { private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
inProgressRequest = if(compressionEnabled) { inProgressRequest = if (compressionEnabled) {
InProgressCompressedPutRequest(ctx, msg) InProgressCompressedPutRequest(ctx, msg)
} else { } else {
InProgressPlainPutRequest(ctx, msg) InProgressPlainPutRequest(ctx, msg)
@@ -102,16 +102,16 @@ class InMemoryCacheHandler(
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) { private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
val req = inProgressRequest val req = inProgressRequest
if(req is InProgressPutRequest) { if (req is InProgressPutRequest) {
req.append(msg.content()) req.append(msg.content())
} }
} }
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) { private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
handleCacheContent(ctx, msg) handleCacheContent(ctx, msg)
when(val req = inProgressRequest) { when (val req = inProgressRequest) {
is InProgressGetRequest -> { is InProgressGetRequest -> {
cache.get(processCacheKey(req.request.key, digestAlgorithm))?.let { value -> cache.get(processCacheKey(req.request.key, null, digestAlgorithm))?.let { value ->
sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata)) sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata))
if (compressionEnabled) { if (compressionEnabled) {
val buf = ctx.alloc().heapBuffer() val buf = ctx.alloc().heapBuffer()
@@ -126,12 +126,13 @@ class InMemoryCacheHandler(
} }
} ?: sendMessage(ctx, CacheValueNotFoundResponse(req.request.key)) } ?: sendMessage(ctx, CacheValueNotFoundResponse(req.request.key))
} }
is InProgressPutRequest -> { is InProgressPutRequest -> {
this.inProgressRequest = null this.inProgressRequest = null
val buf = req.buf val buf = req.buf
buf.retain() buf.retain()
req.close() req.close()
val cacheKey = processCacheKey(req.request.key, digestAlgorithm) val cacheKey = processCacheKey(req.request.key, null, digestAlgorithm)
cache.put(cacheKey, CacheEntry(req.request.metadata, buf)) cache.put(cacheKey, CacheEntry(req.request.metadata, buf))
sendMessageAndFlush(ctx, CachePutResponse(req.request.key)) sendMessageAndFlush(ctx, CachePutResponse(req.request.key))
} }

View File

@@ -13,7 +13,7 @@
chunk-size="123"/> chunk-size="123"/>
<event-executor use-virtual-threads="true"/> <event-executor use-virtual-threads="true"/>
<rate-limiter delay-response="false" message-buffer-size="12000" max-queued-messages="53"/> <rate-limiter delay-response="false" message-buffer-size="12000" max-queued-messages="53"/>
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D"> <cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" key-prefix="some-prefix-string">
<server host="memcached" port="11211"/> <server host="memcached" port="11211"/>
</cache> </cache>
<authorization> <authorization>

View File

@@ -12,7 +12,7 @@
chunk-size="456"/> chunk-size="456"/>
<event-executor use-virtual-threads="false"/> <event-executor use-virtual-threads="false"/>
<rate-limiter delay-response="true" message-buffer-size="65432" max-queued-messages="21"/> <rate-limiter delay-response="true" message-buffer-size="65432" max-queued-messages="21"/>
<cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" digest="SHA-256" compression-mode="deflate" compression-level="7"> <cache xs:type="rbcs-memcache:memcacheCacheType" max-age="P7D" key-prefix="some-prefix-string" digest="SHA-256" compression-mode="deflate" compression-level="7">
<server host="127.0.0.1" port="11211" max-connections="10" connection-timeout="PT20S"/> <server host="127.0.0.1" port="11211" max-connections="10" connection-timeout="PT20S"/>
</cache> </cache>
<authentication> <authentication>