simplified InMemoryCache implementation

This commit is contained in:
2025-03-03 09:44:37 +08:00
parent d64f7f4f27
commit 31ce34cddb

View File

@@ -6,11 +6,11 @@ import net.woggioni.rbcs.api.CacheValueMetadata
import net.woggioni.rbcs.common.createLogger
import java.time.Duration
import java.time.Instant
import java.util.PriorityQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
private class CacheKey(private val value: ByteArray) {
override fun equals(other: Any?) = if (other is CacheKey) {
@@ -34,15 +34,17 @@ class InMemoryCache(
private val log = createLogger<InMemoryCache>()
}
private val size = AtomicLong()
private val map = ConcurrentHashMap<CacheKey, CacheEntry>()
private var mapSize : Long = 0
private val map = HashMap<CacheKey, CacheEntry>()
private val lock = ReentrantReadWriteLock()
private val cond = lock.writeLock().newCondition()
private class RemovalQueueElement(val key: CacheKey, val value: CacheEntry, val expiry: Instant) :
Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
}
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
private val removalQueue = PriorityQueue<RemovalQueueElement>()
@Volatile
private var running = true
@@ -51,20 +53,27 @@ class InMemoryCache(
init {
Thread.ofVirtual().name("in-memory-cache-gc").start {
try {
while (running) {
val el = removalQueue.poll(1, TimeUnit.SECONDS) ?: continue
val value = el.value
val now = Instant.now()
if (now > el.expiry) {
val removed = map.remove(el.key, value)
if (removed) {
updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
lock.writeLock().withLock {
while (running) {
val el = removalQueue.poll()
if(el == null) {
cond.await(1000, TimeUnit.MILLISECONDS)
continue
}
val value = el.value
val now = Instant.now()
if (now > el.expiry) {
val removed = map.remove(el.key, value)
if (removed) {
updateSizeAfterRemoval(value.content)
//Decrease the reference count for map
value.content.release()
}
} else {
removalQueue.offer(el)
val interval = minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
}
} else {
removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
}
}
complete(null)
@@ -77,7 +86,7 @@ class InMemoryCache(
fun removeEldest(): Long {
while (true) {
val el = removalQueue.take()
val el = removalQueue.poll() ?: return mapSize
val value = el.value
val removed = map.remove(el.key, value)
if (removed) {
@@ -90,18 +99,22 @@ class InMemoryCache(
}
private fun updateSizeAfterRemoval(removed: ByteBuf): Long {
return size.updateAndGet { currentSize: Long ->
currentSize - removed.readableBytes()
}
mapSize -= removed.readableBytes()
return mapSize
}
override fun asyncClose() : CompletableFuture<Void> {
running = false
lock.writeLock().withLock {
cond.signal()
}
return closeFuture
}
fun get(key: ByteArray) = map[CacheKey(key)]?.run {
CacheEntry(metadata, content.retainedDuplicate())
fun get(key: ByteArray) = lock.readLock().withLock {
map[CacheKey(key)]?.run {
CacheEntry(metadata, content.retainedDuplicate())
}
}
fun put(
@@ -109,18 +122,18 @@ class InMemoryCache(
value: CacheEntry,
) {
val cacheKey = CacheKey(key)
val oldSize = map.put(cacheKey, value)?.let { old ->
val result = old.content.readableBytes()
old.content.release()
result
} ?: 0
val delta = value.content.readableBytes() - oldSize
var newSize = size.updateAndGet { currentSize: Long ->
currentSize + delta
}
removalQueue.put(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
while (newSize > maxSize) {
newSize = removeEldest()
lock.writeLock().withLock {
val oldSize = map.put(cacheKey, value)?.let { old ->
val result = old.content.readableBytes()
old.content.release()
result
} ?: 0
val delta = value.content.readableBytes() - oldSize
mapSize += delta
removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge)))
while (mapSize > maxSize) {
removeEldest()
}
}
}
}