forked from woggioni/rbcs
added throttling
This commit is contained in:
@@ -2,11 +2,8 @@ package net.woggioni.gbcs.server
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap
|
||||
import io.netty.buffer.ByteBuf
|
||||
import io.netty.buffer.Unpooled
|
||||
import io.netty.channel.Channel
|
||||
import io.netty.channel.ChannelDuplexHandler
|
||||
import io.netty.channel.ChannelFuture
|
||||
import io.netty.channel.ChannelFutureListener
|
||||
import io.netty.channel.ChannelHandler.Sharable
|
||||
import io.netty.channel.ChannelHandlerContext
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter
|
||||
@@ -15,18 +12,13 @@ import io.netty.channel.ChannelOption
|
||||
import io.netty.channel.ChannelPromise
|
||||
import io.netty.channel.nio.NioEventLoopGroup
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel
|
||||
import io.netty.handler.codec.DecoderException
|
||||
import io.netty.handler.codec.compression.CompressionOptions
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse
|
||||
import io.netty.handler.codec.http.DefaultHttpContent
|
||||
import io.netty.handler.codec.http.FullHttpResponse
|
||||
import io.netty.handler.codec.http.HttpContentCompressor
|
||||
import io.netty.handler.codec.http.HttpHeaderNames
|
||||
import io.netty.handler.codec.http.HttpObjectAggregator
|
||||
import io.netty.handler.codec.http.HttpRequest
|
||||
import io.netty.handler.codec.http.HttpResponseStatus
|
||||
import io.netty.handler.codec.http.HttpServerCodec
|
||||
import io.netty.handler.codec.http.HttpVersion
|
||||
import io.netty.handler.ssl.ClientAuth
|
||||
import io.netty.handler.ssl.SslContext
|
||||
import io.netty.handler.ssl.SslContextBuilder
|
||||
@@ -34,16 +26,13 @@ import io.netty.handler.ssl.SslHandler
|
||||
import io.netty.handler.stream.ChunkedWriteHandler
|
||||
import io.netty.handler.timeout.IdleStateEvent
|
||||
import io.netty.handler.timeout.IdleStateHandler
|
||||
import io.netty.handler.timeout.ReadTimeoutException
|
||||
import io.netty.handler.timeout.ReadTimeoutHandler
|
||||
import io.netty.handler.timeout.WriteTimeoutException
|
||||
import io.netty.handler.timeout.WriteTimeoutHandler
|
||||
import io.netty.util.AttributeKey
|
||||
import io.netty.util.concurrent.DefaultEventExecutorGroup
|
||||
import io.netty.util.concurrent.EventExecutorGroup
|
||||
import net.woggioni.gbcs.api.Configuration
|
||||
import net.woggioni.gbcs.api.Role
|
||||
import net.woggioni.gbcs.api.exception.ConfigurationException
|
||||
import net.woggioni.gbcs.api.exception.ContentTooLargeException
|
||||
import net.woggioni.gbcs.common.GBCS.toUrl
|
||||
import net.woggioni.gbcs.common.PasswordSecurity.decodePasswordHash
|
||||
import net.woggioni.gbcs.common.PasswordSecurity.hashPassword
|
||||
@@ -57,7 +46,9 @@ import net.woggioni.gbcs.server.auth.ClientCertificateValidator
|
||||
import net.woggioni.gbcs.server.auth.RoleAuthorizer
|
||||
import net.woggioni.gbcs.server.configuration.Parser
|
||||
import net.woggioni.gbcs.server.configuration.Serializer
|
||||
import net.woggioni.gbcs.server.exception.ExceptionHandler
|
||||
import net.woggioni.gbcs.server.handler.ServerHandler
|
||||
import net.woggioni.gbcs.server.throttling.ThrottlingHandler
|
||||
import net.woggioni.jwo.JWO
|
||||
import net.woggioni.jwo.Tuple2
|
||||
import java.io.OutputStream
|
||||
@@ -75,12 +66,14 @@ import java.util.regex.Pattern
|
||||
import javax.naming.ldap.LdapName
|
||||
import javax.net.ssl.SSLPeerUnverifiedException
|
||||
|
||||
|
||||
class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
private val log = contextLogger()
|
||||
|
||||
companion object {
|
||||
|
||||
val userAttribute: AttributeKey<Configuration.User> = AttributeKey.valueOf("user")
|
||||
val groupAttribute: AttributeKey<Set<Configuration.Group>> = AttributeKey.valueOf("group")
|
||||
|
||||
val DEFAULT_CONFIGURATION_URL by lazy { "classpath:net/woggioni/gbcs/gbcs-default.xml".toUrl() }
|
||||
private const val SSL_HANDLER_NAME = "sslHandler"
|
||||
|
||||
@@ -120,12 +113,12 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
@Sharable
|
||||
private class ClientCertificateAuthenticator(
|
||||
authorizer: Authorizer,
|
||||
private val anonymousUserRoles: Set<Role>?,
|
||||
private val anonymousUserGroups: Set<Configuration.Group>?,
|
||||
private val userExtractor: Configuration.UserExtractor?,
|
||||
private val groupExtractor: Configuration.GroupExtractor?,
|
||||
) : AbstractNettyHttpAuthenticator(authorizer) {
|
||||
|
||||
override fun authenticate(ctx: ChannelHandlerContext, req: HttpRequest): Set<Role>? {
|
||||
override fun authenticate(ctx: ChannelHandlerContext, req: HttpRequest): AuthenticationResult? {
|
||||
return try {
|
||||
val sslHandler = (ctx.pipeline().get(SSL_HANDLER_NAME) as? SslHandler)
|
||||
?: throw ConfigurationException("Client certificate authentication cannot be used when TLS is disabled")
|
||||
@@ -136,10 +129,11 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
val clientCertificate = peerCertificates.first() as X509Certificate
|
||||
val user = userExtractor?.extract(clientCertificate)
|
||||
val group = groupExtractor?.extract(clientCertificate)
|
||||
(group?.roles ?: emptySet()) + (user?.roles ?: emptySet())
|
||||
} ?: anonymousUserRoles
|
||||
val allGroups = ((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
|
||||
AuthenticationResult(user, allGroups)
|
||||
} ?: anonymousUserGroups?.let{ AuthenticationResult(null, it) }
|
||||
} catch (es: SSLPeerUnverifiedException) {
|
||||
anonymousUserRoles
|
||||
anonymousUserGroups?.let{ AuthenticationResult(null, it) }
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -150,26 +144,26 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
) : AbstractNettyHttpAuthenticator(authorizer) {
|
||||
private val log = contextLogger()
|
||||
|
||||
override fun authenticate(ctx: ChannelHandlerContext, req: HttpRequest): Set<Role>? {
|
||||
override fun authenticate(ctx: ChannelHandlerContext, req: HttpRequest): AuthenticationResult? {
|
||||
val authorizationHeader = req.headers()[HttpHeaderNames.AUTHORIZATION] ?: let {
|
||||
log.debug(ctx) {
|
||||
"Missing Authorization header"
|
||||
}
|
||||
return users[""]?.roles
|
||||
return users[""]?.let { AuthenticationResult(it, it.groups) }
|
||||
}
|
||||
val cursor = authorizationHeader.indexOf(' ')
|
||||
if (cursor < 0) {
|
||||
log.debug(ctx) {
|
||||
"Invalid Authorization header: '$authorizationHeader'"
|
||||
}
|
||||
return users[""]?.roles
|
||||
return users[""]?.let { AuthenticationResult(it, it.groups) }
|
||||
}
|
||||
val authenticationType = authorizationHeader.substring(0, cursor)
|
||||
if ("Basic" != authenticationType) {
|
||||
log.debug(ctx) {
|
||||
"Invalid authentication type header: '$authenticationType'"
|
||||
}
|
||||
return users[""]?.roles
|
||||
return users[""]?.let { AuthenticationResult(it, it.groups) }
|
||||
}
|
||||
val (username, password) = Base64.getDecoder().decode(authorizationHeader.substring(cursor + 1))
|
||||
.let(::String)
|
||||
@@ -189,7 +183,9 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
val (_, salt) = decodePasswordHash(passwordAndSalt)
|
||||
hashPassword(password, Base64.getEncoder().encodeToString(salt)) == passwordAndSalt
|
||||
} ?: false
|
||||
}?.roles
|
||||
}?.let { user ->
|
||||
AuthenticationResult(user, user.groups)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -257,13 +253,14 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
}
|
||||
|
||||
private val exceptionHandler = ExceptionHandler()
|
||||
private val throttlingHandler = ThrottlingHandler(cfg)
|
||||
|
||||
private val authenticator = when (val auth = cfg.authentication) {
|
||||
is Configuration.BasicAuthentication -> NettyHttpBasicAuthenticator(cfg.users, RoleAuthorizer())
|
||||
is Configuration.ClientCertificateAuthentication -> {
|
||||
ClientCertificateAuthenticator(
|
||||
RoleAuthorizer(),
|
||||
cfg.users[""]?.roles,
|
||||
cfg.users[""]?.groups,
|
||||
userExtractor(auth),
|
||||
groupExtractor(auth)
|
||||
)
|
||||
@@ -312,10 +309,10 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
}
|
||||
}
|
||||
val pipeline = ch.pipeline()
|
||||
cfg.connection.apply {
|
||||
pipeline.addLast(ReadTimeoutHandler(readTimeout.toMillis(), TimeUnit.MILLISECONDS))
|
||||
pipeline.addLast(WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS))
|
||||
pipeline.addLast(IdleStateHandler(false, 0, 0, idleTimeout.toMillis(), TimeUnit.MILLISECONDS))
|
||||
cfg.connection.also { conn ->
|
||||
pipeline.addLast(ReadTimeoutHandler(conn.readTimeout.toMillis(), TimeUnit.MILLISECONDS))
|
||||
pipeline.addLast(WriteTimeoutHandler(conn.writeTimeout.toMillis(), TimeUnit.MILLISECONDS))
|
||||
pipeline.addLast(IdleStateHandler(false, 0, 0, conn.idleTimeout.toMillis(), TimeUnit.MILLISECONDS))
|
||||
}
|
||||
pipeline.addLast(object : ChannelInboundHandlerAdapter() {
|
||||
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
|
||||
@@ -337,65 +334,12 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
authenticator?.let {
|
||||
pipeline.addLast(it)
|
||||
}
|
||||
pipeline.addLast(throttlingHandler)
|
||||
pipeline.addLast(eventExecutorGroup, serverHandler)
|
||||
pipeline.addLast(exceptionHandler)
|
||||
}
|
||||
}
|
||||
|
||||
@Sharable
|
||||
private class ExceptionHandler : ChannelDuplexHandler() {
|
||||
private val log = contextLogger()
|
||||
|
||||
private val NOT_AUTHORIZED: FullHttpResponse = DefaultFullHttpResponse(
|
||||
HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN, Unpooled.EMPTY_BUFFER
|
||||
).apply {
|
||||
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
|
||||
}
|
||||
|
||||
private val TOO_BIG: FullHttpResponse = DefaultFullHttpResponse(
|
||||
HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER
|
||||
).apply {
|
||||
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
|
||||
}
|
||||
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
when (cause) {
|
||||
is DecoderException -> {
|
||||
log.error(cause.message, cause)
|
||||
ctx.close()
|
||||
}
|
||||
|
||||
is SSLPeerUnverifiedException -> {
|
||||
ctx.writeAndFlush(NOT_AUTHORIZED.retainedDuplicate())
|
||||
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
||||
}
|
||||
|
||||
is ContentTooLargeException -> {
|
||||
ctx.writeAndFlush(TOO_BIG.retainedDuplicate())
|
||||
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
||||
}
|
||||
is ReadTimeoutException -> {
|
||||
log.debug {
|
||||
val channelId = ctx.channel().id().asShortText()
|
||||
"Read timeout on channel $channelId, closing the connection"
|
||||
}
|
||||
ctx.close()
|
||||
}
|
||||
is WriteTimeoutException -> {
|
||||
log.debug {
|
||||
val channelId = ctx.channel().id().asShortText()
|
||||
"Write timeout on channel $channelId, closing the connection"
|
||||
}
|
||||
ctx.close()
|
||||
}
|
||||
else -> {
|
||||
log.error(cause.message, cause)
|
||||
ctx.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ServerHandle(
|
||||
httpChannelFuture: ChannelFuture,
|
||||
private val executorGroups: Iterable<EventExecutorGroup>
|
||||
|
||||
Reference in New Issue
Block a user