added connection pooling to gbcs-client
All checks were successful
CI / build (push) Successful in 3m55s

This commit is contained in:
2025-01-16 13:37:14 +08:00
parent 5af99330f8
commit 05a265e4b4
16 changed files with 144 additions and 165 deletions

View File

@@ -8,6 +8,7 @@ module net.woggioni.gbcs.client {
requires java.xml;
requires net.woggioni.gbcs.base;
requires io.netty.codec;
requires org.slf4j;
exports net.woggioni.gbcs.client;

View File

@@ -5,11 +5,13 @@ import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.ChannelPool
import io.netty.channel.pool.FixedChannelPool
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.http.DefaultFullHttpRequest
@@ -26,8 +28,14 @@ import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.concurrent.Future
import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.base.Xml
import net.woggioni.gbcs.base.contextLogger
import net.woggioni.gbcs.base.debug
import net.woggioni.gbcs.base.info
import net.woggioni.gbcs.client.impl.Parser
import java.net.InetSocketAddress
import java.net.URI
import java.nio.file.Files
import java.nio.file.Path
@@ -35,29 +43,36 @@ import java.security.PrivateKey
import java.security.cert.X509Certificate
import java.util.Base64
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger
import io.netty.util.concurrent.Future as NettyFuture
class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
private val group: NioEventLoopGroup
private var sslContext: SslContext
private val log = contextLogger()
private val pool: ChannelPool
data class Configuration(
val profiles : Map<String, Profile>
val profiles: Map<String, Profile>
) {
sealed class Authentication {
data class TlsClientAuthenticationCredentials(val key: PrivateKey, val certificateChain: Array<X509Certificate>) : Authentication()
data class TlsClientAuthenticationCredentials(
val key: PrivateKey,
val certificateChain: Array<X509Certificate>
) : Authentication()
data class BasicAuthenticationCredentials(val username: String, val password: String) : Authentication()
}
data class Profile(
val serverURI: URI,
val authentication : Authentication?
val authentication: Authentication?,
val maxConnections : Int
)
companion object {
fun parse(path : Path) : Configuration {
fun parse(path: Path): Configuration {
return Files.newInputStream(path).use {
Xml.parseXml(path.toUri().toURL(), it)
}.let(Parser::parse)
@@ -67,9 +82,7 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
init {
group = NioEventLoopGroup()
this.sslContext = SslContextBuilder.forClient().also { builder ->
sslContext = SslContextBuilder.forClient().also { builder ->
(profile.authentication as? Configuration.Authentication.TlsClientAuthenticationCredentials)?.let { tlsClientAuthenticationCredentials ->
builder.keyManager(
tlsClientAuthenticationCredentials.key,
@@ -77,6 +90,61 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
)
}
}.build()
val (scheme, host, port) = profile.serverURI.run {
Triple(
if (scheme == null) "http" else profile.serverURI.scheme,
host,
port.takeIf { it > 0 } ?: if ("https" == scheme.lowercase()) 443 else 80
)
}
val bootstrap = Bootstrap().apply {
group(group)
channel(NioSocketChannel::class.java)
option(ChannelOption.TCP_NODELAY, true)
option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(host, port))
}
val channelPoolHandler = object : AbstractChannelPoolHandler() {
@Volatile
private var connectionCount = AtomicInteger()
@Volatile
private var leaseCount = AtomicInteger()
override fun channelReleased(ch: Channel) {
log.debug {
"Released lease ${leaseCount.decrementAndGet()}"
}
}
override fun channelAcquired(ch: Channel?) {
log.debug {
"Acquired lease ${leaseCount.getAndIncrement()}"
}
}
override fun channelCreated(ch: Channel) {
log.debug {
"Created connection ${connectionCount.getAndIncrement()}"
}
val pipeline: ChannelPipeline = ch.pipeline()
// Add SSL handler if needed
if ("https".equals(scheme, ignoreCase = true)) {
pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), host, port))
}
// HTTP handlers
pipeline.addLast("codec", HttpClientCodec())
pipeline.addLast("decompressor", HttpContentDecompressor())
pipeline.addLast("aggregator", HttpObjectAggregator(1048576))
pipeline.addLast("chunked", ChunkedWriteHandler())
}
}
pool = FixedChannelPool(bootstrap, channelPoolHandler, profile.maxConnections)
}
fun get(key: String): CompletableFuture<ByteArray?> {
@@ -110,92 +178,69 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
private fun sendRequest(uri: URI, method: HttpMethod, body: ByteArray?): CompletableFuture<FullHttpResponse> {
val responseFuture = CompletableFuture<FullHttpResponse>()
try {
val scheme = if (uri.scheme == null) "http" else uri.scheme
val host = uri.host
var port = uri.port
if (port == -1) {
port = if ("https".equals(scheme, ignoreCase = true)) 443 else 80
}
val bootstrap = Bootstrap()
bootstrap.group(group)
.channel(NioSocketChannel::class.java)
.handler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(ch: SocketChannel) {
val pipeline: ChannelPipeline = ch.pipeline()
// Add SSL handler if needed
if ("https".equals(scheme, ignoreCase = true)) {
pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), host, port))
// Custom handler for processing responses
pool.acquire().addListener(object : GenericFutureListener<NettyFuture<Channel>> {
override fun operationComplete(channelFuture: Future<Channel>) {
if (channelFuture.isSuccess) {
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline().addLast("handler", object : SimpleChannelInboundHandler<FullHttpResponse>() {
override fun channelRead0(
ctx: ChannelHandlerContext,
response: FullHttpResponse
) {
responseFuture.complete(response)
pipeline.removeLast()
pool.release(channel)
}
// HTTP handlers
pipeline.addLast("codec", HttpClientCodec())
pipeline.addLast("decompressor", HttpContentDecompressor())
pipeline.addLast("aggregator", HttpObjectAggregator(1048576))
pipeline.addLast("chunked", ChunkedWriteHandler())
// Custom handler for processing responses
pipeline.addLast("handler", object : SimpleChannelInboundHandler<FullHttpResponse>() {
override fun channelRead0(
ctx: ChannelHandlerContext,
response: FullHttpResponse
) {
responseFuture.complete(response)
ctx.close()
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val ex = when (cause) {
is DecoderException -> cause.cause
else -> cause
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
val ex = when (cause) {
is DecoderException -> cause.cause
else -> cause
responseFuture.completeExceptionally(ex)
ctx.close()
pipeline.removeLast()
pool.release(channel)
}
})
// Prepare the HTTP request
val request: FullHttpRequest = let {
val content: ByteBuf? = body?.takeIf(ByteArray::isNotEmpty)?.let(Unpooled::wrappedBuffer)
DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
method,
uri.rawPath,
content ?: Unpooled.buffer(0)
).apply {
headers().apply {
if (content != null) {
set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM)
set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes())
}
set(HttpHeaderNames.HOST, profile.serverURI.host)
set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
set(
HttpHeaderNames.ACCEPT_ENCODING,
HttpHeaderValues.GZIP.toString() + "," + HttpHeaderValues.DEFLATE.toString()
)
// Add basic auth if configured
(profile.authentication as? Configuration.Authentication.BasicAuthenticationCredentials)?.let { credentials ->
val auth = "${credentials.username}:${credentials.password}"
val encodedAuth = Base64.getEncoder().encodeToString(auth.toByteArray())
set(HttpHeaderNames.AUTHORIZATION, "Basic $encodedAuth")
}
responseFuture.completeExceptionally(ex)
ctx.close()
}
})
}
})
// Connect to host
val channel: Channel = bootstrap.connect(host, port).sync().channel()
// Prepare the HTTP request
val request: FullHttpRequest = let {
val content: ByteBuf? = body?.takeIf(ByteArray::isNotEmpty)?.let(Unpooled::wrappedBuffer)
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uri.rawPath, content ?: Unpooled.buffer(0)).apply {
headers().apply {
if (content != null) {
set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM)
set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes())
}
set(HttpHeaderNames.HOST, host)
set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
set(
HttpHeaderNames.ACCEPT_ENCODING,
HttpHeaderValues.GZIP.toString() + "," + HttpHeaderValues.DEFLATE.toString()
)
// Add basic auth if configured
(profile.authentication as? Configuration.Authentication.BasicAuthenticationCredentials)?.let { credentials ->
val auth = "${credentials.username}:${credentials.password}"
val encodedAuth = Base64.getEncoder().encodeToString(auth.toByteArray())
set(HttpHeaderNames.AUTHORIZATION, "Basic $encodedAuth")
}
}
// Set headers
// Send the request
channel.writeAndFlush(request)
}
}
// Set headers
// Send the request
channel.writeAndFlush(request)
} catch (e: Exception) {
responseFuture.completeExceptionally(e)
}
})
return responseFuture
}

