added optional event listener to client API

2025-01-27 23:55:59 +08:00
parent 45458761f3
commit ba961bd30d
4 changed files with 92 additions and 40 deletions


@@ -7,6 +7,7 @@ module net.woggioni.gbcs.cli {
requires kotlin.stdlib;
requires net.woggioni.jwo;
requires net.woggioni.gbcs.api;
requires io.netty.codec.http;
exports net.woggioni.gbcs.cli.impl.converters to info.picocli;
opens net.woggioni.gbcs.cli.impl.commands to info.picocli;


@@ -1,10 +1,13 @@
package net.woggioni.gbcs.cli.impl.commands
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.error
import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.client.RequestEventListener
import net.woggioni.jwo.JWO
import picocli.CommandLine
import java.security.SecureRandom
@@ -14,6 +17,7 @@ import java.util.Base64
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random
@@ -53,53 +57,87 @@ class BenchmarkCommand : GbcsCommand() {
}
}
log.info {
"Starting insertion"
}
val entries = let {
val completionQueue = LinkedBlockingQueue<Future<Pair<String, ByteArray>>>(numberOfEntries)
val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now()
val semaphore = Semaphore(profile.maxConnections * 3)
val totalElapsedTime = AtomicLong(0)
entryGenerator.take(numberOfEntries).forEach { entry ->
val requestStart = System.nanoTime()
val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { _, _ ->
val iterator = entryGenerator.take(numberOfEntries).iterator()
while(completionCounter.get() < numberOfEntries) {
if(iterator.hasNext()) {
val entry = iterator.next()
semaphore.acquire()
val eventListener = object : RequestEventListener {
var start: Long? = null
override fun requestSent(req: FullHttpRequest) {
this.start = System.nanoTime()
}
override fun responseReceived(res: FullHttpResponse) {
this.start?.let { requestStart ->
totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
completionQueue.put(future)
}
this.start = null
}
}
val future = client.put(entry.first, entry.second, eventListener).thenApply { entry }
future.whenComplete { result, ex ->
if (ex != null) {
log.error(ex.message, ex)
} else {
completionQueue.put(result)
}
semaphore.release()
completionCounter.incrementAndGet()
}
}
}
val inserted = sequence<Pair<String, ByteArray>> {
var completionCounter = 0
while (completionCounter < numberOfEntries) {
val future = completionQueue.take()
try {
yield(future.get())
} catch (ee: ExecutionException) {
val cause = ee.cause ?: ee
log.error(cause.message, cause)
}
completionCounter += 1
}
}.toList()
val inserted = completionQueue.toList()
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
"Insertion rate: ${numberOfEntries.toDouble() / elapsed * 1000} ops/s"
val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
}
log.info {
"Average time per insertion: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1000} ms"
val avgTxTime = String.format("%.0f", totalElapsedTime.get() / numberOfEntries.toDouble() / 1e6)
"Average time per insertion: $avgTxTime ms"
}
inserted
}
log.info {
"Inserted ${entries.size} entries"
}
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) {
val completionQueue = LinkedBlockingQueue<Future<Unit>>(entries.size)
val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now()
val totalElapsedTime = AtomicLong(0)
entries.forEach { entry ->
val requestStart = System.nanoTime()
val future = client.get(entry.first).thenApply {
semaphore.acquire()
val eventListener = object : RequestEventListener {
var start : Long? = null
override fun requestSent(req: FullHttpRequest) {
this.start = System.nanoTime()
}
override fun responseReceived(res: FullHttpResponse) {
this.start?.let { requestStart ->
totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
}
this.start = null
}
}
val future = client.get(entry.first, eventListener).thenApply {
if (it == null) {
log.error {
"Missing entry for key '${entry.first}'"
@@ -111,21 +149,19 @@ class BenchmarkCommand : GbcsCommand() {
}
}
future.whenComplete { _, _ ->
completionQueue.put(future)
completionCounter.incrementAndGet()
semaphore.release()
}
}
var completionCounter = 0
while (completionCounter < entries.size) {
completionQueue.take()
completionCounter += 1
}
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
"Retrieval rate: ${entries.size.toDouble() / elapsed * 1000} ops/s"
val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
"Retrieval rate: $opsPerSecond ops/s"
}
log.info {
"Average time per retrieval: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1e6} ms"
val avgTxTime = String.format("%.0f", totalElapsedTime.get() / completionCounter.toDouble() / 1e6)
"Average time per retrieval: $avgTxTime ms"
}
} else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")


@@ -213,9 +213,9 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
}
}
fun get(key: String): CompletableFuture<ByteArray?> {
fun get(key: String, eventListener : RequestEventListener? = null): CompletableFuture<ByteArray?> {
return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null, eventListener)
}.thenApply {
val status = it.status()
if (it.status() == HttpResponseStatus.NOT_FOUND) {
@@ -234,9 +234,9 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
}
}
fun put(key: String, content: ByteArray): CompletableFuture<Unit> {
fun put(key: String, content: ByteArray, eventListener : RequestEventListener? = null): CompletableFuture<Unit> {
return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content)
sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content, eventListener)
}.thenApply {
val status = it.status()
if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) {
@@ -245,7 +245,7 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
}
}
private fun sendRequest(uri: URI, method: HttpMethod, body: ByteArray?): CompletableFuture<FullHttpResponse> {
private fun sendRequest(uri: URI, method: HttpMethod, body: ByteArray?, eventListener : RequestEventListener?): CompletableFuture<FullHttpResponse> {
val responseFuture = CompletableFuture<FullHttpResponse>()
// Custom handler for processing responses
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
@@ -261,6 +261,7 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
pipeline.removeLast()
pool.release(channel)
responseFuture.complete(response)
eventListener?.responseReceived(response)
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
@@ -306,7 +307,11 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
// Set headers
// Send the request
channel.writeAndFlush(request)
channel.writeAndFlush(request).addListener {
if(it.isSuccess) {
eventListener?.requestSent(request)
}
}
} else {
responseFuture.completeExceptionally(channelFuture.cause())
}


@@ -0,0 +1,10 @@
package net.woggioni.gbcs.client
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
interface RequestEventListener {
fun requestSent(req : FullHttpRequest) {}
fun responseReceived(res : FullHttpResponse) {}
fun exceptionCaught(ex : Throwable) {}
}
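
All three callbacks have empty default implementations, so callers override only what they need and existing call sites keep compiling. Below is a minimal usage sketch against the updated client API, timing a single put the same way BenchmarkCommand does above; how the client's Configuration.Profile is obtained is elided, and the printed output is illustrative only:

import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.client.RequestEventListener

fun putWithTiming(client: GradleBuildCacheClient, key: String, payload: ByteArray) {
    val listener = object : RequestEventListener {
        private var startNanos: Long? = null

        override fun requestSent(req: FullHttpRequest) {
            // Invoked from the writeAndFlush listener once the request has actually been sent
            startNanos = System.nanoTime()
        }

        override fun responseReceived(res: FullHttpResponse) {
            startNanos?.let { start ->
                println("PUT '$key' -> ${res.status()} in ${(System.nanoTime() - start) / 1e6} ms")
            }
            startNanos = null
        }

        override fun exceptionCaught(ex: Throwable) {
            println("PUT '$key' failed: ${ex.message}")
        }
    }
    // The listener parameter is optional; client.put(key, payload) behaves exactly as before
    client.put(key, payload, listener).join()
}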