initial commit

This commit is contained in:
2022-03-02 03:46:24 +08:00
commit ce8d7e7a57
14 changed files with 1083 additions and 0 deletions

View File

@@ -0,0 +1,53 @@
package net.woggioni.gcs
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion
import io.netty.util.ReferenceCountUtil
abstract class AbstractNettyHttpAuthenticator(private val authorizer : Authorizer)
: ChannelInboundHandlerAdapter() {
private companion object {
private val AUTHENTICATION_FAILED: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED, Unpooled.EMPTY_BUFFER).apply {
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
private val NOT_AUTHORIZED: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN, Unpooled.EMPTY_BUFFER).apply {
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
}
abstract fun authenticate(ctx : ChannelHandlerContext, req : HttpRequest) : String?
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
if(msg is HttpRequest) {
val user = authenticate(ctx, msg) ?: return authenticationFailure(ctx, msg)
val authorized = authorizer.authorize(user, msg)
if(authorized) {
super.channelRead(ctx, msg)
} else {
authorizationFailure(ctx, msg)
}
}
}
private fun authenticationFailure(ctx: ChannelHandlerContext, msg: Any) {
ReferenceCountUtil.release(msg)
ctx.writeAndFlush(AUTHENTICATION_FAILED.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
private fun authorizationFailure(ctx: ChannelHandlerContext, msg: Any) {
ReferenceCountUtil.release(msg)
ctx.writeAndFlush(NOT_AUTHORIZED.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
}

View File

@@ -0,0 +1,7 @@
package net.woggioni.gcs
import io.netty.handler.codec.http.HttpRequest
fun interface Authorizer {
fun authorize(user : String, request: HttpRequest) : Boolean
}

View File

@@ -0,0 +1,222 @@
package net.woggioni.gcs
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.Unpooled
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.EventLoopGroup
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.compression.Brotli
import io.netty.handler.codec.compression.CompressionOptions
import io.netty.handler.codec.compression.StandardCompressionOptions
import io.netty.handler.codec.compression.Zstd
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.HttpContentCompressor
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpRequestDecoder
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.HttpVersion
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import org.h2.mvstore.FileStore
import org.h2.mvstore.MVStore
import java.util.AbstractMap.SimpleEntry
import java.util.Base64
class GradleBuildCacheServer {
private class NettyHttpBasicAuthenticator(
private val credentials: Map<String, String>, authorizer: Authorizer) : AbstractNettyHttpAuthenticator(authorizer) {
companion object {
private val log = contextLogger()
}
override fun authenticate(ctx: ChannelHandlerContext, req: HttpRequest): String? {
val authorizationHeader = req.headers()[HttpHeaderNames.AUTHORIZATION] ?: let {
log.debug(ctx) {
"Missing Authorization header"
}
return null
}
val cursor = authorizationHeader.indexOf(' ')
if (cursor < 0) {
log.debug(ctx) {
"Invalid Authorization header: '$authorizationHeader'"
}
return null
}
val authenticationType = authorizationHeader.substring(0, cursor)
if ("Basic" != authenticationType) {
log.debug(ctx) {
"Invalid authentication type header: '$authenticationType'"
}
return null
}
val (user, password) = Base64.getDecoder().decode(authorizationHeader.substring(cursor + 1))
.let(::String)
.let {
val colon = it.indexOf(':')
if(colon < 0) {
log.debug(ctx) {
"Missing colon from authentication"
}
return null
}
it.substring(0, colon) to it.substring(colon + 1)
}
return user.takeIf {
credentials[user] == password
}
}
}
private class ServerInitializer(private val mvStore: MVStore) : ChannelInitializer<Channel>() {
override fun initChannel(ch: Channel) {
val pipeline = ch.pipeline()
pipeline.addLast(HttpServerCodec())
pipeline.addLast(HttpObjectAggregator(Int.MAX_VALUE))
pipeline.addLast(HttpContentCompressor(1024, *emptyArray<CompressionOptions>()))
pipeline.addLast(NettyHttpBasicAuthenticator(mapOf("user" to "password")) { user, _ -> user == "user" })
pipeline.addLast(group, ServerHandler(mvStore, "/cache"))
}
companion object {
val group: EventExecutorGroup = DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors())
}
}
private class ServerHandler(private val mvStore: MVStore, private val serverPrefix: String) : SimpleChannelInboundHandler<FullHttpRequest>() {
companion object {
private val log = contextLogger()
private fun splitPath(req: HttpRequest): Map.Entry<String, String> {
val uri = req.uri()
val i = uri.lastIndexOf('/')
if (i < 0) throw RuntimeException(String.format("Malformed request URI: '%s'", uri))
return SimpleEntry(uri.substring(0, i), uri.substring(i + 1))
}
}
private val cache: MutableMap<String, ByteArray>
init {
cache = mvStore.openMap("buildCache")
}
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
val method = msg.method()
val response: FullHttpResponse
if (method === HttpMethod.GET) {
val (prefix, key) = splitPath(msg)
if (serverPrefix == prefix) {
val value = cache[key]
if (value != null) {
log.debug(ctx) {
"Cache hit for key '$key'"
}
val content = Unpooled.copiedBuffer(value)
response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = content.readableBytes()
} else {
log.debug(ctx) {
"Cache miss for key '$key'"
}
response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
}
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
}
response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
}
} else if (method === HttpMethod.PUT) {
val (prefix, key) = splitPath(msg)
if (serverPrefix == prefix) {
log.debug(ctx) {
"Added value for key '$key' to build cache"
}
val content = msg.content()
val value = ByteArray(content.capacity())
content.readBytes(value)
cache[key] = value
mvStore.commit()
response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray()))
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
}
response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
} else {
log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'"
}
response = DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
response.retain()
ctx.write(response)
ctx.flush()
}
}
fun run() {
// Create the multithreaded event loops for the server
val bossGroup: EventLoopGroup = NioEventLoopGroup()
val workerGroup: EventLoopGroup = NioEventLoopGroup()
val mvStore = MVStore.Builder()
.compress()
.fileName("/tmp/buildCache.mv")
.open()
val initialState = mvStore.commit()
try {
// A helper class that simplifies server configuration
val httpBootstrap = ServerBootstrap()
// Configure the server
httpBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel::class.java)
.childHandler(ServerInitializer(mvStore)) // <-- Our handler created here
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
// Bind and start to accept incoming connections.
val httpChannel = httpBootstrap.bind(HTTP_PORT).sync()
// Wait until server socket is closed
httpChannel.channel().closeFuture().sync()
} finally {
mvStore.close()
workerGroup.shutdownGracefully()
bossGroup.shutdownGracefully()
}
}
companion object {
private const val HTTP_PORT = 8080
@JvmStatic
fun main(args: Array<String>) {
GradleBuildCacheServer().run()
}
}
}

