added compression and chunked transfer encoding

This commit is contained in:
2022-03-03 01:56:24 +08:00
parent ce8d7e7a57
commit 9ed9cb6b91
3 changed files with 116 additions and 311 deletions

View File

@@ -1,20 +1,26 @@
package net.woggioni.gcs
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelOutboundHandlerAdapter
import io.netty.channel.ChannelPromise
import io.netty.channel.DefaultFileRegion
import io.netty.channel.EventLoopGroup
import io.netty.channel.FileRegion
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.compression.Brotli
import io.netty.handler.codec.compression.CompressionOptions
import io.netty.handler.codec.compression.StandardCompressionOptions
import io.netty.handler.codec.compression.Zstd
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.HttpContentCompressor
@@ -23,19 +29,47 @@ import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpRequestDecoder
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioFile
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import org.h2.mvstore.FileStore
import org.h2.mvstore.MVStore
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.StandardOpenOption
import java.security.MessageDigest
import java.util.AbstractMap.SimpleEntry
import java.util.Base64
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLEngine
class GradleBuildCacheServer {
internal class HttpChunkContentCompressor(threshold : Int, vararg compressionOptions: CompressionOptions = emptyArray())
: HttpContentCompressor(threshold, *compressionOptions) {
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
var msg: Any? = msg
if (msg is ByteBuf) {
// convert ByteBuf to HttpContent to make it work with compression. This is needed as we use the
// ChunkedWriteHandler to send files when compression is enabled.
val buff = msg
if (buff.isReadable) {
// We only encode non empty buffers, as empty buffers can be used for determining when
// the content has been flushed and it confuses the HttpContentCompressor
// if we let it go
msg = DefaultHttpContent(buff)
}
}
super.write(ctx, msg, promise)
}
}
private class NettyHttpBasicAuthenticator(
private val credentials: Map<String, String>, authorizer: Authorizer) : AbstractNettyHttpAuthenticator(authorizer) {
@@ -82,15 +116,21 @@ class GradleBuildCacheServer {
}
}
private class ServerInitializer(private val mvStore: MVStore) : ChannelInitializer<Channel>() {
private class ServerInitializer(private val cacheDir: Path) : ChannelInitializer<Channel>() {
override fun initChannel(ch: Channel) {
val sslEngine: SSLEngine = SSLContext.getDefault().createSSLEngine()
sslEngine.useClientMode = false
val pipeline = ch.pipeline()
// pipeline.addLast(SslHandler(sslEngine))
pipeline.addLast(HttpServerCodec())
pipeline.addLast(HttpChunkContentCompressor(1024))
pipeline.addLast(ChunkedWriteHandler())
pipeline.addLast(HttpObjectAggregator(Int.MAX_VALUE))
pipeline.addLast(HttpContentCompressor(1024, *emptyArray<CompressionOptions>()))
pipeline.addLast(NettyHttpBasicAuthenticator(mapOf("user" to "password")) { user, _ -> user == "user" })
pipeline.addLast(group, ServerHandler(mvStore, "/cache"))
pipeline.addLast(group, ServerHandler(cacheDir, "/cache"))
pipeline.addLast(ExceptionHandler())
Files.createDirectories(cacheDir)
}
companion object {
@@ -98,7 +138,15 @@ class GradleBuildCacheServer {
}
}
private class ServerHandler(private val mvStore: MVStore, private val serverPrefix: String) : SimpleChannelInboundHandler<FullHttpRequest>() {
private class ExceptionHandler : ChannelDuplexHandler() {
private val log = contextLogger()
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
log.error(cause.message, cause)
ctx.close()
}
}
private class ServerHandler(private val cacheDir: Path, private val serverPrefix: String) : SimpleChannelInboundHandler<FullHttpRequest>() {
companion object {
private val log = contextLogger()
@@ -111,40 +159,49 @@ class GradleBuildCacheServer {
}
}
private val cache: MutableMap<String, ByteArray>
init {
cache = mvStore.openMap("buildCache")
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
val method = msg.method()
val response: FullHttpResponse
if (method === HttpMethod.GET) {
val (prefix, key) = splitPath(msg)
if (serverPrefix == prefix) {
val value = cache[key]
if (value != null) {
val file = cacheDir.resolve(digestString(key.toByteArray()))
if (Files.exists(file)) {
log.debug(ctx) {
"Cache hit for key '$key'"
}
val content = Unpooled.copiedBuffer(value)
response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content)
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = content.readableBytes()
if(!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
val channel = FileChannel.open(file, StandardOpenOption.READ)
if(keepAlive) {
ctx.write(ChunkedNioFile(channel))
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
} else {
ctx.writeAndFlush(DefaultFileRegion(channel, 0, Files.size(file))).addListener(ChannelFutureListener.CLOSE)
}
} else {
log.debug(ctx) {
"Cache miss for key '$key'"
}
response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND)
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
}
response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
} else if (method === HttpMethod.PUT) {
val (prefix, key) = splitPath(msg)
@@ -153,30 +210,30 @@ class GradleBuildCacheServer {
"Added value for key '$key' to build cache"
}
val content = msg.content()
val value = ByteArray(content.capacity())
content.readBytes(value)
cache[key] = value
mvStore.commit()
response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CREATED,
val file = cacheDir.resolve(digestString(key.toByteArray()))
Files.newOutputStream(file).use {
content.readBytes(it, content.readableBytes())
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray()))
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
}
response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
} else {
log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'"
}
response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
response.retain()
ctx.write(response)
ctx.flush()
}
}
@@ -184,11 +241,6 @@ class GradleBuildCacheServer {
// Create the multithreaded event loops for the server
val bossGroup: EventLoopGroup = NioEventLoopGroup()
val workerGroup: EventLoopGroup = NioEventLoopGroup()
val mvStore = MVStore.Builder()
.compress()
.fileName("/tmp/buildCache.mv")
.open()
val initialState = mvStore.commit()
try {
// A helper class that simplifies server configuration
val httpBootstrap = ServerBootstrap()
@@ -196,7 +248,7 @@ class GradleBuildCacheServer {
// Configure the server
httpBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel::class.java)
.childHandler(ServerInitializer(mvStore)) // <-- Our handler created here
.childHandler(ServerInitializer(Paths.get("/tmp/gbcs"))) // <-- Our handler created here
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
@@ -206,7 +258,6 @@ class GradleBuildCacheServer {
// Wait until server socket is closed
httpChannel.channel().closeFuture().sync()
} finally {
mvStore.close()
workerGroup.shutdownGracefully()
bossGroup.shutdownGracefully()
}
@@ -218,5 +269,28 @@ class GradleBuildCacheServer {
fun main(args: Array<String>) {
GradleBuildCacheServer().run()
}
private val hexArray = "0123456789ABCDEF".toCharArray()
fun bytesToHex(bytes: ByteArray): String {
val hexChars = CharArray(bytes.size * 2)
for (j in bytes.indices) {
val v: Int = bytes[j].toInt().and(0xFF)
hexChars[j * 2] = hexArray[v ushr 4]
hexChars[j * 2 + 1] = hexArray[v and 0x0F]
}
return String(hexChars)
}
fun digest(data : ByteArray,
md : MessageDigest = MessageDigest.getInstance("MD5")) : ByteArray {
md.update(data)
return md.digest()
}
fun digestString(data : ByteArray,
md : MessageDigest = MessageDigest.getInstance("MD5")) : String {
return bytesToHex(digest(data, md))
}
}
}