fixed race condition in InMemoryCache
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
package net.woggioni.gbcs.common
|
||||
|
||||
import net.woggioni.jwo.JWO
|
||||
import java.net.URI
|
||||
import java.net.URL
|
||||
import java.security.MessageDigest
|
||||
|
||||
object GBCS {
|
||||
fun String.toUrl() : URL = URL.of(URI(this), null)
|
||||
@@ -9,4 +11,19 @@ object GBCS {
|
||||
const val GBCS_NAMESPACE_URI: String = "urn:net.woggioni.gbcs.server"
|
||||
const val GBCS_PREFIX: String = "gbcs"
|
||||
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
|
||||
|
||||
fun digest(
|
||||
data: ByteArray,
|
||||
md: MessageDigest = MessageDigest.getInstance("MD5")
|
||||
): ByteArray {
|
||||
md.update(data)
|
||||
return md.digest()
|
||||
}
|
||||
|
||||
fun digestString(
|
||||
data: ByteArray,
|
||||
md: MessageDigest = MessageDigest.getInstance("MD5")
|
||||
): String {
|
||||
return JWO.bytesToHex(digest(data, md))
|
||||
}
|
||||
}
|
@@ -1,21 +0,0 @@
|
||||
package net.woggioni.gbcs.server.cache
|
||||
|
||||
import net.woggioni.jwo.JWO
|
||||
import java.security.MessageDigest
|
||||
|
||||
object CacheUtils {
|
||||
fun digest(
|
||||
data: ByteArray,
|
||||
md: MessageDigest = MessageDigest.getInstance("MD5")
|
||||
): ByteArray {
|
||||
md.update(data)
|
||||
return md.digest()
|
||||
}
|
||||
|
||||
fun digestString(
|
||||
data: ByteArray,
|
||||
md: MessageDigest = MessageDigest.getInstance("MD5")
|
||||
): String {
|
||||
return JWO.bytesToHex(digest(data, md))
|
||||
}
|
||||
}
|
@@ -1,8 +1,8 @@
|
||||
package net.woggioni.gbcs.server.cache
|
||||
|
||||
import net.woggioni.gbcs.api.Cache
|
||||
import net.woggioni.gbcs.common.GBCS.digestString
|
||||
import net.woggioni.gbcs.common.contextLogger
|
||||
import net.woggioni.gbcs.server.cache.CacheUtils.digestString
|
||||
import net.woggioni.jwo.LockFile
|
||||
import java.nio.channels.Channels
|
||||
import java.nio.channels.FileChannel
|
||||
|
@@ -1,18 +1,15 @@
|
||||
package net.woggioni.gbcs.server.cache
|
||||
|
||||
import net.woggioni.gbcs.api.Cache
|
||||
import net.woggioni.gbcs.server.cache.CacheUtils.digestString
|
||||
import net.woggioni.gbcs.common.GBCS.digestString
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.channels.Channels
|
||||
import java.security.MessageDigest
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.PriorityBlockingQueue
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.zip.Deflater
|
||||
import java.util.zip.DeflaterOutputStream
|
||||
import java.util.zip.Inflater
|
||||
@@ -25,33 +22,27 @@ class InMemoryCache(
|
||||
val compressionLevel: Int
|
||||
) : Cache {
|
||||
|
||||
private val map = ConcurrentHashMap<String, MapValue>()
|
||||
|
||||
private class MapValue(val rc: AtomicInteger, val payload : AtomicReference<ByteArray>)
|
||||
|
||||
private class RemovalQueueElement(val key: String, val expiry : Instant) : Comparable<RemovalQueueElement> {
|
||||
override fun compareTo(other: RemovalQueueElement)= expiry.compareTo(other.expiry)
|
||||
private val map = ConcurrentHashMap<String, ByteArray>()
|
||||
|
||||
private class RemovalQueueElement(val key: String, val value : ByteArray, val expiry : Instant) : Comparable<RemovalQueueElement> {
|
||||
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
|
||||
}
|
||||
|
||||
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
|
||||
|
||||
private var running = true
|
||||
private val garbageCollector = Thread({
|
||||
private val garbageCollector = Thread {
|
||||
while(true) {
|
||||
val el = removalQueue.take()
|
||||
val now = Instant.now()
|
||||
if(now > el.expiry) {
|
||||
val value = map[el.key] ?: continue
|
||||
val rc = value.rc.decrementAndGet()
|
||||
if(rc == 0) {
|
||||
map.remove(el.key)
|
||||
}
|
||||
map.remove(el.key, el.value)
|
||||
} else {
|
||||
removalQueue.put(el)
|
||||
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
|
||||
}
|
||||
}
|
||||
}).apply {
|
||||
}.apply {
|
||||
start()
|
||||
}
|
||||
|
||||
@@ -68,8 +59,6 @@ class InMemoryCache(
|
||||
} ?: key
|
||||
).let { digest ->
|
||||
map[digest]
|
||||
?.let(MapValue::payload)
|
||||
?.let(AtomicReference<ByteArray>::get)
|
||||
?.let { value ->
|
||||
if (compressionEnabled) {
|
||||
val inflater = Inflater()
|
||||
@@ -96,11 +85,8 @@ class InMemoryCache(
|
||||
} else {
|
||||
content
|
||||
}
|
||||
val mapValue = map.computeIfAbsent(digest) {
|
||||
MapValue(AtomicInteger(0), AtomicReference())
|
||||
}
|
||||
mapValue.payload.set(value)
|
||||
removalQueue.put(RemovalQueueElement(digest, Instant.now().plus(maxAge)))
|
||||
map[digest] = value
|
||||
removalQueue.put(RemovalQueueElement(digest, value, Instant.now().plus(maxAge)))
|
||||
}
|
||||
}
|
||||
}
|
@@ -2,9 +2,9 @@ org.gradle.configuration-cache=false
|
||||
org.gradle.parallel=true
|
||||
org.gradle.caching=true
|
||||
|
||||
gbcs.version = 0.0.11
|
||||
gbcs.version = 0.1.1
|
||||
|
||||
lys.version = 2025.01.25
|
||||
lys.version = 2025.01.31
|
||||
|
||||
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
|
||||
docker.registry.url=gitea.woggioni.net
|
||||
|
Reference in New Issue
Block a user