Compare commits

...

1 Commits

Author SHA1 Message Date
ba961bd30d added optional even listener to client API 2025-01-27 23:55:59 +08:00
4 changed files with 92 additions and 40 deletions

View File

@@ -7,6 +7,7 @@ module net.woggioni.gbcs.cli {
requires kotlin.stdlib; requires kotlin.stdlib;
requires net.woggioni.jwo; requires net.woggioni.jwo;
requires net.woggioni.gbcs.api; requires net.woggioni.gbcs.api;
requires io.netty.codec.http;
exports net.woggioni.gbcs.cli.impl.converters to info.picocli; exports net.woggioni.gbcs.cli.impl.converters to info.picocli;
opens net.woggioni.gbcs.cli.impl.commands to info.picocli; opens net.woggioni.gbcs.cli.impl.commands to info.picocli;

View File

@@ -1,10 +1,13 @@
package net.woggioni.gbcs.cli.impl.commands package net.woggioni.gbcs.cli.impl.commands
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.error import net.woggioni.gbcs.common.error
import net.woggioni.gbcs.common.info import net.woggioni.gbcs.common.info
import net.woggioni.gbcs.cli.impl.GbcsCommand import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.client.RequestEventListener
import net.woggioni.jwo.JWO import net.woggioni.jwo.JWO
import picocli.CommandLine import picocli.CommandLine
import java.security.SecureRandom import java.security.SecureRandom
@@ -14,6 +17,7 @@ import java.util.Base64
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.concurrent.Future import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random import kotlin.random.Random
@@ -53,53 +57,87 @@ class BenchmarkCommand : GbcsCommand() {
} }
} }
log.info {
"Starting insertion"
}
val entries = let { val entries = let {
val completionQueue = LinkedBlockingQueue<Future<Pair<String, ByteArray>>>(numberOfEntries) val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now() val start = Instant.now()
val semaphore = Semaphore(profile.maxConnections * 3)
val totalElapsedTime = AtomicLong(0) val totalElapsedTime = AtomicLong(0)
entryGenerator.take(numberOfEntries).forEach { entry -> val iterator = entryGenerator.take(numberOfEntries).iterator()
val requestStart = System.nanoTime() while(completionCounter.get() < numberOfEntries) {
val future = client.put(entry.first, entry.second).thenApply { entry } if(iterator.hasNext()) {
future.whenComplete { _, _ -> val entry = iterator.next()
totalElapsedTime.addAndGet((System.nanoTime() - requestStart)) semaphore.acquire()
completionQueue.put(future) val eventListener = object : RequestEventListener {
var start: Long? = null
override fun requestSent(req: FullHttpRequest) {
this.start = System.nanoTime()
}
override fun responseReceived(res: FullHttpResponse) {
this.start?.let { requestStart ->
totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
}
this.start = null
}
}
val future = client.put(entry.first, entry.second, eventListener).thenApply { entry }
future.whenComplete { result, ex ->
if (ex != null) {
log.error(ex.message, ex)
} else {
completionQueue.put(result)
}
semaphore.release()
completionCounter.incrementAndGet()
}
} }
} }
val inserted = sequence<Pair<String, ByteArray>> { val inserted = completionQueue.toList()
var completionCounter = 0
while (completionCounter < numberOfEntries) {
val future = completionQueue.take()
try {
yield(future.get())
} catch (ee: ExecutionException) {
val cause = ee.cause ?: ee
log.error(cause.message, cause)
}
completionCounter += 1
}
}.toList()
val end = Instant.now() val end = Instant.now()
log.info { log.info {
val elapsed = Duration.between(start, end).toMillis() val elapsed = Duration.between(start, end).toMillis()
"Insertion rate: ${numberOfEntries.toDouble() / elapsed * 1000} ops/s" val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
} }
log.info { log.info {
"Average time per insertion: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1000} ms" val avgTxTime = String.format("%.0f", totalElapsedTime.get() / numberOfEntries.toDouble() / 1e6)
"Average time per insertion: $avgTxTime ms"
} }
inserted inserted
} }
log.info { log.info {
"Inserted ${entries.size} entries" "Inserted ${entries.size} entries"
} }
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) { if (entries.isNotEmpty()) {
val completionQueue = LinkedBlockingQueue<Future<Unit>>(entries.size) val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now() val start = Instant.now()
val totalElapsedTime = AtomicLong(0) val totalElapsedTime = AtomicLong(0)
entries.forEach { entry -> entries.forEach { entry ->
val requestStart = System.nanoTime() semaphore.acquire()
val future = client.get(entry.first).thenApply { val eventListener = object : RequestEventListener {
totalElapsedTime.addAndGet((System.nanoTime() - requestStart)) var start : Long? = null
override fun requestSent(req: FullHttpRequest) {
this.start = System.nanoTime()
}
override fun responseReceived(res: FullHttpResponse) {
this.start?.let { requestStart ->
totalElapsedTime.addAndGet((System.nanoTime() - requestStart))
}
this.start = null
}
}
val future = client.get(entry.first, eventListener).thenApply {
if (it == null) { if (it == null) {
log.error { log.error {
"Missing entry for key '${entry.first}'" "Missing entry for key '${entry.first}'"
@@ -111,21 +149,19 @@ class BenchmarkCommand : GbcsCommand() {
} }
} }
future.whenComplete { _, _ -> future.whenComplete { _, _ ->
completionQueue.put(future) completionCounter.incrementAndGet()
semaphore.release()
} }
} }
var completionCounter = 0
while (completionCounter < entries.size) {
completionQueue.take()
completionCounter += 1
}
val end = Instant.now() val end = Instant.now()
log.info { log.info {
val elapsed = Duration.between(start, end).toMillis() val elapsed = Duration.between(start, end).toMillis()
"Retrieval rate: ${entries.size.toDouble() / elapsed * 1000} ops/s" val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
"Retrieval rate: $opsPerSecond ops/s"
} }
log.info { log.info {
"Average time per retrieval: ${totalElapsedTime.get() / numberOfEntries.toDouble() * 1e6} ms" val avgTxTime = String.format("%.0f", totalElapsedTime.get() / completionCounter.toDouble() / 1e6)
"Average time per retrieval: $avgTxTime ms"
} }
} else { } else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache") log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")

View File

@@ -213,9 +213,9 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
} }
fun get(key: String): CompletableFuture<ByteArray?> { fun get(key: String, eventListener : RequestEventListener? = null): CompletableFuture<ByteArray?> {
return executeWithRetry { return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null) sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null, eventListener)
}.thenApply { }.thenApply {
val status = it.status() val status = it.status()
if (it.status() == HttpResponseStatus.NOT_FOUND) { if (it.status() == HttpResponseStatus.NOT_FOUND) {
@@ -234,9 +234,9 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
} }
fun put(key: String, content: ByteArray): CompletableFuture<Unit> { fun put(key: String, content: ByteArray, eventListener : RequestEventListener? = null): CompletableFuture<Unit> {
return executeWithRetry { return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content) sendRequest(profile.serverURI.resolve(key), HttpMethod.PUT, content, eventListener)
}.thenApply { }.thenApply {
val status = it.status() val status = it.status()
if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) { if (it.status() != HttpResponseStatus.CREATED && it.status() != HttpResponseStatus.OK) {
@@ -245,7 +245,7 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
} }
private fun sendRequest(uri: URI, method: HttpMethod, body: ByteArray?): CompletableFuture<FullHttpResponse> { private fun sendRequest(uri: URI, method: HttpMethod, body: ByteArray?, eventListener : RequestEventListener?): CompletableFuture<FullHttpResponse> {
val responseFuture = CompletableFuture<FullHttpResponse>() val responseFuture = CompletableFuture<FullHttpResponse>()
// Custom handler for processing responses // Custom handler for processing responses
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> { pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
@@ -261,6 +261,7 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
pipeline.removeLast() pipeline.removeLast()
pool.release(channel) pool.release(channel)
responseFuture.complete(response) responseFuture.complete(response)
eventListener?.responseReceived(response)
} }
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
@@ -306,7 +307,11 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
// Set headers // Set headers
// Send the request // Send the request
channel.writeAndFlush(request) channel.writeAndFlush(request).addListener {
if(it.isSuccess) {
eventListener?.requestSent(request)
}
}
} else { } else {
responseFuture.completeExceptionally(channelFuture.cause()) responseFuture.completeExceptionally(channelFuture.cause())
} }

View File

@@ -0,0 +1,10 @@
package net.woggioni.gbcs.client
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
interface RequestEventListener {
fun requestSent(req : FullHttpRequest) {}
fun responseReceived(res : FullHttpResponse) {}
fun exceptionCaught(ex : Throwable) {}
}