solved issue with ignored HttpContent
and HttpCacheContent
messages in the Netty pipeline
All checks were successful
CI / build (push) Successful in 12m48s
All checks were successful
CI / build (push) Successful in 12m48s
This commit is contained in:
@@ -54,6 +54,7 @@ import net.woggioni.rbcs.server.auth.RoleAuthorizer
|
||||
import net.woggioni.rbcs.server.configuration.Parser
|
||||
import net.woggioni.rbcs.server.configuration.Serializer
|
||||
import net.woggioni.rbcs.server.exception.ExceptionHandler
|
||||
import net.woggioni.rbcs.server.handler.BlackHoleRequestHandler
|
||||
import net.woggioni.rbcs.server.handler.MaxRequestSizeHandler
|
||||
import net.woggioni.rbcs.server.handler.ServerHandler
|
||||
import net.woggioni.rbcs.server.throttling.BucketManager
|
||||
@@ -361,6 +362,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
}
|
||||
pipeline.addLast(eventExecutorGroup, ServerHandler.NAME, serverHandler)
|
||||
pipeline.addLast(ExceptionHandler.NAME, ExceptionHandler)
|
||||
pipeline.addLast(BlackHoleRequestHandler.NAME, BlackHoleRequestHandler())
|
||||
}
|
||||
|
||||
override fun asyncClose() = cacheHandlerFactory.asyncClose()
|
||||
|
@@ -2,7 +2,6 @@ package net.woggioni.rbcs.server.cache
|
||||
|
||||
import io.netty.buffer.ByteBuf
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.SimpleChannelInboundHandler
|
||||
import io.netty.handler.codec.http.LastHttpContent
|
||||
import io.netty.handler.stream.ChunkedNioFile
|
||||
import net.woggioni.rbcs.api.CacheHandler
|
||||
@@ -29,10 +28,16 @@ class FileSystemCacheHandler(
|
||||
private val chunkSize: Int
|
||||
) : CacheHandler() {
|
||||
|
||||
private interface InProgressRequest{
|
||||
|
||||
}
|
||||
|
||||
private class InProgressGetRequest(val request : CacheGetRequest) : InProgressRequest
|
||||
|
||||
private inner class InProgressPutRequest(
|
||||
val key : String,
|
||||
private val fileSink : FileSystemCache.FileSink
|
||||
) {
|
||||
) : InProgressRequest {
|
||||
|
||||
private val stream = Channels.newOutputStream(fileSink.channel).let {
|
||||
if (compressionEnabled) {
|
||||
@@ -56,7 +61,7 @@ class FileSystemCacheHandler(
|
||||
}
|
||||
}
|
||||
|
||||
private var inProgressPutRequest: InProgressPutRequest? = null
|
||||
private var inProgressRequest: InProgressRequest? = null
|
||||
|
||||
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
|
||||
when (msg) {
|
||||
@@ -69,55 +74,64 @@ class FileSystemCacheHandler(
|
||||
}
|
||||
|
||||
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
|
||||
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm)))
|
||||
cache.get(key)?.also { entryValue ->
|
||||
sendMessageAndFlush(ctx, CacheValueFoundResponse(msg.key, entryValue.metadata))
|
||||
entryValue.channel.let { channel ->
|
||||
if(compressionEnabled) {
|
||||
InflaterInputStream(Channels.newInputStream(channel)).use { stream ->
|
||||
inProgressRequest = InProgressGetRequest(msg)
|
||||
|
||||
outerLoop@
|
||||
while (true) {
|
||||
val buf = ctx.alloc().heapBuffer(chunkSize)
|
||||
while(buf.readableBytes() < chunkSize) {
|
||||
val read = buf.writeBytes(stream, chunkSize)
|
||||
if(read < 0) {
|
||||
sendMessageAndFlush(ctx, LastCacheContent(buf))
|
||||
break@outerLoop
|
||||
}
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheContent(buf))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
sendMessage(ctx, ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize))
|
||||
sendMessageAndFlush(ctx, LastHttpContent.EMPTY_LAST_CONTENT)
|
||||
}
|
||||
}
|
||||
} ?: sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
|
||||
}
|
||||
|
||||
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
|
||||
val key = String(Base64.getUrlEncoder().encode(processCacheKey(msg.key, digestAlgorithm)))
|
||||
val sink = cache.put(key, msg.metadata)
|
||||
inProgressPutRequest = InProgressPutRequest(msg.key, sink)
|
||||
inProgressRequest = InProgressPutRequest(msg.key, sink)
|
||||
}
|
||||
|
||||
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
|
||||
inProgressPutRequest!!.write(msg.content())
|
||||
val request = inProgressRequest
|
||||
if(request is InProgressPutRequest) {
|
||||
request.write(msg.content())
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
|
||||
inProgressPutRequest?.let { request ->
|
||||
inProgressPutRequest = null
|
||||
request.write(msg.content())
|
||||
request.commit()
|
||||
sendMessageAndFlush(ctx, CachePutResponse(request.key))
|
||||
when(val request = inProgressRequest) {
|
||||
is InProgressPutRequest -> {
|
||||
inProgressRequest = null
|
||||
request.write(msg.content())
|
||||
request.commit()
|
||||
sendMessageAndFlush(ctx, CachePutResponse(request.key))
|
||||
}
|
||||
is InProgressGetRequest -> {
|
||||
val key = String(Base64.getUrlEncoder().encode(processCacheKey(request.request.key, digestAlgorithm)))
|
||||
cache.get(key)?.also { entryValue ->
|
||||
sendMessageAndFlush(ctx, CacheValueFoundResponse(request.request.key, entryValue.metadata))
|
||||
entryValue.channel.let { channel ->
|
||||
if(compressionEnabled) {
|
||||
InflaterInputStream(Channels.newInputStream(channel)).use { stream ->
|
||||
|
||||
outerLoop@
|
||||
while (true) {
|
||||
val buf = ctx.alloc().heapBuffer(chunkSize)
|
||||
while(buf.readableBytes() < chunkSize) {
|
||||
val read = buf.writeBytes(stream, chunkSize)
|
||||
if(read < 0) {
|
||||
sendMessageAndFlush(ctx, LastCacheContent(buf))
|
||||
break@outerLoop
|
||||
}
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheContent(buf))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
sendMessage(ctx, ChunkedNioFile(channel, entryValue.offset, entryValue.size - entryValue.offset, chunkSize))
|
||||
sendMessageAndFlush(ctx, LastHttpContent.EMPTY_LAST_CONTENT)
|
||||
}
|
||||
}
|
||||
} ?: sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
inProgressPutRequest?.rollback()
|
||||
(inProgressRequest as? InProgressPutRequest)?.rollback()
|
||||
super.exceptionCaught(ctx, cause)
|
||||
}
|
||||
}
|
@@ -75,6 +75,10 @@ class InMemoryCache(
|
||||
cond.await(interval.toMillis(), TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}
|
||||
map.forEach {
|
||||
it.value.content.release()
|
||||
}
|
||||
map.clear()
|
||||
}
|
||||
complete(null)
|
||||
} catch (ex: Throwable) {
|
||||
|
@@ -26,7 +26,15 @@ class InMemoryCacheHandler(
|
||||
private val compressionLevel: Int
|
||||
) : CacheHandler() {
|
||||
|
||||
private interface InProgressPutRequest : AutoCloseable {
|
||||
private interface InProgressRequest : AutoCloseable {
|
||||
}
|
||||
|
||||
private class InProgressGetRequest(val request : CacheGetRequest) : InProgressRequest {
|
||||
override fun close() {
|
||||
}
|
||||
}
|
||||
|
||||
private interface InProgressPutRequest : InProgressRequest {
|
||||
val request: CachePutRequest
|
||||
val buf: ByteBuf
|
||||
|
||||
@@ -74,7 +82,7 @@ class InMemoryCacheHandler(
|
||||
}
|
||||
}
|
||||
|
||||
private var inProgressPutRequest: InProgressPutRequest? = null
|
||||
private var inProgressRequest: InProgressRequest? = null
|
||||
|
||||
override fun channelRead0(ctx: ChannelHandlerContext, msg: CacheMessage) {
|
||||
when (msg) {
|
||||
@@ -87,24 +95,11 @@ class InMemoryCacheHandler(
|
||||
}
|
||||
|
||||
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
|
||||
cache.get(processCacheKey(msg.key, digestAlgorithm))?.let { value ->
|
||||
sendMessageAndFlush(ctx, CacheValueFoundResponse(msg.key, value.metadata))
|
||||
if (compressionEnabled) {
|
||||
val buf = ctx.alloc().heapBuffer()
|
||||
InflaterOutputStream(ByteBufOutputStream(buf)).use {
|
||||
value.content.readBytes(it, value.content.readableBytes())
|
||||
value.content.release()
|
||||
buf.retain()
|
||||
}
|
||||
sendMessage(ctx, LastCacheContent(buf))
|
||||
} else {
|
||||
sendMessage(ctx, LastCacheContent(value.content))
|
||||
}
|
||||
} ?: sendMessage(ctx, CacheValueNotFoundResponse())
|
||||
inProgressRequest = InProgressGetRequest(msg)
|
||||
}
|
||||
|
||||
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
|
||||
inProgressPutRequest = if(compressionEnabled) {
|
||||
inProgressRequest = if(compressionEnabled) {
|
||||
InProgressCompressedPutRequest(ctx, msg)
|
||||
} else {
|
||||
InProgressPlainPutRequest(ctx, msg)
|
||||
@@ -112,27 +107,46 @@ class InMemoryCacheHandler(
|
||||
}
|
||||
|
||||
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
|
||||
inProgressPutRequest?.append(msg.content())
|
||||
val req = inProgressRequest
|
||||
if(req is InProgressPutRequest) {
|
||||
req.append(msg.content())
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
|
||||
handleCacheContent(ctx, msg)
|
||||
inProgressPutRequest?.let { inProgressRequest ->
|
||||
inProgressPutRequest = null
|
||||
val buf = inProgressRequest.buf
|
||||
buf.retain()
|
||||
inProgressRequest.close()
|
||||
val cacheKey = processCacheKey(inProgressRequest.request.key, digestAlgorithm)
|
||||
cache.put(cacheKey, CacheEntry(inProgressRequest.request.metadata, buf))
|
||||
sendMessageAndFlush(ctx, CachePutResponse(inProgressRequest.request.key))
|
||||
when(val req = inProgressRequest) {
|
||||
is InProgressGetRequest -> {
|
||||
cache.get(processCacheKey(req.request.key, digestAlgorithm))?.let { value ->
|
||||
sendMessageAndFlush(ctx, CacheValueFoundResponse(req.request.key, value.metadata))
|
||||
if (compressionEnabled) {
|
||||
val buf = ctx.alloc().heapBuffer()
|
||||
InflaterOutputStream(ByteBufOutputStream(buf)).use {
|
||||
value.content.readBytes(it, value.content.readableBytes())
|
||||
value.content.release()
|
||||
buf.retain()
|
||||
}
|
||||
sendMessage(ctx, LastCacheContent(buf))
|
||||
} else {
|
||||
sendMessage(ctx, LastCacheContent(value.content))
|
||||
}
|
||||
} ?: sendMessage(ctx, CacheValueNotFoundResponse())
|
||||
}
|
||||
is InProgressPutRequest -> {
|
||||
this.inProgressRequest = null
|
||||
val buf = req.buf
|
||||
buf.retain()
|
||||
req.close()
|
||||
val cacheKey = processCacheKey(req.request.key, digestAlgorithm)
|
||||
cache.put(cacheKey, CacheEntry(req.request.metadata, buf))
|
||||
sendMessageAndFlush(ctx, CachePutResponse(req.request.key))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
inProgressPutRequest?.let { req ->
|
||||
req.buf.release()
|
||||
inProgressPutRequest = null
|
||||
}
|
||||
inProgressRequest?.close()
|
||||
inProgressRequest = null
|
||||
super.exceptionCaught(ctx, cause)
|
||||
}
|
||||
}
|
@@ -0,0 +1,13 @@
|
||||
package net.woggioni.rbcs.server.handler
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.SimpleChannelInboundHandler
|
||||
import io.netty.handler.codec.http.HttpContent
|
||||
|
||||
class BlackHoleRequestHandler : SimpleChannelInboundHandler<HttpContent>() {
|
||||
companion object {
|
||||
val NAME = BlackHoleRequestHandler::class.java.name
|
||||
}
|
||||
override fun channelRead0(ctx: ChannelHandlerContext, msg: HttpContent) {
|
||||
}
|
||||
}
|
@@ -94,6 +94,9 @@ class ThrottlingHandler(private val bucketManager : BucketManager,
|
||||
handleBuckets(buckets, ctx, msg, false)
|
||||
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
|
||||
} else {
|
||||
queuedContent?.let { qc ->
|
||||
qc.forEach { it.release() }
|
||||
}
|
||||
this.queuedContent = null
|
||||
sendThrottledResponse(ctx, waitDuration)
|
||||
}
|
||||
|
@@ -154,7 +154,7 @@ class BasicAuthServerTest : AbstractBasicAuthServerTest() {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(6)
|
||||
@Order(8)
|
||||
fun getAsAThrottledUser() {
|
||||
val client: HttpClient = HttpClient.newHttpClient()
|
||||
|
||||
@@ -172,7 +172,7 @@ class BasicAuthServerTest : AbstractBasicAuthServerTest() {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(7)
|
||||
@Order(9)
|
||||
fun getAsAThrottledUser2() {
|
||||
val client: HttpClient = HttpClient.newHttpClient()
|
||||
|
||||
|
Reference in New Issue
Block a user