Compare commits

...

1 Commits

4 changed files with 56 additions and 103 deletions

View File

@@ -43,6 +43,46 @@
{ {
"name":"com.aayushatharva.brotli4j.Brotli4jLoader" "name":"com.aayushatharva.brotli4j.Brotli4jLoader"
}, },
{
"name":"com.github.benmanes.caffeine.cache.BLCHeader$DrainStatusRef",
"fields":[{"name":"drainStatus"}]
},
{
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueColdProducerFields",
"fields":[{"name":"producerLimit"}]
},
{
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueConsumerFields",
"fields":[{"name":"consumerIndex"}]
},
{
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueProducerFields",
"fields":[{"name":"producerIndex"}]
},
{
"name":"com.github.benmanes.caffeine.cache.BoundedLocalCache",
"fields":[{"name":"refreshes"}]
},
{
"name":"com.github.benmanes.caffeine.cache.PS",
"fields":[{"name":"key"}, {"name":"value"}]
},
{
"name":"com.github.benmanes.caffeine.cache.PSW",
"fields":[{"name":"writeTime"}]
},
{
"name":"com.github.benmanes.caffeine.cache.PSWMS",
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{
"name":"com.github.benmanes.caffeine.cache.SSMSW",
"fields":[{"name":"FACTORY"}]
},
{
"name":"com.github.benmanes.caffeine.cache.StripedBuffer",
"fields":[{"name":"tableBusy"}]
},
{ {
"name":"com.github.luben.zstd.Zstd" "name":"com.github.luben.zstd.Zstd"
}, },
@@ -588,6 +628,9 @@
"name":"net.woggioni.rbcs.server.exception.ExceptionHandler", "name":"net.woggioni.rbcs.server.exception.ExceptionHandler",
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }] "methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
}, },
{
"name":"net.woggioni.rbcs.server.handler.BlackHoleRequestHandler"
},
{ {
"name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler", "name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler",
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }] "methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]

View File

@@ -12,6 +12,7 @@ dependencies {
implementation catalog.netty.handler implementation catalog.netty.handler
implementation catalog.netty.buffer implementation catalog.netty.buffer
implementation catalog.netty.transport implementation catalog.netty.transport
implementation("com.github.ben-manes.caffeine:caffeine:3.2.0")
api project(':rbcs-common') api project(':rbcs-common')
api project(':rbcs-api') api project(':rbcs-api')

View File

@@ -18,6 +18,7 @@ module net.woggioni.rbcs.server {
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires net.woggioni.rbcs.common; requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.api; requires net.woggioni.rbcs.api;
requires com.github.benmanes.caffeine;
exports net.woggioni.rbcs.server; exports net.woggioni.rbcs.server;

View File

@@ -1,16 +1,13 @@
package net.woggioni.rbcs.server.cache package net.woggioni.rbcs.server.cache
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import net.woggioni.rbcs.api.AsyncCloseable import net.woggioni.rbcs.api.AsyncCloseable
import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger import net.woggioni.rbcs.common.createLogger
import java.time.Duration import java.time.Duration
import java.time.Instant
import java.util.PriorityQueue
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
private class CacheKey(private val value: ByteArray) { private class CacheKey(private val value: ByteArray) {
override fun equals(other: Any?) = if (other is CacheKey) { override fun equals(other: Any?) = if (other is CacheKey) {
@@ -26,118 +23,29 @@ class CacheEntry(
) )
class InMemoryCache( class InMemoryCache(
private val maxAge: Duration, maxAge: Duration,
private val maxSize: Long maxSize: Long
) : AsyncCloseable { ) : AsyncCloseable {
companion object { companion object {
private val log = createLogger<InMemoryCache>() private val log = createLogger<InMemoryCache>()
} }
private var mapSize : Long = 0 private val cache: Cache<CacheKey, CacheEntry> = Caffeine.newBuilder()
private val map = HashMap<CacheKey, CacheEntry>() .expireAfterWrite(maxAge)
private val lock = ReentrantReadWriteLock() .maximumSize(maxSize)
private val cond = lock.writeLock().newCondition() .build()
override fun asyncClose(): CompletableFuture<Void> = CompletableFuture.completedFuture(null)
private class RemovalQueueElement(val key: CacheKey, val value: CacheEntry, val expiry: Instant) : fun get(key: ByteArray) = cache.getIfPresent(CacheKey(key))?.run {
Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
}
private val removalQueue = PriorityQueue<RemovalQueueElement>()
@Volatile
private var running = true
private val closeFuture = object : CompletableFuture<Void>() {
init {
Thread.ofVirtual().name("in-memory-cache-gc").start {
try {
lock.writeLock().withLock {
while (running) {
val el = removalQueue.poll()
if(el == null) {
cond.await(1000, TimeUnit.MILLISECONDS)
continue
}
val value = el.value
val now = Instant.now()
if (now > el.expiry) {
val removed = map.remove(el.key, value)
if (removed) {
updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
}
} else {
removalQueue.offer(el)
val interval = minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
}
}
map.forEach {
it.value.content.release()
}
map.clear()
}
complete(null)
} catch (ex: Throwable) {
completeExceptionally(ex)
}
}
}
}
fun removeEldest(): Long {
while (true) {
val el = removalQueue.poll() ?: return mapSize
val value = el.value
val removed = map.remove(el.key, value)
if (removed) {
val newSize = updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
return newSize
}
}
}
private fun updateSizeAfterRemoval(removed: ByteBuf): Long {
mapSize -= removed.readableBytes()
return mapSize
}
override fun asyncClose() : CompletableFuture<Void> {
running = false
lock.writeLock().withLock {
cond.signal()
}
return closeFuture
}
fun get(key: ByteArray) = lock.readLock().withLock {
map[CacheKey(key)]?.run {
CacheEntry(metadata, content.retainedDuplicate()) CacheEntry(metadata, content.retainedDuplicate())
} }
}
fun put( fun put(
key: ByteArray, key: ByteArray,
value: CacheEntry, value: CacheEntry,
) { ) {
val cacheKey = CacheKey(key) val cacheKey = CacheKey(key)
lock.writeLock().withLock { cache.put(cacheKey, value)
val oldSize = map.put(cacheKey, value)?.let { old ->
val result = old.content.readableBytes()
old.content.release()
result
} ?: 0
val delta = value.content.readableBytes() - oldSize
mapSize += delta
removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
while (mapSize > maxSize) {
removeEldest()
}
}
} }
} }