diff --git a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt index add8bab..416015c 100644 --- a/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt +++ b/rbcs-server/src/main/kotlin/net/woggioni/rbcs/server/cache/InMemoryCache.kt @@ -6,11 +6,11 @@ import net.woggioni.rbcs.api.CacheValueMetadata import net.woggioni.rbcs.common.createLogger import java.time.Duration import java.time.Instant +import java.util.PriorityQueue import java.util.concurrent.CompletableFuture -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.locks.ReentrantReadWriteLock +import kotlin.concurrent.withLock private class CacheKey(private val value: ByteArray) { override fun equals(other: Any?) = if (other is CacheKey) { @@ -34,15 +34,17 @@ class InMemoryCache( private val log = createLogger() } - private val size = AtomicLong() - private val map = ConcurrentHashMap() + private var mapSize : Long = 0 + private val map = HashMap() + private val lock = ReentrantReadWriteLock() + private val cond = lock.writeLock().newCondition() private class RemovalQueueElement(val key: CacheKey, val value: CacheEntry, val expiry: Instant) : Comparable { override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry) } - private val removalQueue = PriorityBlockingQueue() + private val removalQueue = PriorityQueue() @Volatile private var running = true @@ -51,20 +53,27 @@ class InMemoryCache( init { Thread.ofVirtual().name("in-memory-cache-gc").start { try { - while (running) { - val el = removalQueue.poll(1, TimeUnit.SECONDS) ?: continue - val value = el.value - val now = Instant.now() - if (now > el.expiry) { - val removed = map.remove(el.key, value) - if (removed) { - updateSizeAfterRemoval(value.content) - //Decrease the reference count for map - value.content.release() + lock.writeLock().withLock { + while (running) { + val el = removalQueue.poll() + if(el == null) { + cond.await(1000, TimeUnit.MILLISECONDS) + continue + } + val value = el.value + val now = Instant.now() + if (now > el.expiry) { + val removed = map.remove(el.key, value) + if (removed) { + updateSizeAfterRemoval(value.content) + //Decrease the reference count for map + value.content.release() + } + } else { + removalQueue.offer(el) + val interval = minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)) + cond.await(interval.toMillis(), TimeUnit.MILLISECONDS) } - } else { - removalQueue.put(el) - Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1))) } } complete(null) @@ -77,7 +86,7 @@ class InMemoryCache( fun removeEldest(): Long { while (true) { - val el = removalQueue.take() + val el = removalQueue.poll() ?: return mapSize val value = el.value val removed = map.remove(el.key, value) if (removed) { @@ -90,18 +99,22 @@ class InMemoryCache( } private fun updateSizeAfterRemoval(removed: ByteBuf): Long { - return size.updateAndGet { currentSize: Long -> - currentSize - removed.readableBytes() - } + mapSize -= removed.readableBytes() + return mapSize } override fun asyncClose() : CompletableFuture { running = false + lock.writeLock().withLock { + cond.signal() + } return closeFuture } - fun get(key: ByteArray) = map[CacheKey(key)]?.run { - CacheEntry(metadata, content.retainedDuplicate()) + fun get(key: ByteArray) = lock.readLock().withLock { + map[CacheKey(key)]?.run { + CacheEntry(metadata, content.retainedDuplicate()) + } } fun put( @@ -109,18 +122,18 @@ class InMemoryCache( value: CacheEntry, ) { val cacheKey = CacheKey(key) - val oldSize = map.put(cacheKey, value)?.let { old -> - val result = old.content.readableBytes() - old.content.release() - result - } ?: 0 - val delta = value.content.readableBytes() - oldSize - var newSize = size.updateAndGet { currentSize: Long -> - currentSize + delta - } - removalQueue.put(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge))) - while (newSize > maxSize) { - newSize = removeEldest() + lock.writeLock().withLock { + val oldSize = map.put(cacheKey, value)?.let { old -> + val result = old.content.readableBytes() + old.content.release() + result + } ?: 0 + val delta = value.content.readableBytes() - oldSize + mapSize += delta + removalQueue.offer(RemovalQueueElement(cacheKey, value, Instant.now().plus(maxAge))) + while (mapSize > maxSize) { + removeEldest() + } } } } \ No newline at end of file