From a2a40ab60f0841e2f7bc8bc52f3bbcce5f749e8a Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Tue, 28 Jan 2025 00:00:07 +0800 Subject: [PATCH] added semaphore to benchmark command --- .../cli/impl/commands/BenchmarkCommand.kt | 82 +++++++++---------- 1 file changed, 38 insertions(+), 44 deletions(-) diff --git a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/BenchmarkCommand.kt b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/BenchmarkCommand.kt index 6ad3425..5b7aac1 100644 --- a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/BenchmarkCommand.kt +++ b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/BenchmarkCommand.kt @@ -1,19 +1,17 @@ package net.woggioni.gbcs.cli.impl.commands +import net.woggioni.gbcs.cli.impl.GbcsCommand +import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.error import net.woggioni.gbcs.common.info -import net.woggioni.gbcs.cli.impl.GbcsCommand -import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.jwo.JWO import picocli.CommandLine import java.security.SecureRandom import java.time.Duration import java.time.Instant -import java.util.Base64 -import java.util.concurrent.ExecutionException -import java.util.concurrent.Future import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.Semaphore import java.util.concurrent.atomic.AtomicLong import kotlin.random.Random @@ -53,53 +51,55 @@ class BenchmarkCommand : GbcsCommand() { } } + log.info { + "Starting insertion" + } val entries = let { - val completionQueue = LinkedBlockingQueue>>(numberOfEntries) + val completionCounter = AtomicLong(0) + val completionQueue = LinkedBlockingQueue>(numberOfEntries) val start = Instant.now() - val totalElapsedTime = AtomicLong(0) - entryGenerator.take(numberOfEntries).forEach { entry -> - val requestStart = System.nanoTime() - val future = client.put(entry.first, entry.second).thenApply { entry } - future.whenComplete { _, _ -> - totalElapsedTime.addAndGet((System.nanoTime() - requestStart)) - completionQueue.put(future) + val semaphore = Semaphore(profile.maxConnections * 3) + val iterator = entryGenerator.take(numberOfEntries).iterator() + while(completionCounter.get() < numberOfEntries) { + if(iterator.hasNext()) { + val entry = iterator.next() + semaphore.acquire() + val future = client.put(entry.first, entry.second).thenApply { entry } + future.whenComplete { result, ex -> + if (ex != null) { + log.error(ex.message, ex) + } else { + completionQueue.put(result) + } + semaphore.release() + completionCounter.incrementAndGet() + } } } - val inserted = sequence> { - var completionCounter = 0 - while (completionCounter < numberOfEntries) { - val future = completionQueue.take() - try { - yield(future.get()) - } catch (ee: ExecutionException) { - val cause = ee.cause ?: ee - log.error(cause.message, cause) - } - completionCounter += 1 - } - }.toList() + val inserted = completionQueue.toList() val end = Instant.now() log.info { val elapsed = Duration.between(start, end).toMillis() - "Insertion rate: ${numberOfEntries.toDouble() / elapsed * 1000} ops/s" - } - log.info { - "Average time per insertion: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1000} ms" + val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000) + "Insertion rate: $opsPerSecond ops/s" } inserted } log.info { "Inserted ${entries.size} entries" } + log.info { + "Starting retrieval" + } if (entries.isNotEmpty()) { - val completionQueue = LinkedBlockingQueue>(entries.size) + val completionCounter = AtomicLong(0) + val semaphore = Semaphore(profile.maxConnections * 3) val start = Instant.now() - val totalElapsedTime = AtomicLong(0) entries.forEach { entry -> - val requestStart = System.nanoTime() + semaphore.acquire() + val future = client.get(entry.first).thenApply { - totalElapsedTime.addAndGet((System.nanoTime() - requestStart)) if (it == null) { log.error { "Missing entry for key '${entry.first}'" @@ -111,21 +111,15 @@ class BenchmarkCommand : GbcsCommand() { } } future.whenComplete { _, _ -> - completionQueue.put(future) + completionCounter.incrementAndGet() + semaphore.release() } } - var completionCounter = 0 - while (completionCounter < entries.size) { - completionQueue.take() - completionCounter += 1 - } val end = Instant.now() log.info { val elapsed = Duration.between(start, end).toMillis() - "Retrieval rate: ${entries.size.toDouble() / elapsed * 1000} ops/s" - } - log.info { - "Average time per retrieval: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1e6} ms" + val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000) + "Retrieval rate: $opsPerSecond ops/s" } } else { log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")