diff --git a/build.gradle b/build.gradle index 14b62e4..bfbecf2 100644 --- a/build.gradle +++ b/build.gradle @@ -14,7 +14,6 @@ repositories { dependencies { implementation group: 'org.slf4j', name: 'slf4j-api', version: getProperty('slf4j.version') - implementation group: 'com.h2database', name: 'h2', version: getProperty('h2.version') implementation group: 'io.netty', name: 'netty-codec-http', version: getProperty('netty.version') runtimeOnly group: 'org.slf4j', name: 'slf4j-simple', version: getProperty('slf4j.version') @@ -35,10 +34,6 @@ if(JavaVersion.current() > JavaVersion.VERSION_1_8) { } } -//lombok { -// version = getProperty('lombok.version') -//} - run { systemProperty 'org.slf4j.simpleLogger.defaultLogLevel', 'trace' } diff --git a/src/main/java/net/woggioni/gcs/GradleBuildCacheServer23.java b/src/main/java/net/woggioni/gcs/GradleBuildCacheServer23.java deleted file mode 100644 index 836d184..0000000 --- a/src/main/java/net/woggioni/gcs/GradleBuildCacheServer23.java +++ /dev/null @@ -1,264 +0,0 @@ -package net.woggioni.gcs; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.compression.StandardCompressionOptions; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpContentCompressor; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaderValues; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; -import org.h2.mvstore.MVStore; - -import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; -import java.util.AbstractMap; -import java.util.Base64; -import java.util.Map; -import java.util.Objects; - -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; - -public class GradleBuildCacheServer23 { - -// private static final class NettyHttpBasicAuthenticator extends ChannelInboundHandlerAdapter { -// -// private static final FullHttpResponse AUTHENTICATION_FAILED = new DefaultFullHttpResponse( -// HTTP_1_1, HttpResponseStatus.UNAUTHORIZED, Unpooled.EMPTY_BUFFER); -// -// private final String basicAuthHeader; -// -// public NettyHttpBasicAuthenticator(String username, String password) { -// this.basicAuthHeader = -// "Basic " + Base64.getEncoder() -// .encodeToString((username + ":" + password) -// .getBytes(StandardCharsets.ISO_8859_1)); -// } -// -// @Override -// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { -// if (msg instanceof HttpRequest) { -// HttpRequest req = (HttpRequest) msg; -// String authorizationHeader = req.headers().get(HttpHeaderNames.AUTHORIZATION); -// log.warn(); -// -// int cursor = authorizationHeader.indexOf(' '); -// if(cursor < 0) { -// if(log.isDebugEnabled()) { -// log.debug("Invalid Authorization header: '{}'", authorizationHeader); -// } -// authenticationFailure(ctx, msg); -// } -// String authenticationType = authorizationHeader.substring(0, cursor); -// if(!Objects.equals("Basic", authenticationType)) { -// if(log.isDebugEnabled()) { -// ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(); -// log.debug("Invalid authentication type header: '{}'", authenticationType); -// } -// authenticationFailure(ctx, msg); -// } -// -// if (HttpUtil.is100ContinueExpected(req)) { -// HttpResponse accept = acceptMessage(req); -// -// if (accept == null) { -// // the expectation failed so we refuse the request. -// HttpResponse rejection = rejectResponse(req); -// ReferenceCountUtil.release(msg); -// ctx.writeAndFlush(rejection).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); -// return; -// } -// -// ctx.writeAndFlush(accept).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); -// req.headers().remove(HttpHeaderNames.EXPECT); -// } -// } -// super.channelRead(ctx, msg); -// } -// -// public void authenticationFailure(ChannelHandlerContext ctx, Object msg) { -// ReferenceCountUtil.release(msg); -// ctx.writeAndFlush(AUTHENTICATION_FAILED.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); -// } -// } -// -// @RequiredArgsConstructor -// private static class ServerInitializer extends ChannelInitializer { -// -// private final MVStore mvStore; -// static final EventExecutorGroup group = -// new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors()); -// -// @Override -// protected void initChannel(Channel ch) { -// ChannelPipeline pipeline = ch.pipeline(); -// pipeline.addLast(new HttpServerCodec()); -// pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); -// pipeline.addLast(group, new ServerHandler(mvStore, "/cache")); -// pipeline.addLast( -// new HttpContentCompressor(1024, -// StandardCompressionOptions.deflate(), -// StandardCompressionOptions.brotli(), -// StandardCompressionOptions.gzip(), -// StandardCompressionOptions.zstd())); -// } -// } -// -// private static class AuthenticationHandler extends SimpleChannelInboundHandler { -// @Override -// protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { -// -// } -// } -// -// @Slf4j -// private static class ServerHandler extends SimpleChannelInboundHandler { -// -// private final String serverPrefix; -// private final Map cache; -// -// public ServerHandler(MVStore mvStore, String serverPrefix) { -// this.serverPrefix = serverPrefix; -// cache = mvStore.openMap("buildCache"); -// } -// -// private static Map.Entry splitPath(HttpRequest req) { -// String uri = req.uri(); -// int i = uri.lastIndexOf('/'); -// if(i < 0) throw new RuntimeException(String.format("Malformed request URI: '%s'", uri)); -// return new AbstractMap.SimpleEntry<>(uri.substring(0, i), uri.substring(i + 1)); -// } -// -// private void authenticate(HttpRequest req) { -// String authorizationHeader = req.headers().get(HttpHeaderNames.AUTHORIZATION); -// if(authorizationHeader != null) { -// int cursor = authorizationHeader.indexOf(' '); -// if(cursor < 0) { -// throw new IllegalArgumentException( -// String.format("Illegal format for 'Authorization' HTTP header: '%s'", authorizationHeader)); -// } -// String authorizationType = authorizationHeader.substring(0, cursor); -// if(!Objects.equals("Basic", authorizationType) { -// -// } -// } -// -// } -// -// @Override -// protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) { -// HttpMethod method = msg.method(); -// FullHttpResponse response; -// if(method == HttpMethod.GET) { -// Map.Entry prefixAndKey = splitPath(msg); -// String prefix = prefixAndKey.getKey(); -// String key = prefixAndKey.getValue(); -// if(Objects.equals(serverPrefix, prefix)) { -// byte[] value = cache.get(key); -// if(value != null) { -// if(log.isDebugEnabled()) { -// log.debug("Successfully retrieved value for key '{}' from build cache", key); -// } -// ByteBuf content = Unpooled.copiedBuffer(value); -// response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); -// response.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM); -// response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); -// } else { -// if(log.isDebugEnabled()) { -// log.debug("Cache miss for key '{}'", key); -// } -// response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND); -// } -// } else { -// if(log.isWarnEnabled()) { -// log.warn("Got request for unhandled path '{}'", msg.uri()); -// } -// response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST); -// } -// } else if(method == HttpMethod.PUT) { -// Map.Entry prefixAndKey = splitPath(msg); -// String prefix = prefixAndKey.getKey(); -// String key = prefixAndKey.getValue(); -// if(Objects.equals(serverPrefix, prefix)) { -// if(log.isDebugEnabled()) { -// log.debug("Added value for key '{}' to build cache", key); -// } -// cache.put(key, msg.content().array()); -// ByteBuf content = Unpooled.copiedBuffer(key, StandardCharsets.UTF_8); -// response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CREATED, content); -// } else { -// if(log.isWarnEnabled()) { -// log.warn("Got request for unhandled path '{}'", msg.uri()); -// } -// response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST); -// } -// } else { -// if(log.isWarnEnabled()) { -// log.warn("Got request with unhandled method '{}'", msg.method().name()); -// } -// response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST); -// } -// ctx.write(response); -// ctx.flush(); -// } -// } -// -// private static final int HTTP_PORT = 8080; -// -// public void run() throws Exception { -// -// // Create the multithreaded event loops for the server -// EventLoopGroup bossGroup = new NioEventLoopGroup(); -// EventLoopGroup workerGroup = new NioEventLoopGroup(); -// try(MVStore mvStore = MVStore.open("/tmp/buildCache")) { -// // A helper class that simplifies server configuration -// ServerBootstrap httpBootstrap = new ServerBootstrap(); -// -// // Configure the server -// httpBootstrap.group(bossGroup, workerGroup) -// .channel(NioServerSocketChannel.class) -// .childHandler(new ServerInitializer(mvStore)) // <-- Our handler created here -// .option(ChannelOption.SO_BACKLOG, 128) -// .childOption(ChannelOption.SO_KEEPALIVE, true); -// -// // Bind and start to accept incoming connections. -// ChannelFuture httpChannel = httpBootstrap.bind(HTTP_PORT).sync(); -// -// // Wait until server socket is closed -// httpChannel.channel().closeFuture().sync(); -// } -// finally { -// workerGroup.shutdownGracefully(); -// bossGroup.shutdownGracefully(); -// } -// } -// -// public static void main(String[] args) throws Exception { -// new GradleBuildCacheServer().run(); -// } -} \ No newline at end of file diff --git a/src/main/kotlin/net/woggioni/gcs/GradleBuildCacheServer.kt b/src/main/kotlin/net/woggioni/gcs/GradleBuildCacheServer.kt index bcb4ca9..b26b08b 100644 --- a/src/main/kotlin/net/woggioni/gcs/GradleBuildCacheServer.kt +++ b/src/main/kotlin/net/woggioni/gcs/GradleBuildCacheServer.kt @@ -1,20 +1,26 @@ package net.woggioni.gcs import io.netty.bootstrap.ServerBootstrap +import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled import io.netty.channel.Channel +import io.netty.channel.ChannelDuplexHandler +import io.netty.channel.ChannelFutureListener import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInitializer import io.netty.channel.ChannelOption +import io.netty.channel.ChannelOutboundHandlerAdapter +import io.netty.channel.ChannelPromise +import io.netty.channel.DefaultFileRegion import io.netty.channel.EventLoopGroup +import io.netty.channel.FileRegion import io.netty.channel.SimpleChannelInboundHandler import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.nio.NioServerSocketChannel -import io.netty.handler.codec.compression.Brotli import io.netty.handler.codec.compression.CompressionOptions -import io.netty.handler.codec.compression.StandardCompressionOptions -import io.netty.handler.codec.compression.Zstd import io.netty.handler.codec.http.DefaultFullHttpResponse +import io.netty.handler.codec.http.DefaultHttpContent +import io.netty.handler.codec.http.DefaultHttpResponse import io.netty.handler.codec.http.FullHttpRequest import io.netty.handler.codec.http.FullHttpResponse import io.netty.handler.codec.http.HttpContentCompressor @@ -23,19 +29,47 @@ import io.netty.handler.codec.http.HttpHeaderValues import io.netty.handler.codec.http.HttpMethod import io.netty.handler.codec.http.HttpObjectAggregator import io.netty.handler.codec.http.HttpRequest -import io.netty.handler.codec.http.HttpRequestDecoder import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpServerCodec -import io.netty.handler.codec.http.HttpVersion +import io.netty.handler.codec.http.HttpUtil +import io.netty.handler.codec.http.LastHttpContent +import io.netty.handler.stream.ChunkedNioFile +import io.netty.handler.stream.ChunkedWriteHandler import io.netty.util.concurrent.DefaultEventExecutorGroup import io.netty.util.concurrent.EventExecutorGroup -import org.h2.mvstore.FileStore -import org.h2.mvstore.MVStore +import java.nio.channels.FileChannel +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.Paths +import java.nio.file.StandardOpenOption +import java.security.MessageDigest import java.util.AbstractMap.SimpleEntry import java.util.Base64 +import javax.net.ssl.SSLContext +import javax.net.ssl.SSLEngine class GradleBuildCacheServer { + + internal class HttpChunkContentCompressor(threshold : Int, vararg compressionOptions: CompressionOptions = emptyArray()) + : HttpContentCompressor(threshold, *compressionOptions) { + override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) { + var msg: Any? = msg + if (msg is ByteBuf) { + // convert ByteBuf to HttpContent to make it work with compression. This is needed as we use the + // ChunkedWriteHandler to send files when compression is enabled. + val buff = msg + if (buff.isReadable) { + // We only encode non empty buffers, as empty buffers can be used for determining when + // the content has been flushed and it confuses the HttpContentCompressor + // if we let it go + msg = DefaultHttpContent(buff) + } + } + super.write(ctx, msg, promise) + } + } + private class NettyHttpBasicAuthenticator( private val credentials: Map, authorizer: Authorizer) : AbstractNettyHttpAuthenticator(authorizer) { @@ -82,15 +116,21 @@ class GradleBuildCacheServer { } } - private class ServerInitializer(private val mvStore: MVStore) : ChannelInitializer() { + private class ServerInitializer(private val cacheDir: Path) : ChannelInitializer() { override fun initChannel(ch: Channel) { + val sslEngine: SSLEngine = SSLContext.getDefault().createSSLEngine() + sslEngine.useClientMode = false val pipeline = ch.pipeline() +// pipeline.addLast(SslHandler(sslEngine)) pipeline.addLast(HttpServerCodec()) + pipeline.addLast(HttpChunkContentCompressor(1024)) + pipeline.addLast(ChunkedWriteHandler()) pipeline.addLast(HttpObjectAggregator(Int.MAX_VALUE)) - pipeline.addLast(HttpContentCompressor(1024, *emptyArray())) pipeline.addLast(NettyHttpBasicAuthenticator(mapOf("user" to "password")) { user, _ -> user == "user" }) - pipeline.addLast(group, ServerHandler(mvStore, "/cache")) + pipeline.addLast(group, ServerHandler(cacheDir, "/cache")) + pipeline.addLast(ExceptionHandler()) + Files.createDirectories(cacheDir) } companion object { @@ -98,7 +138,15 @@ class GradleBuildCacheServer { } } - private class ServerHandler(private val mvStore: MVStore, private val serverPrefix: String) : SimpleChannelInboundHandler() { + private class ExceptionHandler : ChannelDuplexHandler() { + private val log = contextLogger() + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + log.error(cause.message, cause) + ctx.close() + } + } + + private class ServerHandler(private val cacheDir: Path, private val serverPrefix: String) : SimpleChannelInboundHandler() { companion object { private val log = contextLogger() @@ -111,40 +159,49 @@ class GradleBuildCacheServer { } } - private val cache: MutableMap - - init { - cache = mvStore.openMap("buildCache") - } - override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) { + val keepAlive: Boolean = HttpUtil.isKeepAlive(msg) val method = msg.method() - val response: FullHttpResponse if (method === HttpMethod.GET) { val (prefix, key) = splitPath(msg) if (serverPrefix == prefix) { - val value = cache[key] - if (value != null) { + val file = cacheDir.resolve(digestString(key.toByteArray())) + if (Files.exists(file)) { log.debug(ctx) { "Cache hit for key '$key'" } - val content = Unpooled.copiedBuffer(value) - response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content) + val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK) response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM - response.headers()[HttpHeaderNames.CONTENT_LENGTH] = content.readableBytes() + if(!keepAlive) { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY) + } else { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED) + } + ctx.write(response) + val channel = FileChannel.open(file, StandardOpenOption.READ) + if(keepAlive) { + ctx.write(ChunkedNioFile(channel)) + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + } else { + ctx.writeAndFlush(DefaultFileRegion(channel, 0, Files.size(file))).addListener(ChannelFutureListener.CLOSE) + } } else { log.debug(ctx) { "Cache miss for key '$key'" } - response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND) + val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND) response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 + ctx.writeAndFlush(response) } } else { log.warn(ctx) { "Got request for unhandled path '${msg.uri()}'" } - response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST) + val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0 + ctx.writeAndFlush(response) } } else if (method === HttpMethod.PUT) { val (prefix, key) = splitPath(msg) @@ -153,30 +210,30 @@ class GradleBuildCacheServer { "Added value for key '$key' to build cache" } val content = msg.content() - val value = ByteArray(content.capacity()) - content.readBytes(value) - cache[key] = value - mvStore.commit() - response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CREATED, + val file = cacheDir.resolve(digestString(key.toByteArray())) + Files.newOutputStream(file).use { + content.readBytes(it, content.readableBytes()) + } + val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.CREATED, Unpooled.copiedBuffer(key.toByteArray())) response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes() + ctx.writeAndFlush(response) } else { log.warn(ctx) { "Got request for unhandled path '${msg.uri()}'" } - response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST) + val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" + ctx.writeAndFlush(response) } } else { log.warn(ctx) { "Got request with unhandled method '${msg.method().name()}'" } - response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST) + val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST) response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0" + ctx.writeAndFlush(response) } - response.retain() - ctx.write(response) - ctx.flush() } } @@ -184,11 +241,6 @@ class GradleBuildCacheServer { // Create the multithreaded event loops for the server val bossGroup: EventLoopGroup = NioEventLoopGroup() val workerGroup: EventLoopGroup = NioEventLoopGroup() - val mvStore = MVStore.Builder() - .compress() - .fileName("/tmp/buildCache.mv") - .open() - val initialState = mvStore.commit() try { // A helper class that simplifies server configuration val httpBootstrap = ServerBootstrap() @@ -196,7 +248,7 @@ class GradleBuildCacheServer { // Configure the server httpBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel::class.java) - .childHandler(ServerInitializer(mvStore)) // <-- Our handler created here + .childHandler(ServerInitializer(Paths.get("/tmp/gbcs"))) // <-- Our handler created here .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) @@ -206,7 +258,6 @@ class GradleBuildCacheServer { // Wait until server socket is closed httpChannel.channel().closeFuture().sync() } finally { - mvStore.close() workerGroup.shutdownGracefully() bossGroup.shutdownGracefully() } @@ -218,5 +269,28 @@ class GradleBuildCacheServer { fun main(args: Array) { GradleBuildCacheServer().run() } + + private val hexArray = "0123456789ABCDEF".toCharArray() + + fun bytesToHex(bytes: ByteArray): String { + val hexChars = CharArray(bytes.size * 2) + for (j in bytes.indices) { + val v: Int = bytes[j].toInt().and(0xFF) + hexChars[j * 2] = hexArray[v ushr 4] + hexChars[j * 2 + 1] = hexArray[v and 0x0F] + } + return String(hexChars) + } + + fun digest(data : ByteArray, + md : MessageDigest = MessageDigest.getInstance("MD5")) : ByteArray { + md.update(data) + return md.digest() + } + + fun digestString(data : ByteArray, + md : MessageDigest = MessageDigest.getInstance("MD5")) : String { + return bytesToHex(digest(data, md)) + } } } \ No newline at end of file