added healthcheck command to client

This commit is contained in:
2025-02-05 00:02:17 +08:00
parent c2e388b931
commit 4180df2352
4 changed files with 141 additions and 74 deletions

View File

@@ -5,6 +5,7 @@ import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand import net.woggioni.gbcs.cli.impl.commands.BenchmarkCommand
import net.woggioni.gbcs.cli.impl.commands.ClientCommand import net.woggioni.gbcs.cli.impl.commands.ClientCommand
import net.woggioni.gbcs.cli.impl.commands.GetCommand import net.woggioni.gbcs.cli.impl.commands.GetCommand
import net.woggioni.gbcs.cli.impl.commands.HealthCheckCommand
import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand import net.woggioni.gbcs.cli.impl.commands.PasswordHashCommand
import net.woggioni.gbcs.cli.impl.commands.PutCommand import net.woggioni.gbcs.cli.impl.commands.PutCommand
import net.woggioni.gbcs.cli.impl.commands.ServerCommand import net.woggioni.gbcs.cli.impl.commands.ServerCommand
@@ -44,6 +45,7 @@ class GradleBuildCacheServerCli : GbcsCommand() {
addSubcommand(BenchmarkCommand()) addSubcommand(BenchmarkCommand())
addSubcommand(PutCommand()) addSubcommand(PutCommand())
addSubcommand(GetCommand()) addSubcommand(GetCommand())
addSubcommand(HealthCheckCommand())
}) })
System.exit(commandLine.execute(*args)) System.exit(commandLine.execute(*args))
} }

View File

@@ -46,90 +46,91 @@ class BenchmarkCommand : GbcsCommand() {
clientCommand.configuration.profiles[profileName] clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration") ?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
} }
val client = GradleBuildCacheClient(profile) GradleBuildCacheClient(profile).use { client ->
val entryGenerator = sequence { val entryGenerator = sequence {
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong()) val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
while (true) { while (true) {
val key = JWO.bytesToHex(random.nextBytes(16)) val key = JWO.bytesToHex(random.nextBytes(16))
val content = random.nextInt().toByte() val content = random.nextInt().toByte()
val value = ByteArray(size, { _ -> content }) val value = ByteArray(size, { _ -> content })
yield(key to value) yield(key to value)
}
} }
}
log.info { log.info {
"Starting insertion" "Starting insertion"
} }
val entries = let { val entries = let {
val completionCounter = AtomicLong(0) val completionCounter = AtomicLong(0)
val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries) val completionQueue = LinkedBlockingQueue<Pair<String, ByteArray>>(numberOfEntries)
val start = Instant.now() val start = Instant.now()
val semaphore = Semaphore(profile.maxConnections * 3) val semaphore = Semaphore(profile.maxConnections * 3)
val iterator = entryGenerator.take(numberOfEntries).iterator() val iterator = entryGenerator.take(numberOfEntries).iterator()
while(completionCounter.get() < numberOfEntries) { while (completionCounter.get() < numberOfEntries) {
if(iterator.hasNext()) { if (iterator.hasNext()) {
val entry = iterator.next() val entry = iterator.next()
semaphore.acquire()
val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { result, ex ->
if (ex != null) {
log.error(ex.message, ex)
} else {
completionQueue.put(result)
}
semaphore.release()
completionCounter.incrementAndGet()
}
}
}
val inserted = completionQueue.toList()
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
}
inserted
}
log.info {
"Inserted ${entries.size} entries"
}
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) {
val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now()
entries.forEach { entry ->
semaphore.acquire() semaphore.acquire()
val future = client.put(entry.first, entry.second).thenApply { entry }
future.whenComplete { result, ex -> val future = client.get(entry.first).thenApply {
if (ex != null) { if (it == null) {
log.error(ex.message, ex) log.error {
} else { "Missing entry for key '${entry.first}'"
completionQueue.put(result) }
} else if (!entry.second.contentEquals(it)) {
log.error {
"Retrieved a value different from what was inserted for key '${entry.first}'"
}
} }
semaphore.release() }
future.whenComplete { _, _ ->
completionCounter.incrementAndGet() completionCounter.incrementAndGet()
semaphore.release()
} }
} }
} val end = Instant.now()
log.info {
val inserted = completionQueue.toList() val elapsed = Duration.between(start, end).toMillis()
val end = Instant.now() val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
log.info { "Retrieval rate: $opsPerSecond ops/s"
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", numberOfEntries.toDouble() / elapsed * 1000)
"Insertion rate: $opsPerSecond ops/s"
}
inserted
}
log.info {
"Inserted ${entries.size} entries"
}
log.info {
"Starting retrieval"
}
if (entries.isNotEmpty()) {
val completionCounter = AtomicLong(0)
val semaphore = Semaphore(profile.maxConnections * 3)
val start = Instant.now()
entries.forEach { entry ->
semaphore.acquire()
val future = client.get(entry.first).thenApply {
if (it == null) {
log.error {
"Missing entry for key '${entry.first}'"
}
} else if (!entry.second.contentEquals(it)) {
log.error {
"Retrieved a value different from what was inserted for key '${entry.first}'"
}
}
}
future.whenComplete { _, _ ->
completionCounter.incrementAndGet()
semaphore.release()
} }
} else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")
} }
val end = Instant.now()
log.info {
val elapsed = Duration.between(start, end).toMillis()
val opsPerSecond = String.format("%.2f", entries.size.toDouble() / elapsed * 1000)
"Retrieval rate: $opsPerSecond ops/s"
}
} else {
log.error("Skipping retrieval benchmark as it was not possible to insert any entry in the cache")
} }
} }
} }

