fixed client bug (unhandled connection touts)
All checks were successful
CI / build (push) Successful in 3m7s

This commit is contained in:
2025-01-20 19:18:20 +08:00
parent 3d1847c408
commit 1a78c8092b
6 changed files with 52 additions and 17 deletions

View File

@@ -46,7 +46,8 @@ class BenchmarkCommand : GbcsCommand() {
val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong()) val random = Random(SecureRandom.getInstance("NativePRNGNonBlocking").nextLong())
while (true) { while (true) {
val key = Base64.getUrlEncoder().encode(random.nextBytes(16)).toString(Charsets.UTF_8) val key = Base64.getUrlEncoder().encode(random.nextBytes(16)).toString(Charsets.UTF_8)
val value = random.nextBytes(0x1000) val content = random.nextInt().toByte()
val value = ByteArray(0x1000, { _ -> content })
yield(key to value) yield(key to value)
} }
} }

View File

@@ -30,16 +30,18 @@ import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.stream.ChunkedWriteHandler import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.concurrent.Future import io.netty.util.concurrent.Future
import io.netty.util.concurrent.GenericFutureListener import io.netty.util.concurrent.GenericFutureListener
import net.woggioni.gbcs.client.impl.Parser
import net.woggioni.gbcs.common.Xml import net.woggioni.gbcs.common.Xml
import net.woggioni.gbcs.common.contextLogger import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.debug import net.woggioni.gbcs.common.debug
import net.woggioni.gbcs.client.impl.Parser import net.woggioni.gbcs.common.trace
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.net.URI import java.net.URI
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.security.PrivateKey import java.security.PrivateKey
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import java.time.Duration
import java.util.Base64 import java.util.Base64
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
@@ -67,6 +69,7 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
data class Profile( data class Profile(
val serverURI: URI, val serverURI: URI,
val authentication: Authentication?, val authentication: Authentication?,
val connectionTimeout : Duration?,
val maxConnections : Int val maxConnections : Int
) )
@@ -104,6 +107,9 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
option(ChannelOption.TCP_NODELAY, true) option(ChannelOption.TCP_NODELAY, true)
option(ChannelOption.SO_KEEPALIVE, true) option(ChannelOption.SO_KEEPALIVE, true)
remoteAddress(InetSocketAddress(host, port)) remoteAddress(InetSocketAddress(host, port))
profile.connectionTimeout?.let {
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, it.toMillis().toInt())
}
} }
val channelPoolHandler = object : AbstractChannelPoolHandler() { val channelPoolHandler = object : AbstractChannelPoolHandler() {
@@ -114,20 +120,29 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
private var leaseCount = AtomicInteger() private var leaseCount = AtomicInteger()
override fun channelReleased(ch: Channel) { override fun channelReleased(ch: Channel) {
log.debug { val activeLeases = leaseCount.decrementAndGet()
"Released lease ${leaseCount.decrementAndGet()}" log.trace {
"Released channel ${ch.id().asShortText()}, number of active leases: $activeLeases"
} }
} }
override fun channelAcquired(ch: Channel?) { override fun channelAcquired(ch: Channel) {
log.debug { val activeLeases = leaseCount.getAndIncrement()
"Acquired lease ${leaseCount.getAndIncrement()}" log.trace {
"Acquired channel ${ch.id().asShortText()}, number of active leases: $activeLeases"
} }
} }
override fun channelCreated(ch: Channel) { override fun channelCreated(ch: Channel) {
val connectionId = connectionCount.getAndIncrement()
log.debug { log.debug {
"Created connection ${connectionCount.getAndIncrement()}" "Created connection $connectionId, total number of active connections: $connectionId"
}
ch.closeFuture().addListener {
val activeConnections = connectionCount.decrementAndGet()
log.debug {
"Closed connection $connectionId, total number of active connections: $activeConnections"
}
} }
val pipeline: ChannelPipeline = ch.pipeline() val pipeline: ChannelPipeline = ch.pipeline()
@@ -202,6 +217,7 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
ctx.close() ctx.close()
pipeline.removeLast() pipeline.removeLast()
pool.release(channel) pool.release(channel)
super.exceptionCaught(ctx, cause)
} }
}) })
// Prepare the HTTP request // Prepare the HTTP request
@@ -219,7 +235,7 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()) set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes())
} }
set(HttpHeaderNames.HOST, profile.serverURI.host) set(HttpHeaderNames.HOST, profile.serverURI.host)
set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
set( set(
HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderNames.ACCEPT_ENCODING,
HttpHeaderValues.GZIP.toString() + "," + HttpHeaderValues.DEFLATE.toString() HttpHeaderValues.GZIP.toString() + "," + HttpHeaderValues.DEFLATE.toString()
@@ -237,6 +253,8 @@ class GbcsClient(private val profile: Configuration.Profile) : AutoCloseable {
// Set headers // Set headers
// Send the request // Send the request
channel.writeAndFlush(request) channel.writeAndFlush(request)
} else {
responseFuture.completeExceptionally(channelFuture.cause())
} }
} }
}) })

