Compare commits
1 Commits
3774ab8ef0
...
caffeine
Author | SHA1 | Date | |
---|---|---|---|
19965dd3b0
|
@@ -43,6 +43,46 @@
|
||||
{
|
||||
"name":"com.aayushatharva.brotli4j.Brotli4jLoader"
|
||||
},
|
||||
{
|
||||
"name":"com.github.benmanes.caffeine.cache.BLCHeader$DrainStatusRef",
|
||||
"fields":[{"name":"drainStatus"}]
|
||||
},
|
||||
{
|
||||
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueColdProducerFields",
|
||||
"fields":[{"name":"producerLimit"}]
|
||||
},
|
||||
{
|
||||
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueConsumerFields",
|
||||
"fields":[{"name":"consumerIndex"}]
|
||||
},
|
||||
{
|
||||
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueProducerFields",
|
||||
"fields":[{"name":"producerIndex"}]
|
||||
},
|
||||
{
|
||||
"name":"com.github.benmanes.caffeine.cache.BoundedLocalCache",
|
||||
"fields":[{"name":"refreshes"}]
|
||||
},
|
||||
{
|
||||
"name":"com.github.benmanes.caffeine.cache.PS",
|
||||
"fields":[{"name":"key"}, {"name":"value"}]
|
||||
},
|
||||
{
|
||||
"name":"com.github.benmanes.caffeine.cache.PSW",
|
||||
"fields":[{"name":"writeTime"}]
|
||||
},
|
||||
{
|
||||
"name":"com.github.benmanes.caffeine.cache.PSWMS",
|
||||
"methods":[{"name":"<init>","parameterTypes":[] }]
|
||||
},
|
||||
{
|
||||
"name":"com.github.benmanes.caffeine.cache.SSMSW",
|
||||
"fields":[{"name":"FACTORY"}]
|
||||
},
|
||||
{
|
||||
"name":"com.github.benmanes.caffeine.cache.StripedBuffer",
|
||||
"fields":[{"name":"tableBusy"}]
|
||||
},
|
||||
{
|
||||
"name":"com.github.luben.zstd.Zstd"
|
||||
},
|
||||
@@ -588,6 +628,9 @@
|
||||
"name":"net.woggioni.rbcs.server.exception.ExceptionHandler",
|
||||
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
|
||||
},
|
||||
{
|
||||
"name":"net.woggioni.rbcs.server.handler.BlackHoleRequestHandler"
|
||||
},
|
||||
{
|
||||
"name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler",
|
||||
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
|
||||
|
@@ -12,6 +12,7 @@ dependencies {
|
||||
implementation catalog.netty.handler
|
||||
implementation catalog.netty.buffer
|
||||
implementation catalog.netty.transport
|
||||
implementation("com.github.ben-manes.caffeine:caffeine:3.2.0")
|
||||
|
||||
api project(':rbcs-common')
|
||||
api project(':rbcs-api')
|
||||
|
@@ -18,6 +18,7 @@ module net.woggioni.rbcs.server {
|
||||
requires net.woggioni.jwo;
|
||||
requires net.woggioni.rbcs.common;
|
||||
requires net.woggioni.rbcs.api;
|
||||
requires com.github.benmanes.caffeine;
|
||||
|
||||
exports net.woggioni.rbcs.server;
|
||||
|
||||
|
@@ -1,16 +1,13 @@
|
||||
package net.woggioni.rbcs.server.cache
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import io.netty.buffer.ByteBuf
|
||||
import net.woggioni.rbcs.api.AsyncCloseable
|
||||
import net.woggioni.rbcs.api.CacheValueMetadata
|
||||
import net.woggioni.rbcs.common.createLogger
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.PriorityQueue
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock
|
||||
import kotlin.concurrent.withLock
|
||||
|
||||
private class CacheKey(private val value: ByteArray) {
|
||||
override fun equals(other: Any?) = if (other is CacheKey) {
|
||||
@@ -26,118 +23,29 @@ class CacheEntry(
|
||||
)
|
||||
|
||||
class InMemoryCache(
|
||||
private val maxAge: Duration,
|
||||
private val maxSize: Long
|
||||
maxAge: Duration,
|
||||
maxSize: Long
|
||||
) : AsyncCloseable {
|
||||
|
||||
companion object {
|
||||
private val log = createLogger<InMemoryCache>()
|
||||
}
|
||||
|
||||
private var mapSize : Long = 0
|
||||
private val map = HashMap<CacheKey, CacheEntry>()
|
||||
private val lock = ReentrantReadWriteLock()
|
||||
private val cond = lock.writeLock().newCondition()
|
||||
private val cache: Cache<CacheKey, CacheEntry> = Caffeine.newBuilder()
|
||||
.expireAfterWrite(maxAge)
|
||||
.maximumSize(maxSize)
|
||||
.build()
|
||||
override fun asyncClose(): CompletableFuture<Void> = CompletableFuture.completedFuture(null)
|
||||
|
||||
private class RemovalQueueElement(val key: CacheKey, val value: CacheEntry, val expiry: Instant) :
|
||||
Comparable<RemovalQueueElement> {
|
||||
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
|
||||
}
|
||||
|
||||
private val removalQueue = PriorityQueue<RemovalQueueElement>()
|
||||
|
||||
@Volatile
|
||||
private var running = true
|
||||
|
||||
private val closeFuture = object : CompletableFuture<Void>() {
|
||||
init {
|
||||
Thread.ofVirtual().name("in-memory-cache-gc").start {
|
||||
try {
|
||||
lock.writeLock().withLock {
|
||||
while (running) {
|
||||
val el = removalQueue.poll()
|
||||
if(el == null) {
|
||||
cond.await(1000, TimeUnit.MILLISECONDS)
|
||||
continue
|
||||
}
|
||||
val value = el.value
|
||||
val now = Instant.now()
|
||||
if (now > el.expiry) {
|
||||
val removed = map.remove(el.key, value)
|
||||
if (removed) {
|
||||
updateSizeAfterRemoval(value.content)
|
||||
//Decrease the reference count for map
|
||||
value.content.release()
|
||||
}
|
||||
} else {
|
||||
removalQueue.offer(el)
|
||||
val interval = minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))
|
||||
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}
|
||||
map.forEach {
|
||||
it.value.content.release()
|
||||
}
|
||||
map.clear()
|
||||
}
|
||||
complete(null)
|
||||
} catch (ex: Throwable) {
|
||||
completeExceptionally(ex)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun removeEldest(): Long {
|
||||
while (true) {
|
||||
val el = removalQueue.poll() ?: return mapSize
|
||||
val value = el.value
|
||||
val removed = map.remove(el.key, value)
|
||||
if (removed) {
|
||||
val newSize = updateSizeAfterRemoval(value.content)
|
||||
//Decrease the reference count for map
|
||||
value.content.release()
|
||||
return newSize
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun updateSizeAfterRemoval(removed: ByteBuf): Long {
|
||||
mapSize -= removed.readableBytes()
|
||||
return mapSize
|
||||
}
|
||||
|
||||
override fun asyncClose() : CompletableFuture<Void> {
|
||||
running = false
|
||||
lock.writeLock().withLock {
|
||||
cond.signal()
|
||||
}
|
||||
return closeFuture
|
||||
}
|
||||
|
||||
fun get(key: ByteArray) = lock.readLock().withLock {
|
||||
map[CacheKey(key)]?.run {
|
||||
fun get(key: ByteArray) = cache.getIfPresent(CacheKey(key))?.run {
|
||||
CacheEntry(metadata, content.retainedDuplicate())
|
||||
}
|
||||
}
|
||||
|
||||
fun put(
|
||||
key: ByteArray,
|
||||
value: CacheEntry,
|
||||
) {
|
||||
val cacheKey = CacheKey(key)
|
||||
lock.writeLock().withLock {
|
||||
val oldSize = map.put(cacheKey, value)?.let { old ->
|
||||
val result = old.content.readableBytes()
|
||||
old.content.release()
|
||||
result
|
||||
} ?: 0
|
||||
val delta = value.content.readableBytes() - oldSize
|
||||
mapSize += delta
|
||||
removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
|
||||
while (mapSize > maxSize) {
|
||||
removeEldest()
|
||||
}
|
||||
}
|
||||
cache.put(cacheKey, value)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user