fixed throttling retry-after estimation
All checks were successful
CI / build (push) Successful in 3m38s
All checks were successful
CI / build (push) Successful in 3m38s
This commit is contained in:
@@ -11,8 +11,10 @@ import net.woggioni.gbcs.api.Configuration
|
||||
import net.woggioni.gbcs.common.contextLogger
|
||||
import net.woggioni.gbcs.server.GradleBuildCacheServer
|
||||
import net.woggioni.jwo.Bucket
|
||||
import net.woggioni.jwo.LongMath
|
||||
import java.net.InetSocketAddress
|
||||
import java.time.Duration
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
|
||||
@@ -54,24 +56,30 @@ class ThrottlingHandler(cfg: Configuration) :
|
||||
if (buckets.isEmpty()) {
|
||||
return super.channelRead(ctx, msg)
|
||||
} else {
|
||||
var nextAttempt = Long.MAX_VALUE
|
||||
for (bucket in buckets) {
|
||||
val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
|
||||
if (bucketNextAttempt < 0) {
|
||||
return super.channelRead(ctx, msg)
|
||||
} else if (bucketNextAttempt < nextAttempt) {
|
||||
nextAttempt = bucketNextAttempt
|
||||
}
|
||||
}
|
||||
val waitDuration = Duration.ofNanos(nextAttempt)
|
||||
if (waitDuration < waitThreshold) {
|
||||
ctx.executor().schedule({
|
||||
ctx.fireChannelRead(msg)
|
||||
}, waitDuration.toNanos(), TimeUnit.NANOSECONDS)
|
||||
} else {
|
||||
sendThrottledResponse(ctx, waitDuration)
|
||||
handleBuckets(buckets, ctx, msg, true)
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleBuckets(buckets : List<Bucket>, ctx : ChannelHandlerContext, msg : Any, delayResponse : Boolean) {
|
||||
var nextAttempt = -1L
|
||||
for (bucket in buckets) {
|
||||
val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
|
||||
if (bucketNextAttempt > nextAttempt) {
|
||||
nextAttempt = bucketNextAttempt
|
||||
}
|
||||
}
|
||||
if(nextAttempt < 0) {
|
||||
super.channelRead(ctx, msg)
|
||||
return
|
||||
}
|
||||
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
|
||||
if (delayResponse && waitDuration < waitThreshold) {
|
||||
ctx.executor().schedule({
|
||||
handleBuckets(buckets, ctx, msg, false)
|
||||
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
|
||||
} else {
|
||||
sendThrottledResponse(ctx, waitDuration)
|
||||
}
|
||||
}
|
||||
|
||||
private fun sendThrottledResponse(ctx: ChannelHandlerContext, retryAfter: Duration) {
|
||||
@@ -80,7 +88,12 @@ class ThrottlingHandler(cfg: Configuration) :
|
||||
HttpResponseStatus.TOO_MANY_REQUESTS
|
||||
)
|
||||
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
|
||||
response.headers()[HttpHeaderNames.RETRY_AFTER] = retryAfter.seconds
|
||||
retryAfter.seconds.takeIf {
|
||||
it > 0
|
||||
}?.let {
|
||||
response.headers()[HttpHeaderNames.RETRY_AFTER] = retryAfter.seconds
|
||||
}
|
||||
|
||||
ctx.writeAndFlush(response)
|
||||
}
|
||||
}
|
@@ -133,4 +133,19 @@ class TlsServerTest : AbstractTlsServerTest() {
|
||||
val response: HttpResponse<String> = client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString())
|
||||
Assertions.assertEquals(HttpResponseStatus.FORBIDDEN.code(), response.statusCode())
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(8)
|
||||
fun traceAsAnonymousUser() {
|
||||
val client: HttpClient = getHttpClient(null)
|
||||
val requestBuilder = newRequestBuilder("").method(
|
||||
"TRACE",
|
||||
HttpRequest.BodyPublishers.ofByteArray("sfgsdgfaiousfiuhsd".toByteArray())
|
||||
)
|
||||
|
||||
val response: HttpResponse<ByteArray> =
|
||||
client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofByteArray())
|
||||
Assertions.assertEquals(HttpResponseStatus.OK.code(), response.statusCode())
|
||||
println(String(response.body()))
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user