fixed memory leak in memcached plugin

This commit is contained in:
2025-02-05 14:41:11 +08:00
parent 4180df2352
commit 1b6cf1bd96
8 changed files with 86 additions and 31 deletions

View File

@@ -25,8 +25,12 @@ class GradleBuildCacheServerCli : GbcsCommand() {
companion object {
@JvmStatic
fun main(vararg args: String) {
Thread.currentThread().contextClassLoader = GradleBuildCacheServerCli::class.java.classLoader
val currentClassLoader = GradleBuildCacheServerCli::class.java.classLoader
Thread.currentThread().contextClassLoader = currentClassLoader
if(currentClassLoader.javaClass.name == "net.woggioni.envelope.loader.ModuleClassLoader") {
//We're running in an envelope jar and custom URL protocols won't work
GbcsUrlStreamHandlerFactory.install()
}
val log = contextLogger()
val app = Application.builder("gbcs")
.configurationDirectoryEnvVar("GBCS_CONFIGURATION_DIR")

View File

@@ -2,6 +2,7 @@ package net.woggioni.gbcs.cli.impl.commands
import net.woggioni.gbcs.api.Configuration
import net.woggioni.gbcs.cli.impl.GbcsCommand
import net.woggioni.gbcs.cli.impl.converters.DurationConverter
import net.woggioni.gbcs.common.contextLogger
import net.woggioni.gbcs.common.debug
import net.woggioni.gbcs.common.info
@@ -13,6 +14,7 @@ import picocli.CommandLine
import java.io.ByteArrayOutputStream
import java.nio.file.Files
import java.nio.file.Path
import java.time.Duration
@CommandLine.Command(
name = "server",
@@ -35,6 +37,14 @@ class ServerCommand(app : Application) : GbcsCommand() {
}
}
@CommandLine.Option(
names = ["-t", "--timeout"],
description = ["Exit after the specified time"],
paramLabel = "TIMEOUT",
converter = [DurationConverter::class]
)
private var timeout: Duration? = null
@CommandLine.Option(
names = ["-c", "--config-file"],
description = ["Read the application configuration from this file"],
@@ -42,10 +52,6 @@ class ServerCommand(app : Application) : GbcsCommand() {
)
private var configurationFile: Path = findConfigurationFile(app, "gbcs-server.xml")
val configuration : Configuration by lazy {
GradleBuildCacheServer.loadConfiguration(configurationFile)
}
override fun run() {
if (!Files.exists(configurationFile)) {
Files.createDirectories(configurationFile.parent)
@@ -61,7 +67,11 @@ class ServerCommand(app : Application) : GbcsCommand() {
}
}
val server = GradleBuildCacheServer(configuration)
server.run().use {
server.run().use { server ->
timeout?.let {
Thread.sleep(it)
server.shutdown()
}
}
}
}

View File

@@ -0,0 +1,11 @@
package net.woggioni.gbcs.cli.impl.converters
import picocli.CommandLine
import java.time.Duration
class DurationConverter : CommandLine.ITypeConverter<Duration> {
override fun convert(value: String): Duration {
return Duration.parse(value)
}
}

View File

@@ -15,6 +15,4 @@
<root level="info">
<appender-ref ref="console"/>
</root>
<logger name="com.google.code.yanf4j" level="warn"/>
<logger name="net.rubyeye.xmemcached" level="warn"/>
</configuration>

View File

@@ -0,0 +1,7 @@
package net.woggioni.gbcs.common
class ResourceNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
}
class ModuleNotFoundException(msg : String? = null, cause: Throwable? = null) : RuntimeException(msg, cause) {
}

View File

@@ -36,13 +36,17 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
private class JpmsHandler : URLStreamHandler() {
override fun openConnection(u: URL): URLConnection {
val thisModule = javaClass.module
val sourceModule = Optional.ofNullable(thisModule)
.map { obj: Module -> obj.layer }
.flatMap { layer: ModuleLayer ->
val moduleName = u.host
layer.findModule(moduleName)
}.orElse(thisModule)
val thisModule = javaClass.module
val sourceModule =
thisModule
?.let(Module::getLayer)
?.let { layer: ModuleLayer ->
layer.findModule(moduleName).orElse(null)
} ?: if(thisModule.layer == null) {
thisModule
} else throw ModuleNotFoundException("Module '$moduleName' not found")
return JpmsResourceURLConnection(u, sourceModule)
}
}
@@ -53,7 +57,9 @@ class GbcsUrlStreamHandlerFactory : URLStreamHandlerProvider() {
@Throws(IOException::class)
override fun getInputStream(): InputStream {
return module.getResourceAsStream(getURL().path)
val resource = getURL().path
return module.getResourceAsStream(resource)
?: throw ResourceNotFoundException("Resource '$resource' not found in module '${module.name}'")
}
}

View File

@@ -114,13 +114,14 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
val channel = channelFuture.now
val pipeline = channel.pipeline()
channel.pipeline()
.addLast("handler", object : SimpleChannelInboundHandler<FullBinaryMemcacheResponse>() {
.addLast("client-handler", object : SimpleChannelInboundHandler<FullBinaryMemcacheResponse>() {
override fun channelRead0(
ctx: ChannelHandlerContext,
msg: FullBinaryMemcacheResponse
) {
pipeline.removeLast()
pool.release(channel)
msg.touch("The method's caller must remember to release this")
response.complete(msg.retain())
}
@@ -164,7 +165,8 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> {
val compressionMode = cfg.compressionMode
val content = response.content()
val content = response.content().retain()
response.release()
if (compressionMode != null) {
when (compressionMode) {
MemcacheCacheConfiguration.CompressionMode.GZIP -> {
@@ -224,10 +226,14 @@ class MemcacheClient(private val cfg: MemcacheCacheConfiguration) : AutoCloseabl
}
}
return sendRequest(request).thenApply { response ->
when(val status = response.status()) {
try {
when (val status = response.status()) {
BinaryMemcacheResponseStatus.SUCCESS -> null
else -> throw MemcacheException(status)
}
} finally {
response.release()
}
}
}

View File

@@ -57,10 +57,9 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
val content : Any = when (channel) {
is FileChannel -> DefaultFileRegion(channel, 0, channel.size())
else -> ChunkedNioStream(channel)
}
when (channel) {
is FileChannel -> {
val content = DefaultFileRegion(channel, 0, channel.size())
if (keepAlive) {
ctx.write(content)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
@@ -68,6 +67,20 @@ class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
}
else -> {
val content = ChunkedNioStream(channel)
if (keepAlive) {
ctx.write(content).addListener {
content.close()
}
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
}
}
} else {
log.debug(ctx) {
"Cache miss for key '$key'"