View File

@@ -11,6 +11,7 @@ import java.nio.file.Path
import java.security.KeyStore import java.security.KeyStore
import java.security.PrivateKey import java.security.PrivateKey
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import java.time.Duration
object Parser { object Parser {
@@ -60,7 +61,9 @@ object Parser {
val maxConnections = child.renderAttribute("max-connections") val maxConnections = child.renderAttribute("max-connections")
?.let(String::toInt) ?.let(String::toInt)
?: 50 ?: 50
profiles[name] = GbcsClient.Configuration.Profile(uri, authentication, maxConnections) val connectionTimeout = child.renderAttribute("connection-timeout")
?.let(Duration::parse)
profiles[name] = GbcsClient.Configuration.Profile(uri, authentication, connectionTimeout, maxConnections)
} }
} }
} }

View File

@@ -21,6 +21,7 @@
<xs:attribute name="name" type="xs:token" use="required"/> <xs:attribute name="name" type="xs:token" use="required"/>
<xs:attribute name="base-url" type="xs:anyURI" use="required"/> <xs:attribute name="base-url" type="xs:anyURI" use="required"/>
<xs:attribute name="max-connections" type="xs:positiveInteger" default="50"/> <xs:attribute name="max-connections" type="xs:positiveInteger" default="50"/>
<xs:attribute name="connection-timeout" type="xs:duration"/>
</xs:complexType> </xs:complexType>
<xs:complexType name="noAuthType"/> <xs:complexType name="noAuthType"/>

View File

@@ -21,14 +21,20 @@ import org.w3c.dom.TypeInfo
import java.nio.file.Paths import java.nio.file.Paths
import java.time.Duration import java.time.Duration
import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
object Parser { object Parser {
fun parse(document: Document): Configuration { fun parse(document: Document): Configuration {
val root = document.documentElement val root = document.documentElement
val anonymousUser = User("", null, emptySet()) val anonymousUser = User("", null, emptySet())
var connection: Configuration.Connection? = null var connection: Configuration.Connection = Configuration.Connection(
var eventExecutor: Configuration.EventExecutor? = null Duration.of(10, ChronoUnit.SECONDS),
Duration.of(10, ChronoUnit.SECONDS),
Duration.of(60, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS),
67108864
)
var eventExecutor: Configuration.EventExecutor = Configuration.EventExecutor(true)
var cache: Cache? = null var cache: Cache? = null
var host = "127.0.0.1" var host = "127.0.0.1"
var port = 11080 var port = 11080

View File

@@ -1,11 +1,17 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<gbcs:server use-virtual-threads="false" <gbcs:server
max-request-size="67108864"
incoming-connections-backlog-size="1024"
xmlns:xs="http://www.w3.org/2001/XMLSchema-instance" xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gbcs="urn:net.woggioni.gbcs.server" xmlns:gbcs="urn:net.woggioni.gbcs.server"
xs:schemaLocation="urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd"> xs:schemaLocation="urn:net.woggioni.gbcs.server jpms://net.woggioni.gbcs.server/net/woggioni/gbcs/server/schema/gbcs.xsd">
<bind host="127.0.0.1" port="8080"/> <bind host="127.0.0.1" port="8080" incoming-connections-backlog-size="1024"/>
<connection
max-request-size="67108864"
idle-timeout="PT30S"
read-timeout="PT10S"
write-timeout="PT10S"
read-idle-timeout="PT60S"
write-idle-timeout="PT60S"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="gbcs:fileSystemCacheType" path="/tmp/gbcs" max-age="P7D"/> <cache xs:type="gbcs:fileSystemCacheType" path="/tmp/gbcs" max-age="P7D"/>
<authentication> <authentication>
<none/> <none/>