added semaphore to benchmark command

This commit is contained in:
2025-01-28 00:00:07 +08:00
parent 45458761f3
commit a2a40ab60f

View File

@@ -1,19 +1,17 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.error import net.woggioni.gbcs.common.error
import net.woggioni.gbcs.common.info import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import picocli.CommandLine import picocli.CommandLine
import java.security.SecureRandom import java.security.SecureRandom
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.Base64
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random import kotlin.random.Random
@@ -53,53 +51,55 @@ class BenchmarkCommand : GbcsCommand() {
} }
} }
log.info {
"Starting insertion"
}
val entries = let { val entries = let {
val completionQueue = LinkedBlockingQueue<Future<Pair<String, ByteArray>>>(numberOfEntries) val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now() val start = Instant.now()
val totalElapsedTime = AtomicLong(0) val semaphore = Semaphore(profile.maxConnections * 3)
entryGenerator.take(numberOfEntries).forEach { entry -> val iterator = entryGenerator.take(numberOfEntries).iterator()
val requestStart = System.nanoTime() while(completionCounter.get() < numberOfEntries) {
val future = client.put(entry.first, entry.second).thenApply { entry } if(iterator.hasNext()) {
future.whenComplete { _, _ -> val entry = iterator.next()
totalElapsedTime.addAndGet((System.nanoTime() - requestStart)) semaphore.acquire()
completionQueue.put(future) val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { result, ex ->
if (ex != null) {
log.error(ex.message, ex)
} else {
completionQueue.put(result)
}
semaphore.release()
completionCounter.incrementAndGet()
}
} }
} }
val inserted = sequence<Pair<String, ByteArray>> { val inserted = completionQueue.toList()
var completionCounter = 0
while (completionCounter < numberOfEntries) {
val future = completionQueue.take()
try {
yield(future.get())
} catch (ee: ExecutionException) {
val cause = ee.cause ?: ee
log.error(cause.message, cause)
}
completionCounter += 1
}
}.toList()
val end = Instant.now() val end = Instant.now()
log.info { log.info {
val elapsed = Duration.between(start, end).toMillis() val elapsed = Duration.between(start, end).toMillis()
"Insertion rate: ${numberOfEntries.toDouble() / elapsed * 1000} ops/s" val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
} "Insertion rate: $opsPerSecond ops/s"
log.info {
"Average time per insertion: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1000} ms"
} }
inserted inserted
} }
log.info { log.info {
"Inserted ${entries.size} entries" "Inserted ${entries.size} entries"
} }
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) { if (entries.isNotEmpty()) {
val completionQueue = LinkedBlockingQueue<Future<Unit>>(entries.size) val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now() val start = Instant.now()
val totalElapsedTime = AtomicLong(0)
entries.forEach { entry -> entries.forEach { entry ->
val requestStart = System.nanoTime() semaphore.acquire()
val future = client.get(entry.first).thenApply { val future = client.get(entry.first).thenApply {
totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
if (it == null) { if (it == null) {
log.error { log.error {
"Missing entry for key '${entry.first}'" "Missing entry for key '${entry.first}'"
@@ -111,21 +111,15 @@ class BenchmarkCommand : GbcsCommand() {
} }
} }
future.whenComplete { _, _ -> future.whenComplete { _, _ ->
completionQueue.put(future) completionCounter.incrementAndGet()
semaphore.release()
} }
} }
var completionCounter = 0
while (completionCounter < entries.size) {
completionQueue.take()
completionCounter += 1
}
val end = Instant.now() val end = Instant.now()
log.info { log.info {
val elapsed = Duration.between(start, end).toMillis() val elapsed = Duration.between(start, end).toMillis()
"Retrieval rate: ${entries.size.toDouble() / elapsed * 1000} ops/s" val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
} "Retrieval rate: $opsPerSecond ops/s"
log.info {
"Average time per retrieval: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1e6} ms"
} }
} else { } else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache") log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")