View File

@@ -0,0 +1,45 @@
package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.client.GradleBuildCacheClient
import net.woggioni.gbcs.common.contextLogger
import picocli.CommandLine
import java.security.SecureRandom
import kotlin.random.Random
@CommandLine.Command(
name = "health",
description = ["Check server health"],
showDefaultValues = true
)
class HealthCheckCommand : GbcsCommand() {
private val log = contextLogger()
@CommandLine.Spec
private lateinit var spec: CommandLine.Model.CommandSpec
override fun run() {
val clientCommand = spec.parent().userObject() as ClientCommand
val profile = clientCommand.profileName.let { profileName ->
clientCommand.configuration.profiles[profileName]
?: throw IllegalArgumentException("Profile $profileName does not exist in configuration")
}
GradleBuildCacheClient(profile).use { client ->
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
val nonce = ByteArray(0xa0)
random.nextBytes(nonce)
client.healthCheck(nonce).thenApply { value ->
if(value == null) {
throw IllegalStateException("Empty response from server")
}
for(i in 0 until nonce.size) {
for(j in value.size - nonce.size until nonce.size) {
if(nonce[i] != value[j]) {
throw IllegalStateException("Server nonce does not match")
}
}
}
}.get()
}
}
}

View File

@@ -213,6 +213,25 @@ class GradleBuildCacheClient(private val profile: Configuration.Profile) : AutoC
} }
} }
fun healthCheck(nonce: ByteArray): CompletableFuture<ByteArray?> {
return executeWithRetry {
sendRequest(profile.serverURI, HttpMethod.TRACE, nonce)
}.thenApply {
val status = it.status()
if (it.status() != HttpResponseStatus.OK) {
throw HttpException(status)
} else {
it.content()
}
}.thenApply { maybeByteBuf ->
maybeByteBuf?.let {
val result = ByteArray(it.readableBytes())
it.getBytes(0, result)
result
}
}
}
fun get(key: String): CompletableFuture<ByteArray?> { fun get(key: String): CompletableFuture<ByteArray?> {
return executeWithRetry { return executeWithRetry {
sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null) sendRequest(profile.serverURI.resolve(key), HttpMethod.GET, null)