fixed memory leak in MemcachedCacheHandler
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
package net.woggioni.rbcs.server.memcache
|
||||
|
||||
import io.netty.channel.ChannelFactory
|
||||
import io.netty.channel.ChannelHandler
|
||||
import io.netty.channel.EventLoopGroup
|
||||
import io.netty.channel.pool.FixedChannelPool
|
||||
import io.netty.channel.socket.DatagramChannel
|
||||
|
@@ -111,17 +111,17 @@ class MemcacheCacheHandler(
|
||||
}
|
||||
if (responseSent) {
|
||||
acc.readBytes(outputStream, acc.readableBytes())
|
||||
if(acc.readableBytes() >= chunkSize) {
|
||||
if (acc.readableBytes() >= chunkSize) {
|
||||
flush(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun flush(last : Boolean) {
|
||||
private fun flush(last: Boolean) {
|
||||
val toSend = extractChunk(chunk, ctx.alloc())
|
||||
val msg = if(last) {
|
||||
val msg = if (last) {
|
||||
log.trace(ctx) {
|
||||
"Sending last chunk to client on channel"
|
||||
"Sending last chunk to client"
|
||||
}
|
||||
LastCacheContent(toSend)
|
||||
} else {
|
||||
@@ -148,14 +148,14 @@ class MemcacheCacheHandler(
|
||||
}
|
||||
|
||||
private inner class InProgressPutRequest(
|
||||
private val ch : NettyChannel,
|
||||
metadata : CacheValueMetadata,
|
||||
val digest : ByteBuf,
|
||||
private val ch: NettyChannel,
|
||||
metadata: CacheValueMetadata,
|
||||
val digest: ByteBuf,
|
||||
val requestController: CompletableFuture<MemcacheRequestController>,
|
||||
private val alloc: ByteBufAllocator
|
||||
) : InProgressRequest {
|
||||
private var totalSize = 0
|
||||
private var tmpFile : FileChannel? = null
|
||||
private var tmpFile: FileChannel? = null
|
||||
private val accumulator = alloc.compositeBuffer()
|
||||
private val stream = ByteBufOutputStream(accumulator).let {
|
||||
if (compressionEnabled) {
|
||||
@@ -182,7 +182,7 @@ class MemcacheCacheHandler(
|
||||
tmpFile?.let {
|
||||
flushToDisk(it, accumulator)
|
||||
}
|
||||
if(accumulator.readableBytes() > 0x100000) {
|
||||
if (accumulator.readableBytes() > 0x100000) {
|
||||
log.debug(ch) {
|
||||
"Entry is too big, buffering it into a file"
|
||||
}
|
||||
@@ -199,18 +199,18 @@ class MemcacheCacheHandler(
|
||||
}
|
||||
}
|
||||
|
||||
private fun flushToDisk(fc : FileChannel, buf : CompositeByteBuf) {
|
||||
private fun flushToDisk(fc: FileChannel, buf: CompositeByteBuf) {
|
||||
val chunk = extractChunk(buf, alloc)
|
||||
fc.write(chunk.nioBuffer())
|
||||
chunk.release()
|
||||
}
|
||||
|
||||
fun commit() : Pair<Int, ReadableByteChannel> {
|
||||
fun commit(): Pair<Int, ReadableByteChannel> {
|
||||
digest.release()
|
||||
accumulator.retain()
|
||||
stream.close()
|
||||
val fileChannel = tmpFile
|
||||
return if(fileChannel != null) {
|
||||
return if (fileChannel != null) {
|
||||
flushToDisk(fileChannel, accumulator)
|
||||
accumulator.release()
|
||||
fileChannel.position(0)
|
||||
@@ -244,7 +244,67 @@ class MemcacheCacheHandler(
|
||||
}
|
||||
|
||||
private fun handleGetRequest(ctx: ChannelHandlerContext, msg: CacheGetRequest) {
|
||||
inProgressRequest = InProgressGetRequest(msg.key, ctx)
|
||||
log.debug(ctx) {
|
||||
"Fetching ${msg.key} from memcache"
|
||||
}
|
||||
val key = ctx.alloc().buffer().also {
|
||||
it.writeBytes(processCacheKey(msg.key, digestAlgorithm))
|
||||
}
|
||||
val responseHandler = object : MemcacheResponseHandler {
|
||||
override fun responseReceived(response: BinaryMemcacheResponse) {
|
||||
val status = response.status()
|
||||
when (status) {
|
||||
BinaryMemcacheResponseStatus.SUCCESS -> {
|
||||
log.debug(ctx) {
|
||||
"Cache hit for key ${msg.key} on memcache"
|
||||
}
|
||||
inProgressRequest = InProgressGetRequest(msg.key, ctx)
|
||||
}
|
||||
|
||||
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
|
||||
log.debug(ctx) {
|
||||
"Cache miss for key ${msg.key} on memcache"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun contentReceived(content: MemcacheContent) {
|
||||
log.trace(ctx) {
|
||||
"${if (content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${
|
||||
content.content().readableBytes()
|
||||
} bytes received from memcache for key ${msg.key}"
|
||||
}
|
||||
(inProgressRequest as? InProgressGetRequest)?.let { inProgressGetRequest ->
|
||||
inProgressGetRequest.write(content.content())
|
||||
if (content is LastMemcacheContent) {
|
||||
inProgressRequest = null
|
||||
inProgressGetRequest.commit()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ex: Throwable) {
|
||||
(inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
|
||||
inProgressGetRequest?.let {
|
||||
inProgressRequest = null
|
||||
it.rollback()
|
||||
}
|
||||
}
|
||||
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
|
||||
}
|
||||
}
|
||||
client.sendRequest(key.retainedDuplicate(), responseHandler).thenAccept { requestHandle ->
|
||||
log.trace(ctx) {
|
||||
"Sending GET request for key ${msg.key} to memcache"
|
||||
}
|
||||
val request = DefaultBinaryMemcacheRequest(key).apply {
|
||||
setOpcode(BinaryMemcacheOpcodes.GET)
|
||||
}
|
||||
requestHandle.sendRequest(request)
|
||||
requestHandle.sendContent(LastMemcacheContent.EMPTY_LAST_CONTENT)
|
||||
}
|
||||
}
|
||||
|
||||
private fun handlePutRequest(ctx: ChannelHandlerContext, msg: CachePutRequest) {
|
||||
@@ -261,6 +321,7 @@ class MemcacheCacheHandler(
|
||||
}
|
||||
sendMessageAndFlush(ctx, CachePutResponse(msg.key))
|
||||
}
|
||||
|
||||
else -> this@MemcacheCacheHandler.exceptionCaught(ctx, MemcacheException(status))
|
||||
}
|
||||
}
|
||||
@@ -282,13 +343,14 @@ class MemcacheCacheHandler(
|
||||
|
||||
private fun handleCacheContent(ctx: ChannelHandlerContext, msg: CacheContent) {
|
||||
val request = inProgressRequest
|
||||
when(request) {
|
||||
when (request) {
|
||||
is InProgressPutRequest -> {
|
||||
log.trace(ctx) {
|
||||
"Received chunk of ${msg.content().readableBytes()} bytes for memcache"
|
||||
}
|
||||
request.write(msg.content())
|
||||
}
|
||||
|
||||
is InProgressGetRequest -> {
|
||||
msg.release()
|
||||
}
|
||||
@@ -297,7 +359,7 @@ class MemcacheCacheHandler(
|
||||
|
||||
private fun handleLastCacheContent(ctx: ChannelHandlerContext, msg: LastCacheContent) {
|
||||
val request = inProgressRequest
|
||||
when(request) {
|
||||
when (request) {
|
||||
is InProgressPutRequest -> {
|
||||
inProgressRequest = null
|
||||
log.trace(ctx) {
|
||||
@@ -314,7 +376,7 @@ class MemcacheCacheHandler(
|
||||
"Trying to send SET request to memcache"
|
||||
}
|
||||
request.requestController.whenComplete { requestController, ex ->
|
||||
if(ex == null) {
|
||||
if (ex == null) {
|
||||
log.trace(ctx) {
|
||||
"Sending SET request to memcache"
|
||||
}
|
||||
@@ -332,7 +394,7 @@ class MemcacheCacheHandler(
|
||||
while (true) {
|
||||
val read = source.read(bb)
|
||||
bb.limit()
|
||||
if(read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) {
|
||||
if (read >= 0 && bb.position() < chunkSize && bb.hasRemaining()) {
|
||||
continue
|
||||
}
|
||||
val chunk = ctx.alloc().buffer(chunkSize)
|
||||
@@ -342,7 +404,7 @@ class MemcacheCacheHandler(
|
||||
log.trace(ctx) {
|
||||
"Sending ${chunk.readableBytes()} bytes chunk to memcache"
|
||||
}
|
||||
if(read < 0) {
|
||||
if (read < 0) {
|
||||
requestController.sendContent(DefaultLastMemcacheContent(chunk))
|
||||
break
|
||||
} else {
|
||||
@@ -355,73 +417,12 @@ class MemcacheCacheHandler(
|
||||
}
|
||||
}
|
||||
}
|
||||
is InProgressGetRequest -> {
|
||||
log.debug(ctx) {
|
||||
"Fetching ${request.key} from memcache"
|
||||
}
|
||||
val key = ctx.alloc().buffer().also {
|
||||
it.writeBytes(processCacheKey(request.key, digestAlgorithm))
|
||||
}
|
||||
val responseHandler = object : MemcacheResponseHandler {
|
||||
override fun responseReceived(response: BinaryMemcacheResponse) {
|
||||
val status = response.status()
|
||||
when (status) {
|
||||
BinaryMemcacheResponseStatus.SUCCESS -> {
|
||||
log.debug(ctx) {
|
||||
"Cache hit for key ${request.key} on memcache"
|
||||
}
|
||||
inProgressRequest = InProgressGetRequest(request.key, ctx)
|
||||
}
|
||||
|
||||
BinaryMemcacheResponseStatus.KEY_ENOENT -> {
|
||||
log.debug(ctx) {
|
||||
"Cache miss for key ${request.key} on memcache"
|
||||
}
|
||||
sendMessageAndFlush(ctx, CacheValueNotFoundResponse())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun contentReceived(content: MemcacheContent) {
|
||||
log.trace(ctx) {
|
||||
"${if(content is LastMemcacheContent) "Last chunk" else "Chunk"} of ${content.content().readableBytes()} bytes received from memcache for key ${request.key}"
|
||||
}
|
||||
(inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
|
||||
inProgressGetRequest?.write(content.content())
|
||||
if (content is LastMemcacheContent) {
|
||||
inProgressRequest = null
|
||||
inProgressGetRequest?.commit()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ex: Throwable) {
|
||||
(inProgressRequest as? InProgressGetRequest).let { inProgressGetRequest ->
|
||||
inProgressGetRequest?.let {
|
||||
inProgressRequest = null
|
||||
it.rollback()
|
||||
}
|
||||
}
|
||||
this@MemcacheCacheHandler.exceptionCaught(ctx, ex)
|
||||
}
|
||||
}
|
||||
client.sendRequest(key.retainedDuplicate(), responseHandler).thenAccept { requestHandle ->
|
||||
log.trace(ctx) {
|
||||
"Sending GET request for key ${request.key} to memcache"
|
||||
}
|
||||
val request = DefaultBinaryMemcacheRequest(key).apply {
|
||||
setOpcode(BinaryMemcacheOpcodes.GET)
|
||||
}
|
||||
requestHandle.sendRequest(request)
|
||||
requestHandle.sendContent(LastMemcacheContent.EMPTY_LAST_CONTENT)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
val request = inProgressRequest
|
||||
when(request) {
|
||||
when (request) {
|
||||
is InProgressPutRequest -> {
|
||||
inProgressRequest = null
|
||||
request.requestController.thenAccept { controller ->
|
||||
@@ -429,6 +430,7 @@ class MemcacheCacheHandler(
|
||||
}
|
||||
request.rollback()
|
||||
}
|
||||
|
||||
is InProgressGetRequest -> {
|
||||
inProgressRequest = null
|
||||
request.rollback()
|
||||
|
Reference in New Issue
Block a user