Compare commits
1 Commits
0.3.0-SNAP
...
caffeine
Author | SHA1 | Date | |
---|---|---|---|
19965dd3b0
|
@@ -43,6 +43,46 @@
|
|||||||
{
|
{
|
||||||
"name":"com.aayushatharva.brotli4j.Brotli4jLoader"
|
"name":"com.aayushatharva.brotli4j.Brotli4jLoader"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"name":"com.github.benmanes.caffeine.cache.BLCHeader$DrainStatusRef",
|
||||||
|
"fields":[{"name":"drainStatus"}]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueColdProducerFields",
|
||||||
|
"fields":[{"name":"producerLimit"}]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueConsumerFields",
|
||||||
|
"fields":[{"name":"consumerIndex"}]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"com.github.benmanes.caffeine.cache.BaseMpscLinkedArrayQueueProducerFields",
|
||||||
|
"fields":[{"name":"producerIndex"}]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"com.github.benmanes.caffeine.cache.BoundedLocalCache",
|
||||||
|
"fields":[{"name":"refreshes"}]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"com.github.benmanes.caffeine.cache.PS",
|
||||||
|
"fields":[{"name":"key"}, {"name":"value"}]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"com.github.benmanes.caffeine.cache.PSW",
|
||||||
|
"fields":[{"name":"writeTime"}]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"com.github.benmanes.caffeine.cache.PSWMS",
|
||||||
|
"methods":[{"name":"<init>","parameterTypes":[] }]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"com.github.benmanes.caffeine.cache.SSMSW",
|
||||||
|
"fields":[{"name":"FACTORY"}]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"com.github.benmanes.caffeine.cache.StripedBuffer",
|
||||||
|
"fields":[{"name":"tableBusy"}]
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"name":"com.github.luben.zstd.Zstd"
|
"name":"com.github.luben.zstd.Zstd"
|
||||||
},
|
},
|
||||||
@@ -588,6 +628,9 @@
|
|||||||
"name":"net.woggioni.rbcs.server.exception.ExceptionHandler",
|
"name":"net.woggioni.rbcs.server.exception.ExceptionHandler",
|
||||||
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
|
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"name":"net.woggioni.rbcs.server.handler.BlackHoleRequestHandler"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler",
|
"name":"net.woggioni.rbcs.server.handler.MaxRequestSizeHandler",
|
||||||
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
|
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
|
||||||
|
@@ -12,6 +12,7 @@ dependencies {
|
|||||||
implementation catalog.netty.handler
|
implementation catalog.netty.handler
|
||||||
implementation catalog.netty.buffer
|
implementation catalog.netty.buffer
|
||||||
implementation catalog.netty.transport
|
implementation catalog.netty.transport
|
||||||
|
implementation("com.github.ben-manes.caffeine:caffeine:3.2.0")
|
||||||
|
|
||||||
api project(':rbcs-common')
|
api project(':rbcs-common')
|
||||||
api project(':rbcs-api')
|
api project(':rbcs-api')
|
||||||
|
@@ -18,6 +18,7 @@ module net.woggioni.rbcs.server {
|
|||||||
requires net.woggioni.jwo;
|
requires net.woggioni.jwo;
|
||||||
requires net.woggioni.rbcs.common;
|
requires net.woggioni.rbcs.common;
|
||||||
requires net.woggioni.rbcs.api;
|
requires net.woggioni.rbcs.api;
|
||||||
|
requires com.github.benmanes.caffeine;
|
||||||
|
|
||||||
exports net.woggioni.rbcs.server;
|
exports net.woggioni.rbcs.server;
|
||||||
|
|
||||||
|
@@ -1,16 +1,13 @@
|
|||||||
package net.woggioni.rbcs.server.cache
|
package net.woggioni.rbcs.server.cache
|
||||||
|
|
||||||
|
import com.github.benmanes.caffeine.cache.Cache
|
||||||
|
import com.github.benmanes.caffeine.cache.Caffeine
|
||||||
import io.netty.buffer.ByteBuf
|
import io.netty.buffer.ByteBuf
|
||||||
import net.woggioni.rbcs.api.AsyncCloseable
|
import net.woggioni.rbcs.api.AsyncCloseable
|
||||||
import net.woggioni.rbcs.api.CacheValueMetadata
|
import net.woggioni.rbcs.api.CacheValueMetadata
|
||||||
import net.woggioni.rbcs.common.createLogger
|
import net.woggioni.rbcs.common.createLogger
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.time.Instant
|
|
||||||
import java.util.PriorityQueue
|
|
||||||
import java.util.concurrent.CompletableFuture
|
import java.util.concurrent.CompletableFuture
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock
|
|
||||||
import kotlin.concurrent.withLock
|
|
||||||
|
|
||||||
private class CacheKey(private val value: ByteArray) {
|
private class CacheKey(private val value: ByteArray) {
|
||||||
override fun equals(other: Any?) = if (other is CacheKey) {
|
override fun equals(other: Any?) = if (other is CacheKey) {
|
||||||
@@ -26,118 +23,29 @@ class CacheEntry(
|
|||||||
)
|
)
|
||||||
|
|
||||||
class InMemoryCache(
|
class InMemoryCache(
|
||||||
private val maxAge: Duration,
|
maxAge: Duration,
|
||||||
private val maxSize: Long
|
maxSize: Long
|
||||||
) : AsyncCloseable {
|
) : AsyncCloseable {
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
private val log = createLogger<InMemoryCache>()
|
private val log = createLogger<InMemoryCache>()
|
||||||
}
|
}
|
||||||
|
|
||||||
private var mapSize : Long = 0
|
private val cache: Cache<CacheKey, CacheEntry> = Caffeine.newBuilder()
|
||||||
private val map = HashMap<CacheKey, CacheEntry>()
|
.expireAfterWrite(maxAge)
|
||||||
private val lock = ReentrantReadWriteLock()
|
.maximumSize(maxSize)
|
||||||
private val cond = lock.writeLock().newCondition()
|
.build()
|
||||||
|
override fun asyncClose(): CompletableFuture<Void> = CompletableFuture.completedFuture(null)
|
||||||
|
|
||||||
private class RemovalQueueElement(val key: CacheKey, val value: CacheEntry, val expiry: Instant) :
|
fun get(key: ByteArray) = cache.getIfPresent(CacheKey(key))?.run {
|
||||||
Comparable<RemovalQueueElement> {
|
|
||||||
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
|
|
||||||
}
|
|
||||||
|
|
||||||
private val removalQueue = PriorityQueue<RemovalQueueElement>()
|
|
||||||
|
|
||||||
@Volatile
|
|
||||||
private var running = true
|
|
||||||
|
|
||||||
private val closeFuture = object : CompletableFuture<Void>() {
|
|
||||||
init {
|
|
||||||
Thread.ofVirtual().name("in-memory-cache-gc").start {
|
|
||||||
try {
|
|
||||||
lock.writeLock().withLock {
|
|
||||||
while (running) {
|
|
||||||
val el = removalQueue.poll()
|
|
||||||
if(el == null) {
|
|
||||||
cond.await(1000, TimeUnit.MILLISECONDS)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
val value = el.value
|
|
||||||
val now = Instant.now()
|
|
||||||
if (now > el.expiry) {
|
|
||||||
val removed = map.remove(el.key, value)
|
|
||||||
if (removed) {
|
|
||||||
updateSizeAfterRemoval(value.content)
|
|
||||||
//Decrease the reference count for map
|
|
||||||
value.content.release()
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
removalQueue.offer(el)
|
|
||||||
val interval = minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))
|
|
||||||
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
map.forEach {
|
|
||||||
it.value.content.release()
|
|
||||||
}
|
|
||||||
map.clear()
|
|
||||||
}
|
|
||||||
complete(null)
|
|
||||||
} catch (ex: Throwable) {
|
|
||||||
completeExceptionally(ex)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fun removeEldest(): Long {
|
|
||||||
while (true) {
|
|
||||||
val el = removalQueue.poll() ?: return mapSize
|
|
||||||
val value = el.value
|
|
||||||
val removed = map.remove(el.key, value)
|
|
||||||
if (removed) {
|
|
||||||
val newSize = updateSizeAfterRemoval(value.content)
|
|
||||||
//Decrease the reference count for map
|
|
||||||
value.content.release()
|
|
||||||
return newSize
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun updateSizeAfterRemoval(removed: ByteBuf): Long {
|
|
||||||
mapSize -= removed.readableBytes()
|
|
||||||
return mapSize
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun asyncClose() : CompletableFuture<Void> {
|
|
||||||
running = false
|
|
||||||
lock.writeLock().withLock {
|
|
||||||
cond.signal()
|
|
||||||
}
|
|
||||||
return closeFuture
|
|
||||||
}
|
|
||||||
|
|
||||||
fun get(key: ByteArray) = lock.readLock().withLock {
|
|
||||||
map[CacheKey(key)]?.run {
|
|
||||||
CacheEntry(metadata, content.retainedDuplicate())
|
CacheEntry(metadata, content.retainedDuplicate())
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
fun put(
|
fun put(
|
||||||
key: ByteArray,
|
key: ByteArray,
|
||||||
value: CacheEntry,
|
value: CacheEntry,
|
||||||
) {
|
) {
|
||||||
val cacheKey = CacheKey(key)
|
val cacheKey = CacheKey(key)
|
||||||
lock.writeLock().withLock {
|
cache.put(cacheKey, value)
|
||||||
val oldSize = map.put(cacheKey, value)?.let { old ->
|
|
||||||
val result = old.content.readableBytes()
|
|
||||||
old.content.release()
|
|
||||||
result
|
|
||||||
} ?: 0
|
|
||||||
val delta = value.content.readableBytes() - oldSize
|
|
||||||
mapSize += delta
|
|
||||||
removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
|
|
||||||
while (mapSize > maxSize) {
|
|
||||||
removeEldest()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
Reference in New Issue
Block a user