View File

@@ -0,0 +1,107 @@
package net.woggioni.gcs
import io.netty.channel.ChannelHandlerContext
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.net.InetSocketAddress
inline fun <reified T> T.contextLogger() = LoggerFactory.getLogger(T::class.java)
inline fun Logger.traceParam(messageBuilder : () -> Pair<String, Array<Any>>) {
if(isTraceEnabled) {
val (format, params) = messageBuilder()
trace(format, params)
}
}
inline fun Logger.debugParam(messageBuilder : () -> Pair<String, Array<Any>>) {
if(isDebugEnabled) {
val (format, params) = messageBuilder()
info(format, params)
}
}
inline fun Logger.infoParam(messageBuilder : () -> Pair<String, Array<Any>>) {
if(isInfoEnabled) {
val (format, params) = messageBuilder()
info(format, params)
}
}
inline fun Logger.warnParam(messageBuilder : () -> Pair<String, Array<Any>>) {
if(isWarnEnabled) {
val (format, params) = messageBuilder()
warn(format, params)
}
}
inline fun Logger.errorParam(messageBuilder : () -> Pair<String, Array<Any>>) {
if(isErrorEnabled) {
val (format, params) = messageBuilder()
error(format, params)
}
}
inline fun log(log : Logger,
filter : Logger.() -> Boolean,
loggerMethod : Logger.(String) -> Unit, messageBuilder : () -> String) {
if(log.filter()) {
log.loggerMethod(messageBuilder())
}
}
inline fun Logger.trace(messageBuilder : () -> String) {
if(isTraceEnabled) {
trace(messageBuilder())
}
}
inline fun Logger.debug(messageBuilder : () -> String) {
if(isDebugEnabled) {
debug(messageBuilder())
}
}
inline fun Logger.info(messageBuilder : () -> String) {
if(isInfoEnabled) {
info(messageBuilder())
}
}
inline fun Logger.warn(messageBuilder : () -> String) {
if(isWarnEnabled) {
warn(messageBuilder())
}
}
inline fun Logger.error(messageBuilder : () -> String) {
if(isErrorEnabled) {
error(messageBuilder())
}
}
inline fun Logger.trace(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isTraceEnabled }, { trace(it) } , messageBuilder)
}
inline fun Logger.debug(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isDebugEnabled }, { debug(it) } , messageBuilder)
}
inline fun Logger.info(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isInfoEnabled }, { info(it) } , messageBuilder)
}
inline fun Logger.warn(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isWarnEnabled }, { warn(it) } , messageBuilder)
}
inline fun Logger.error(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isErrorEnabled }, { error(it) } , messageBuilder)
}
inline fun log(log : Logger, ctx : ChannelHandlerContext,
filter : Logger.() -> Boolean,
loggerMethod : Logger.(String) -> Unit, messageBuilder : () -> String) {
if(log.filter()) {
val clientAddress = (ctx.channel().remoteAddress() as InetSocketAddress).address.hostAddress
log.loggerMethod(clientAddress + " - " + messageBuilder())
}
}