Compare commits

...

1 Commits

4 changed files with 56 additions and 103 deletions

View File

@@ -43,6 +43,46 @@
{
"name":"com.aayushatharva.brotli4j.Brotli4jLoader"
},
{
"name":"com.github.benmanes.caffeine.cache.BLCHeader$DrainStatusRef",
"fields":[{"name":"drainStatus"}]
},
{
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueColdProducerFields",
"fields":[{"name":"producerLimit"}]
},
{
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueConsumerFields",
"fields":[{"name":"consumerIndex"}]
},
{
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueProducerFields",
"fields":[{"name":"producerIndex"}]
},
{
"name":"com.github.benmanes.caffeine.cache.BoundedLocalCache",
"fields":[{"name":"refreshes"}]
},
{
"name":"com.github.benmanes.caffeine.cache.PS",
"fields":[{"name":"key"}, {"name":"value"}]
},
{
"name":"com.github.benmanes.caffeine.cache.PSW",
"fields":[{"name":"writeTime"}]
},
{
"name":"com.github.benmanes.caffeine.cache.PSWMS",
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{
"name":"com.github.benmanes.caffeine.cache.SSMSW",
"fields":[{"name":"FACTORY"}]
},
{
"name":"com.github.benmanes.caffeine.cache.StripedBuffer",
"fields":[{"name":"tableBusy"}]
},
{
"name":"com.github.luben.zstd.Zstd"
},
@@ -588,6 +628,9 @@
"name":"net.woggioni.rbcs.server.exception.ExceptionHandler",
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
},
{
"name":"net.woggioni.rbcs.server.handler.BlackHoleRequestHandler"
},
{
"name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler",
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]

View File

@@ -12,6 +12,7 @@ dependencies {
implementation catalog.netty.handler
implementation catalog.netty.buffer
implementation catalog.netty.transport
implementation("com.github.ben-manes.caffeine:caffeine:3.2.0")
api project(':rbcs-common')
api project(':rbcs-api')

View File

@@ -18,6 +18,7 @@ module net.woggioni.rbcs.server {
requires net.woggioni.jwo;
requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.api;
requires com.github.benmanes.caffeine;
exports net.woggioni.rbcs.server;

View File

@@ -1,16 +1,13 @@
package net.woggioni.rbcs.server.cache
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import io.netty.buffer.ByteBuf
import net.woggioni.rbcs.api.AsyncCloseable
import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger
import java.time.Duration
import java.time.Instant
import java.util.PriorityQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
private class CacheKey(private val value: ByteArray) {
override fun equals(other: Any?) = if (other is CacheKey) {
@@ -26,118 +23,29 @@ class CacheEntry(
)
class InMemoryCache(
private val maxAge: Duration,
private val maxSize: Long
maxAge: Duration,
maxSize: Long
) : AsyncCloseable {
companion object {
private val log = createLogger<InMemoryCache>()
}
private var mapSize : Long = 0
private val map = HashMap<CacheKey, CacheEntry>()
private val lock = ReentrantReadWriteLock()
private val cond = lock.writeLock().newCondition()
private val cache: Cache<CacheKey, CacheEntry> = Caffeine.newBuilder()
.expireAfterWrite(maxAge)
.maximumSize(maxSize)
.build()
override fun asyncClose(): CompletableFuture<Void> = CompletableFuture.completedFuture(null)
private class RemovalQueueElement(val key: CacheKey, val value: CacheEntry, val expiry: Instant) :
Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
}
private val removalQueue = PriorityQueue<RemovalQueueElement>()
@Volatile
private var running = true
private val closeFuture = object : CompletableFuture<Void>() {
init {
Thread.ofVirtual().name("in-memory-cache-gc").start {
try {
lock.writeLock().withLock {
while (running) {
val el = removalQueue.poll()
if(el == null) {
cond.await(1000, TimeUnit.MILLISECONDS)
continue
}
val value = el.value
val now = Instant.now()
if (now > el.expiry) {
val removed = map.remove(el.key, value)
if (removed) {
updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
}
} else {
removalQueue.offer(el)
val interval = minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
}
}
map.forEach {
it.value.content.release()
}
map.clear()
}
complete(null)
} catch (ex: Throwable) {
completeExceptionally(ex)
}
}
}
}
fun removeEldest(): Long {
while (true) {
val el = removalQueue.poll() ?: return mapSize
val value = el.value
val removed = map.remove(el.key, value)
if (removed) {
val newSize = updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
return newSize
}
}
}
private fun updateSizeAfterRemoval(removed: ByteBuf): Long {
mapSize -= removed.readableBytes()
return mapSize
}
override fun asyncClose() : CompletableFuture<Void> {
running = false
lock.writeLock().withLock {
cond.signal()
}
return closeFuture
}
fun get(key: ByteArray) = lock.readLock().withLock {
map[CacheKey(key)]?.run {
fun get(key: ByteArray) = cache.getIfPresent(CacheKey(key))?.run {
CacheEntry(metadata, content.retainedDuplicate())
}
}
fun put(
key: ByteArray,
value: CacheEntry,
) {
val cacheKey = CacheKey(key)
lock.writeLock().withLock {
val oldSize = map.put(cacheKey, value)?.let { old ->
val result = old.content.readableBytes()
old.content.release()
result
} ?: 0
val delta = value.content.readableBytes() - oldSize
mapSize += delta
removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
while (mapSize > maxSize) {
removeEldest()
}
}
cache.put(cacheKey, value)
}
}