fixed throttling retry-after estimation
All checks were successful
CI / build (push) Successful in 3m9s
All checks were successful
CI / build (push) Successful in 3m9s
This commit is contained in:
@@ -309,8 +309,33 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
|||||||
}
|
}
|
||||||
val pipeline = ch.pipeline()
|
val pipeline = ch.pipeline()
|
||||||
cfg.connection.also { conn ->
|
cfg.connection.also { conn ->
|
||||||
pipeline.addLast(IdleStateHandler(false, conn.readTimeout.toMillis(), conn.writeTimeout.toMillis(), 0, TimeUnit.MILLISECONDS))
|
val readTimeout = conn.readTimeout.toMillis()
|
||||||
pipeline.addLast(IdleStateHandler(true, conn.readIdleTimeout.toMillis(), conn.writeIdleTimeout.toMillis(), conn.idleTimeout.toMillis(), TimeUnit.MILLISECONDS))
|
val writeTimeout = conn.writeTimeout.toMillis()
|
||||||
|
if(readTimeout > 0 || writeTimeout > 0) {
|
||||||
|
pipeline.addLast(
|
||||||
|
IdleStateHandler(
|
||||||
|
false,
|
||||||
|
readTimeout,
|
||||||
|
writeTimeout,
|
||||||
|
0,
|
||||||
|
TimeUnit.MILLISECONDS
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
val readIdleTimeout = conn.readIdleTimeout.toMillis()
|
||||||
|
val writeIdleTimeout = conn.writeIdleTimeout.toMillis()
|
||||||
|
val idleTimeout = conn.idleTimeout.toMillis()
|
||||||
|
if(readIdleTimeout > 0 || writeIdleTimeout > 0 || idleTimeout > 0) {
|
||||||
|
pipeline.addLast(
|
||||||
|
IdleStateHandler(
|
||||||
|
true,
|
||||||
|
readIdleTimeout,
|
||||||
|
writeIdleTimeout,
|
||||||
|
idleTimeout,
|
||||||
|
TimeUnit.MILLISECONDS
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pipeline.addLast(object : ChannelInboundHandlerAdapter() {
|
pipeline.addLast(object : ChannelInboundHandlerAdapter() {
|
||||||
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
|
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
|
||||||
|
@@ -114,9 +114,9 @@ object Parser {
|
|||||||
|
|
||||||
"connection" -> {
|
"connection" -> {
|
||||||
val writeTimeout = child.renderAttribute("write-timeout")
|
val writeTimeout = child.renderAttribute("write-timeout")
|
||||||
?.let(Duration::parse) ?: Duration.of(10, ChronoUnit.SECONDS)
|
?.let(Duration::parse) ?: Duration.of(0, ChronoUnit.SECONDS)
|
||||||
val readTimeout = child.renderAttribute("read-timeout")
|
val readTimeout = child.renderAttribute("read-timeout")
|
||||||
?.let(Duration::parse) ?: Duration.of(10, ChronoUnit.SECONDS)
|
?.let(Duration::parse) ?: Duration.of(0, ChronoUnit.SECONDS)
|
||||||
val idleTimeout = child.renderAttribute("idle-timeout")
|
val idleTimeout = child.renderAttribute("idle-timeout")
|
||||||
?.let(Duration::parse) ?: Duration.of(30, ChronoUnit.SECONDS)
|
?.let(Duration::parse) ?: Duration.of(30, ChronoUnit.SECONDS)
|
||||||
val readIdleTimeout = child.renderAttribute("read-idle-timeout")
|
val readIdleTimeout = child.renderAttribute("read-idle-timeout")
|
||||||
|
@@ -11,8 +11,10 @@ import net.woggioni.gbcs.api.Configuration
|
|||||||
import net.woggioni.gbcs.common.contextLogger
|
import net.woggioni.gbcs.common.contextLogger
|
||||||
import net.woggioni.gbcs.server.GradleBuildCacheServer
|
import net.woggioni.gbcs.server.GradleBuildCacheServer
|
||||||
import net.woggioni.jwo.Bucket
|
import net.woggioni.jwo.Bucket
|
||||||
|
import net.woggioni.jwo.LongMath
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
|
import java.time.temporal.ChronoUnit
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
|
||||||
@@ -54,25 +56,31 @@ class ThrottlingHandler(cfg: Configuration) :
|
|||||||
if (buckets.isEmpty()) {
|
if (buckets.isEmpty()) {
|
||||||
return super.channelRead(ctx, msg)
|
return super.channelRead(ctx, msg)
|
||||||
} else {
|
} else {
|
||||||
var nextAttempt = Long.MAX_VALUE
|
handleBuckets(buckets, ctx, msg, true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun handleBuckets(buckets : List<Bucket>, ctx : ChannelHandlerContext, msg : Any, delayResponse : Boolean) {
|
||||||
|
var nextAttempt = -1L
|
||||||
for (bucket in buckets) {
|
for (bucket in buckets) {
|
||||||
val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
|
val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
|
||||||
if (bucketNextAttempt < 0) {
|
if (bucketNextAttempt > nextAttempt) {
|
||||||
return super.channelRead(ctx, msg)
|
|
||||||
} else if (bucketNextAttempt < nextAttempt) {
|
|
||||||
nextAttempt = bucketNextAttempt
|
nextAttempt = bucketNextAttempt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
val waitDuration = Duration.ofNanos(nextAttempt)
|
if(nextAttempt < 0) {
|
||||||
if (waitDuration < waitThreshold) {
|
super.channelRead(ctx, msg)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
|
||||||
|
if (delayResponse && waitDuration < waitThreshold) {
|
||||||
ctx.executor().schedule({
|
ctx.executor().schedule({
|
||||||
ctx.fireChannelRead(msg)
|
handleBuckets(buckets, ctx, msg, false)
|
||||||
}, waitDuration.toNanos(), TimeUnit.NANOSECONDS)
|
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
|
||||||
} else {
|
} else {
|
||||||
sendThrottledResponse(ctx, waitDuration)
|
sendThrottledResponse(ctx, waitDuration)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private fun sendThrottledResponse(ctx: ChannelHandlerContext, retryAfter: Duration) {
|
private fun sendThrottledResponse(ctx: ChannelHandlerContext, retryAfter: Duration) {
|
||||||
val response = DefaultFullHttpResponse(
|
val response = DefaultFullHttpResponse(
|
||||||
@@ -80,7 +88,12 @@ class ThrottlingHandler(cfg: Configuration) :
|
|||||||
HttpResponseStatus.TOO_MANY_REQUESTS
|
HttpResponseStatus.TOO_MANY_REQUESTS
|
||||||
)
|
)
|
||||||
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
|
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
|
||||||
|
retryAfter.seconds.takeIf {
|
||||||
|
it > 0
|
||||||
|
}?.let {
|
||||||
response.headers()[HttpHeaderNames.RETRY_AFTER] = retryAfter.seconds
|
response.headers()[HttpHeaderNames.RETRY_AFTER] = retryAfter.seconds
|
||||||
|
}
|
||||||
|
|
||||||
ctx.writeAndFlush(response)
|
ctx.writeAndFlush(response)
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -34,8 +34,8 @@
|
|||||||
</xs:complexType>
|
</xs:complexType>
|
||||||
|
|
||||||
<xs:complexType name="connectionType">
|
<xs:complexType name="connectionType">
|
||||||
<xs:attribute name="read-timeout" type="xs:duration" use="optional" default="PT10S"/>
|
<xs:attribute name="read-timeout" type="xs:duration" use="optional" default="PT0S"/>
|
||||||
<xs:attribute name="write-timeout" type="xs:duration" use="optional" default="PT10S"/>
|
<xs:attribute name="write-timeout" type="xs:duration" use="optional" default="PT0S"/>
|
||||||
<xs:attribute name="idle-timeout" type="xs:duration" use="optional" default="PT30S"/>
|
<xs:attribute name="idle-timeout" type="xs:duration" use="optional" default="PT30S"/>
|
||||||
<xs:attribute name="read-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
|
<xs:attribute name="read-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
|
||||||
<xs:attribute name="write-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
|
<xs:attribute name="write-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
|
||||||
|
@@ -133,4 +133,19 @@ class TlsServerTest : AbstractTlsServerTest() {
|
|||||||
val response: HttpResponse<String> = client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString())
|
val response: HttpResponse<String> = client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString())
|
||||||
Assertions.assertEquals(HttpResponseStatus.FORBIDDEN.code(), response.statusCode())
|
Assertions.assertEquals(HttpResponseStatus.FORBIDDEN.code(), response.statusCode())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Order(8)
|
||||||
|
fun traceAsAnonymousUser() {
|
||||||
|
val client: HttpClient = getHttpClient(null)
|
||||||
|
val requestBuilder = newRequestBuilder("").method(
|
||||||
|
"TRACE",
|
||||||
|
HttpRequest.BodyPublishers.ofByteArray("sfgsdgfaiousfiuhsd".toByteArray())
|
||||||
|
)
|
||||||
|
|
||||||
|
val response: HttpResponse<ByteArray> =
|
||||||
|
client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofByteArray())
|
||||||
|
Assertions.assertEquals(HttpResponseStatus.OK.code(), response.statusCode())
|
||||||
|
println(String(response.body()))
|
||||||
|
}
|
||||||
}
|
}
|
@@ -4,7 +4,7 @@ org.gradle.caching=true
|
|||||||
|
|
||||||
gbcs.version = 0.0.11
|
gbcs.version = 0.0.11
|
||||||
|
|
||||||
lys.version = 2025.01.24
|
lys.version = 2025.01.25
|
||||||
|
|
||||||
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
|
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
|
||||||
docker.registry.url=gitea.woggioni.net
|
docker.registry.url=gitea.woggioni.net
|
||||||
|
@@ -1,6 +1,5 @@
|
|||||||
pluginManagement {
|
pluginManagement {
|
||||||
repositories {
|
repositories {
|
||||||
mavenLocal()
|
|
||||||
maven {
|
maven {
|
||||||
url = getProperty('gitea.maven.url')
|
url = getProperty('gitea.maven.url')
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user