From ba961bd30d378eee40dea758b54ce4fec70b7d96 Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Mon, 27 Jan 2025 23:55:59 +0800 Subject: [PATCH] added optional even listener to client API --- gbcs-cli/src/main/java/module-info.java | 1 + .../cli/impl/commands/BenchmarkCommand.kt | 104 ++++++++++++------ .../kotlin/net/woggioni/gbcs/client/Client.kt | 17 ++- .../gbcs/client/RequestEventListener.kt | 10 ++ 4 files changed, 92 insertions(+), 40 deletions(-) create mode 100644 gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/RequestEventListener.kt diff --git a/gbcs-cli/src/main/java/module-info.java b/gbcs-cli/src/main/java/module-info.java index a38f913..adafdb2 100644 --- a/gbcs-cli/src/main/java/module-info.java +++ b/gbcs-cli/src/main/java/module-info.java @@ -7,6 +7,7 @@ module net.woggioni.gbcs.cli { requires kotlin.stdlib; requires net.woggioni.jwo; requires net.woggioni.gbcs.api; + requires io.netty.codec.http; exports net.woggioni.gbcs.cli.impl.converters to info.picocli; opens net.woggioni.gbcs.cli.impl.commands to info.picocli; diff --git a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/BenchmarkCommand.kt b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/BenchmarkCommand.kt index 6ad3425..b3f07d0 100644 --- a/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/BenchmarkCommand.kt +++ b/gbcs-cli/src/main/kotlin/net/woggioni/gbcs/cli/impl/commands/BenchmarkCommand.kt @@ -1,10 +1,13 @@ package net.woggioni.gbcs.cli.impl.commands +import io.netty.handler.codec.http.FullHttpRequest +import io.netty.handler.codec.http.FullHttpResponse import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.error import net.woggioni.gbcs.common.info import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.client.GradleBuildCacheClient +import net.woggioni.gbcs.client.RequestEventListener import net.woggioni.jwo.JWO import picocli.CommandLine import java.security.SecureRandom @@ -14,6 +17,7 @@ import java.util.Base64 import java.util.concurrent.ExecutionException import java.util.concurrent.Future import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.Semaphore import java.util.concurrent.atomic.AtomicLong import kotlin.random.Random @@ -53,53 +57,87 @@ class BenchmarkCommand : GbcsCommand() { } } + log.info { + "Starting insertion" + } val entries = let { - val completionQueue = LinkedBlockingQueue>>(numberOfEntries) + val completionCounter = AtomicLong(0) + val completionQueue = LinkedBlockingQueue>(numberOfEntries) val start = Instant.now() + val semaphore = Semaphore(profile.maxConnections * 3) val totalElapsedTime = AtomicLong(0) - entryGenerator.take(numberOfEntries).forEach { entry -> - val requestStart = System.nanoTime() - val future = client.put(entry.first, entry.second).thenApply { entry } - future.whenComplete { _, _ -> - totalElapsedTime.addAndGet((System.nanoTime() - requestStart)) - completionQueue.put(future) + val iterator = entryGenerator.take(numberOfEntries).iterator() + while(completionCounter.get() < numberOfEntries) { + if(iterator.hasNext()) { + val entry = iterator.next() + semaphore.acquire() + val eventListener = object : RequestEventListener { + var start: Long? = null + override fun requestSent(req: FullHttpRequest) { + this.start = System.nanoTime() + } + + override fun responseReceived(res: FullHttpResponse) { + this.start?.let { requestStart -> + totalElapsedTime.addAndGet((System.nanoTime() - requestStart)) + } + this.start = null + } + } + val future = client.put(entry.first, entry.second, eventListener).thenApply { entry } + future.whenComplete { result, ex -> + if (ex != null) { + log.error(ex.message, ex) + } else { + completionQueue.put(result) + } + semaphore.release() + completionCounter.incrementAndGet() + } } } - val inserted = sequence> { - var completionCounter = 0 - while (completionCounter < numberOfEntries) { - val future = completionQueue.take() - try { - yield(future.get()) - } catch (ee: ExecutionException) { - val cause = ee.cause ?: ee - log.error(cause.message, cause) - } - completionCounter += 1 - } - }.toList() + val inserted = completionQueue.toList() val end = Instant.now() log.info { val elapsed = Duration.between(start, end).toMillis() - "Insertion rate: ${numberOfEntries.toDouble() / elapsed * 1000} ops/s" + val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000) + "Insertion rate: $opsPerSecond ops/s" } log.info { - "Average time per insertion: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1000} ms" + val avgTxTime = String.format("%.0f", totalElapsedTime.get() / numberOfEntries.toDouble() / 1e6) + "Average time per insertion: $avgTxTime ms" } inserted } log.info { "Inserted ${entries.size} entries" } + log.info { + "Starting retrieval" + } if (entries.isNotEmpty()) { - val completionQueue = LinkedBlockingQueue>(entries.size) + val completionCounter = AtomicLong(0) + val semaphore = Semaphore(profile.maxConnections * 3) val start = Instant.now() val totalElapsedTime = AtomicLong(0) entries.forEach { entry -> - val requestStart = System.nanoTime() - val future = client.get(entry.first).thenApply { - totalElapsedTime.addAndGet((System.nanoTime() - requestStart)) + semaphore.acquire() + val eventListener = object : RequestEventListener { + var start : Long? = null + override fun requestSent(req: FullHttpRequest) { + this.start = System.nanoTime() + } + + override fun responseReceived(res: FullHttpResponse) { + this.start?.let { requestStart -> + totalElapsedTime.addAndGet((System.nanoTime() - requestStart)) + } + this.start = null + } + } + + val future = client.get(entry.first, eventListener).thenApply { if (it == null) { log.error { "Missing entry for key '${entry.first}'" @@ -111,21 +149,19 @@ class BenchmarkCommand : GbcsCommand() { } } future.whenComplete { _, _ -> - completionQueue.put(future) + completionCounter.incrementAndGet() + semaphore.release() } } - var completionCounter = 0 - while (completionCounter < entries.size) { - completionQueue.take() - completionCounter += 1 - } val end = Instant.now() log.info { val elapsed = Duration.between(start, end).toMillis() - "Retrieval rate: ${entries.size.toDouble() / elapsed * 1000} ops/s" + val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000) + "Retrieval rate: $opsPerSecond ops/s" } log.info { - "Average time per retrieval: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1e6} ms" + val avgTxTime = String.format("%.0f", totalElapsedTime.get() / completionCounter.toDouble() / 1e6) + "Average time per retrieval: $avgTxTime ms" } } else { log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache") diff --git a/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/Client.kt b/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/Client.kt index 05eb5a4..20a4af9 100644 --- a/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/Client.kt +++ b/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/Client.kt @@ -213,9 +213,9 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC } } - fun get(key: String): CompletableFuture { + fun get(key: String, eventListener : RequestEventListener? = null): CompletableFuture { return executeWithRetry { - sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null) + sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null, eventListener) }.thenApply { val status = it.status() if (it.status() == HttpResponseStatus.NOT_FOUND) { @@ -234,9 +234,9 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC } } - fun put(key: String, content: ByteArray): CompletableFuture { + fun put(key: String, content: ByteArray, eventListener : RequestEventListener? = null): CompletableFuture { return executeWithRetry { - sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content) + sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content, eventListener) }.thenApply { val status = it.status() if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) { @@ -245,7 +245,7 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC } } - private fun sendRequest(uri: URI, method: HttpMethod, body: ByteArray?): CompletableFuture { + private fun sendRequest(uri: URI, method: HttpMethod, body: ByteArray?, eventListener : RequestEventListener?): CompletableFuture { val responseFuture = CompletableFuture() // Custom handler for processing responses pool.acquire().addListener(object : GenericFutureListener> { @@ -261,6 +261,7 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC pipeline.removeLast() pool.release(channel) responseFuture.complete(response) + eventListener?.responseReceived(response) } override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { @@ -306,7 +307,11 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC // Set headers // Send the request - channel.writeAndFlush(request) + channel.writeAndFlush(request).addListener { + if(it.isSuccess) { + eventListener?.requestSent(request) + } + } } else { responseFuture.completeExceptionally(channelFuture.cause()) } diff --git a/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/RequestEventListener.kt b/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/RequestEventListener.kt new file mode 100644 index 0000000..d140bea --- /dev/null +++ b/gbcs-client/src/main/kotlin/net/woggioni/gbcs/client/RequestEventListener.kt @@ -0,0 +1,10 @@ +package net.woggioni.gbcs.client + +import io.netty.handler.codec.http.FullHttpRequest +import io.netty.handler.codec.http.FullHttpResponse + +interface RequestEventListener { + fun requestSent(req : FullHttpRequest) {} + fun responseReceived(res : FullHttpResponse) {} + fun exceptionCaught(ex : Throwable) {} +} \ No newline at end of file