View File

@@ -1,38 +0,0 @@
package net.woggioni.gbcs.client
import java.net.URI
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyStore
import java.security.PrivateKey
import java.security.cert.X509Certificate
import kotlin.random.Random
//object Main {
// @JvmStatic
// fun main(vararg args : String) {
// val pwd = "PO%!*bW9p'Zp#=uu\$fl{Ij`Ad.8}x#ho".toCharArray()
// val keystore = KeyStore.getInstance("PKCS12").apply{
// Files.newInputStream(Path.of("/home/woggioni/ssl/woggioni@c962475fa38.pfx")).use {
// load(it, pwd)
// }
// }
// val key = keystore.getKey("woggioni@c962475fa38", pwd) as PrivateKey
// val certChain = keystore.getCertificateChain("woggioni@c962475fa38").asSequence()
// .map { it as X509Certificate }
// .toList()
// .toTypedArray()
// GbcsClient.Configuration(
// serverURI = URI("https://gbcs.woggioni.net/"),
// GbcsClient.TlsClientAuthenticationCredentials(
// key, certChain
// )
// ).let(::GbcsClient).use { client ->
// val random = Random(101325)
// val entry = "something" to ByteArray(0x1000).also(random::nextBytes)
// client.put(entry.first, entry.second)
// val retrieved = client.get(entry.first).get()
// println(retrieved.contentEquals(entry.second))
// }
// }
//}

View File

@@ -55,7 +55,11 @@ object Parser {
}
}
}
profiles[name] = GbcsClient.Configuration.Profile(uri, authentication)
val maxConnections = child.getAttribute("max-connections")
.takeIf(String::isNotEmpty)
?.let(String::toInt)
?: 50
profiles[name] = GbcsClient.Configuration.Profile(uri, authentication, maxConnections)
}
}
}

View File

@@ -19,6 +19,7 @@
</xs:choice>
<xs:attribute name="name" type="xs:token" use="required"/>
<xs:attribute name="base-url" type="xs:anyURI" use="required"/>
<xs:attribute name="max-connections" type="xs:positiveInteger" default="50"/>
</xs:complexType>
<xs:complexType name="basicAuthType">