renamed project to "Remote Cache Build Server" (RBCS)

This commit is contained in:
2025-02-06 14:48:35 +08:00
parent 5fef1b932e
commit 6c0eadb9fb
118 changed files with 561 additions and 565 deletions

View File

@@ -0,0 +1,29 @@
import net.woggioni.rbcs.api.CacheProvider;
import net.woggioni.rbcs.server.cache.FileSystemCacheProvider;
import net.woggioni.rbcs.server.cache.InMemoryCacheProvider;
module net.woggioni.rbcs.server {
requires java.sql;
requires java.xml;
requires java.logging;
requires java.naming;
requires kotlin.stdlib;
requires io.netty.buffer;
requires io.netty.transport;
requires io.netty.codec.http;
requires io.netty.common;
requires io.netty.handler;
requires io.netty.codec;
requires org.slf4j;
requires net.woggioni.jwo;
requires net.woggioni.rbcs.common;
requires net.woggioni.rbcs.api;
exports net.woggioni.rbcs.server;
opens net.woggioni.rbcs.server;
opens net.woggioni.rbcs.server.schema;
uses CacheProvider;
provides CacheProvider with FileSystemCacheProvider, InMemoryCacheProvider;
}

View File

@@ -0,0 +1,30 @@
package net.woggioni.rbcs.server
import io.netty.channel.ChannelHandlerContext
import org.slf4j.Logger
import java.net.InetSocketAddress
inline fun Logger.trace(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isTraceEnabled }, { trace(it) } , messageBuilder)
}
inline fun Logger.debug(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isDebugEnabled }, { debug(it) } , messageBuilder)
}
inline fun Logger.info(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isInfoEnabled }, { info(it) } , messageBuilder)
}
inline fun Logger.warn(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isWarnEnabled }, { warn(it) } , messageBuilder)
}
inline fun Logger.error(ctx : ChannelHandlerContext, messageBuilder : () -> String) {
log(this, ctx, { isErrorEnabled }, { error(it) } , messageBuilder)
}
inline fun log(log : Logger, ctx : ChannelHandlerContext,
filter : Logger.() -> Boolean,
loggerMethod : Logger.(String) -> Unit, messageBuilder : () -> String) {
if(log.filter()) {
val clientAddress = (ctx.channel().remoteAddress() as InetSocketAddress).address.hostAddress
log.loggerMethod(clientAddress + " - " + messageBuilder())
}
}

View File

@@ -0,0 +1,432 @@
package net.woggioni.rbcs.server
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPromise
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.compression.CompressionOptions
import io.netty.handler.codec.http.DefaultHttpContent
import io.netty.handler.codec.http.HttpContentCompressor
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslHandler
import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.handler.timeout.IdleState
import io.netty.handler.timeout.IdleStateEvent
import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.AttributeKey
import io.netty.util.concurrent.DefaultEventExecutorGroup
import io.netty.util.concurrent.EventExecutorGroup
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.common.RBCS.toUrl
import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import net.woggioni.rbcs.common.info
import net.woggioni.rbcs.server.auth.AbstractNettyHttpAuthenticator
import net.woggioni.rbcs.server.auth.Authorizer
import net.woggioni.rbcs.server.auth.ClientCertificateValidator
import net.woggioni.rbcs.server.auth.RoleAuthorizer
import net.woggioni.rbcs.server.configuration.Parser
import net.woggioni.rbcs.server.configuration.Serializer
import net.woggioni.rbcs.server.exception.ExceptionHandler
import net.woggioni.rbcs.server.handler.ServerHandler
import net.woggioni.rbcs.server.throttling.ThrottlingHandler
import net.woggioni.jwo.JWO
import net.woggioni.jwo.Tuple2
import java.io.OutputStream
import java.net.InetSocketAddress
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyStore
import java.security.PrivateKey
import java.security.cert.X509Certificate
import java.util.Arrays
import java.util.Base64
import java.util.concurrent.TimeUnit
import java.util.regex.Matcher
import java.util.regex.Pattern
import javax.naming.ldap.LdapName
import javax.net.ssl.SSLPeerUnverifiedException
class RemoteBuildCacheServer(private val cfg: Configuration) {
private val log = contextLogger()
companion object {
val userAttribute: AttributeKey<Configuration.User> = AttributeKey.valueOf("user")
val groupAttribute: AttributeKey<Set<Configuration.Group>> = AttributeKey.valueOf("group")
val DEFAULT_CONFIGURATION_URL by lazy { "classpath:net/woggioni/rbcs/server/rbcs-default.xml".toUrl() }
private const val SSL_HANDLER_NAME = "sslHandler"
fun loadConfiguration(configurationFile: Path): Configuration {
val doc = Files.newInputStream(configurationFile).use {
Xml.parseXml(configurationFile.toUri().toURL(), it)
}
return Parser.parse(doc)
}
fun dumpConfiguration(conf: Configuration, outputStream: OutputStream) {
Xml.write(Serializer.serialize(conf), outputStream)
}
}
private class HttpChunkContentCompressor(
threshold: Int,
vararg compressionOptions: CompressionOptions = emptyArray()
) : HttpContentCompressor(threshold, *compressionOptions) {
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
var message: Any? = msg
if (message is ByteBuf) {
// convert ByteBuf to HttpContent to make it work with compression. This is needed as we use the
// ChunkedWriteHandler to send files when compression is enabled.
val buff = message
if (buff.isReadable) {
// We only encode non empty buffers, as empty buffers can be used for determining when
// the content has been flushed and it confuses the HttpContentCompressor
// if we let it go
message = DefaultHttpContent(buff)
}
}
super.write(ctx, message, promise)
}
}
@Sharable
private class ClientCertificateAuthenticator(
authorizer: Authorizer,
private val anonymousUserGroups: Set<Configuration.Group>?,
private val userExtractor: Configuration.UserExtractor?,
private val groupExtractor: Configuration.GroupExtractor?,
) : AbstractNettyHttpAuthenticator(authorizer) {
override fun authenticate(ctx: ChannelHandlerContext, req: HttpRequest): AuthenticationResult? {
return try {
val sslHandler = (ctx.pipeline().get(SSL_HANDLER_NAME) as? SslHandler)
?: throw ConfigurationException("Client certificate authentication cannot be used when TLS is disabled")
val sslEngine = sslHandler.engine()
sslEngine.session.peerCertificates.takeIf {
it.isNotEmpty()
}?.let { peerCertificates ->
val clientCertificate = peerCertificates.first() as X509Certificate
val user = userExtractor?.extract(clientCertificate)
val group = groupExtractor?.extract(clientCertificate)
val allGroups = ((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
AuthenticationResult(user, allGroups)
} ?: anonymousUserGroups?.let{ AuthenticationResult(null, it) }
} catch (es: SSLPeerUnverifiedException) {
anonymousUserGroups?.let{ AuthenticationResult(null, it) }
}
}
}
@Sharable
private class NettyHttpBasicAuthenticator(
private val users: Map<String, Configuration.User>, authorizer: Authorizer
) : AbstractNettyHttpAuthenticator(authorizer) {
private val log = contextLogger()
override fun authenticate(ctx: ChannelHandlerContext, req: HttpRequest): AuthenticationResult? {
val authorizationHeader = req.headers()[HttpHeaderNames.AUTHORIZATION] ?: let {
log.debug(ctx) {
"Missing Authorization header"
}
return users[""]?.let { AuthenticationResult(it, it.groups) }
}
val cursor = authorizationHeader.indexOf(' ')
if (cursor < 0) {
log.debug(ctx) {
"Invalid Authorization header: '$authorizationHeader'"
}
return users[""]?.let { AuthenticationResult(it, it.groups) }
}
val authenticationType = authorizationHeader.substring(0, cursor)
if ("Basic" != authenticationType) {
log.debug(ctx) {
"Invalid authentication type header: '$authenticationType'"
}
return users[""]?.let { AuthenticationResult(it, it.groups) }
}
val (username, password) = Base64.getDecoder().decode(authorizationHeader.substring(cursor + 1))
.let(::String)
.let {
val colon = it.indexOf(':')
if (colon < 0) {
log.debug(ctx) {
"Missing colon from authentication"
}
return null
}
it.substring(0, colon) to it.substring(colon + 1)
}
return username.let(users::get)?.takeIf { user ->
user.password?.let { passwordAndSalt ->
val (_, salt) = decodePasswordHash(passwordAndSalt)
hashPassword(password, Base64.getEncoder().encodeToString(salt)) == passwordAndSalt
} ?: false
}?.let { user ->
AuthenticationResult(user, user.groups)
}
}
}
private class ServerInitializer(
private val cfg: Configuration,
private val eventExecutorGroup: EventExecutorGroup
) : ChannelInitializer<Channel>() {
companion object {
private fun createSslCtx(tls: Configuration.Tls): SslContext {
val keyStore = tls.keyStore
return if (keyStore == null) {
throw IllegalArgumentException("No keystore configured")
} else {
val javaKeyStore = loadKeystore(keyStore.file, keyStore.password)
val serverKey = javaKeyStore.getKey(
keyStore.keyAlias, (keyStore.keyPassword ?: "").let(String::toCharArray)
) as PrivateKey
val serverCert: Array<X509Certificate> =
Arrays.stream(javaKeyStore.getCertificateChain(keyStore.keyAlias))
.map { it as X509Certificate }
.toArray { size -> Array<X509Certificate?>(size) { null } }
SslContextBuilder.forServer(serverKey, *serverCert).apply {
val clientAuth = tls.trustStore?.let { trustStore ->
val ts = loadKeystore(trustStore.file, trustStore.password)
trustManager(
ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus)
)
if(trustStore.isRequireClientCertificate) ClientAuth.REQUIRE
else ClientAuth.OPTIONAL
} ?: ClientAuth.NONE
clientAuth(clientAuth)
}.build()
}
}
fun loadKeystore(file: Path, password: String?): KeyStore {
val ext = JWO.splitExtension(file)
.map(Tuple2<String, String>::get_2)
.orElseThrow {
IllegalArgumentException(
"Keystore file '${file}' must have .jks, .p12, .pfx extension"
)
}
val keystore = when (ext.substring(1).lowercase()) {
"jks" -> KeyStore.getInstance("JKS")
"p12", "pfx" -> KeyStore.getInstance("PKCS12")
else -> throw IllegalArgumentException(
"Keystore file '${file}' must have .jks, .p12, .pfx extension"
)
}
Files.newInputStream(file).use {
keystore.load(it, password?.let(String::toCharArray))
}
return keystore
}
}
private val log = contextLogger()
private val serverHandler = let {
val cacheImplementation = cfg.cache.materialize()
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
ServerHandler(cacheImplementation, prefix)
}
private val exceptionHandler = ExceptionHandler()
private val throttlingHandler = ThrottlingHandler(cfg)
private val authenticator = when (val auth = cfg.authentication) {
is Configuration.BasicAuthentication -> NettyHttpBasicAuthenticator(cfg.users, RoleAuthorizer())
is Configuration.ClientCertificateAuthentication -> {
ClientCertificateAuthenticator(
RoleAuthorizer(),
cfg.users[""]?.groups,
userExtractor(auth),
groupExtractor(auth)
)
}
else -> null
}
private val sslContext: SslContext? = cfg.tls?.let(Companion::createSslCtx)
private fun userExtractor(authentication: Configuration.ClientCertificateAuthentication) =
authentication.userExtractor?.let { extractor ->
val pattern = Pattern.compile(extractor.pattern)
val rdnType = extractor.rdnType
Configuration.UserExtractor { cert: X509Certificate ->
val userName = LdapName(cert.subjectX500Principal.name).rdns.find {
it.type == rdnType
}?.let {
pattern.matcher(it.value.toString())
}?.takeIf(Matcher::matches)?.group(1)
cfg.users[userName] ?: throw java.lang.RuntimeException("Failed to extract user")
}
}
private fun groupExtractor(authentication: Configuration.ClientCertificateAuthentication) =
authentication.groupExtractor?.let { extractor ->
val pattern = Pattern.compile(extractor.pattern)
val rdnType = extractor.rdnType
Configuration.GroupExtractor { cert: X509Certificate ->
val groupName = LdapName(cert.subjectX500Principal.name).rdns.find {
it.type == rdnType
}?.let {
pattern.matcher(it.value.toString())
}?.takeIf(Matcher::matches)?.group(1)
cfg.groups[groupName] ?: throw java.lang.RuntimeException("Failed to extract group")
}
}
override fun initChannel(ch: Channel) {
log.debug {
"Created connection ${ch.id().asShortText()} with ${ch.remoteAddress()}"
}
ch.closeFuture().addListener {
log.debug {
"Closed connection ${ch.id().asShortText()} with ${ch.remoteAddress()}"
}
}
val pipeline = ch.pipeline()
cfg.connection.also { conn ->
val readTimeout = conn.readTimeout.toMillis()
val writeTimeout = conn.writeTimeout.toMillis()
if(readTimeout > 0 || writeTimeout > 0) {
pipeline.addLast(
IdleStateHandler(
false,
readTimeout,
writeTimeout,
0,
TimeUnit.MILLISECONDS
)
)
}
val readIdleTimeout = conn.readIdleTimeout.toMillis()
val writeIdleTimeout = conn.writeIdleTimeout.toMillis()
val idleTimeout = conn.idleTimeout.toMillis()
if(readIdleTimeout > 0 || writeIdleTimeout > 0 || idleTimeout > 0) {
pipeline.addLast(
IdleStateHandler(
true,
readIdleTimeout,
writeIdleTimeout,
idleTimeout,
TimeUnit.MILLISECONDS
)
)
}
}
pipeline.addLast(object : ChannelInboundHandlerAdapter() {
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is IdleStateEvent) {
when(evt.state()) {
IdleState.READER_IDLE -> log.debug {
"Read timeout reached on channel ${ch.id().asShortText()}, closing the connection"
}
IdleState.WRITER_IDLE -> log.debug {
"Write timeout reached on channel ${ch.id().asShortText()}, closing the connection"
}
IdleState.ALL_IDLE -> log.debug {
"Idle timeout reached on channel ${ch.id().asShortText()}, closing the connection"
}
null -> throw IllegalStateException("This should never happen")
}
ctx.close()
}
}
})
sslContext?.newHandler(ch.alloc())?.also {
pipeline.addLast(SSL_HANDLER_NAME, it)
}
pipeline.addLast(HttpServerCodec())
pipeline.addLast(HttpChunkContentCompressor(1024))
pipeline.addLast(ChunkedWriteHandler())
pipeline.addLast(HttpObjectAggregator(cfg.connection.maxRequestSize))
authenticator?.let {
pipeline.addLast(it)
}
pipeline.addLast(throttlingHandler)
pipeline.addLast(eventExecutorGroup, serverHandler)
pipeline.addLast(exceptionHandler)
}
}
class ServerHandle(
httpChannelFuture: ChannelFuture,
private val executorGroups: Iterable<EventExecutorGroup>
) : AutoCloseable {
private val httpChannel: Channel = httpChannelFuture.channel()
private val closeFuture: ChannelFuture = httpChannel.closeFuture()
private val log = contextLogger()
fun shutdown(): ChannelFuture {
return httpChannel.close()
}
override fun close() {
try {
closeFuture.sync()
} finally {
executorGroups.forEach {
it.shutdownGracefully().sync()
}
}
log.info {
"RemoteBuildCacheServer has been gracefully shut down"
}
}
}
fun run(): ServerHandle {
// Create the multithreaded event loops for the server
val bossGroup = NioEventLoopGroup(1)
val serverSocketChannel = NioServerSocketChannel::class.java
val workerGroup = NioEventLoopGroup(0)
val eventExecutorGroup = run {
val threadFactory = if (cfg.eventExecutor.isUseVirtualThreads) {
Thread.ofVirtual().factory()
} else {
null
}
DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), threadFactory)
}
val bootstrap = ServerBootstrap().apply {
// Configure the server
group(bossGroup, workerGroup)
channel(serverSocketChannel)
childHandler(ServerInitializer(cfg, eventExecutorGroup))
option(ChannelOption.SO_BACKLOG, cfg.incomingConnectionsBacklogSize)
childOption(ChannelOption.SO_KEEPALIVE, true)
}
// Bind and start to accept incoming connections.
val bindAddress = InetSocketAddress(cfg.host, cfg.port)
val httpChannel = bootstrap.bind(bindAddress).sync()
log.info {
"RemoteBuildCacheServer is listening on ${cfg.host}:${cfg.port}"
}
return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup))
}
}

View File

@@ -0,0 +1,73 @@
package net.woggioni.rbcs.server.auth
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion
import io.netty.util.ReferenceCountUtil
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Configuration.Group
import net.woggioni.rbcs.api.Role
import net.woggioni.rbcs.server.RemoteBuildCacheServer
abstract class AbstractNettyHttpAuthenticator(private val authorizer: Authorizer) : ChannelInboundHandlerAdapter() {
companion object {
private val AUTHENTICATION_FAILED: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED, Unpooled.EMPTY_BUFFER
).apply {
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
private val NOT_AUTHORIZED: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN, Unpooled.EMPTY_BUFFER
).apply {
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
}
class AuthenticationResult(val user: Configuration.User?, val groups: Set<Group>)
abstract fun authenticate(ctx: ChannelHandlerContext, req: HttpRequest): AuthenticationResult?
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
if (msg is HttpRequest) {
val result = authenticate(ctx, msg) ?: return authenticationFailure(ctx, msg)
ctx.channel().attr(RemoteBuildCacheServer.userAttribute).set(result.user)
ctx.channel().attr(RemoteBuildCacheServer.groupAttribute).set(result.groups)
val roles = (
(result.user?.let { user ->
user.groups.asSequence().flatMap { group ->
group.roles.asSequence()
}
} ?: emptySequence<Role>()) +
result.groups.asSequence().flatMap { it.roles.asSequence() }
).toSet()
val authorized = authorizer.authorize(roles, msg)
if (authorized) {
super.channelRead(ctx, msg)
} else {
authorizationFailure(ctx, msg)
}
}
}
private fun authenticationFailure(ctx: ChannelHandlerContext, msg: Any) {
ReferenceCountUtil.release(msg)
ctx.writeAndFlush(AUTHENTICATION_FAILED.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
private fun authorizationFailure(ctx: ChannelHandlerContext, msg: Any) {
ReferenceCountUtil.release(msg)
ctx.writeAndFlush(NOT_AUTHORIZED.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
}

View File

@@ -0,0 +1,8 @@
package net.woggioni.rbcs.server.auth
import io.netty.handler.codec.http.HttpRequest
import net.woggioni.rbcs.api.Role
fun interface Authorizer {
fun authorize(roles : Set<Role>, request: HttpRequest) : Boolean
}

View File

@@ -0,0 +1,90 @@
package net.woggioni.rbcs.server.auth
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.ssl.SslHandler
import io.netty.handler.ssl.SslHandshakeCompletionEvent
import java.security.KeyStore
import java.security.cert.CertPathValidator
import java.security.cert.CertPathValidatorException
import java.security.cert.CertificateException
import java.security.cert.CertificateFactory
import java.security.cert.PKIXParameters
import java.security.cert.PKIXRevocationChecker
import java.security.cert.X509Certificate
import java.util.EnumSet
import javax.net.ssl.SSLSession
import javax.net.ssl.TrustManagerFactory
import javax.net.ssl.X509TrustManager
class ClientCertificateValidator private constructor(
private val sslHandler: SslHandler,
private val x509TrustManager: X509TrustManager
) : ChannelInboundHandlerAdapter() {
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is SslHandshakeCompletionEvent) {
if (evt.isSuccess) {
val session: SSLSession = sslHandler.engine().session
val clientCertificateChain = session.peerCertificates as Array<X509Certificate>
val authType: String = clientCertificateChain[0].publicKey.algorithm
x509TrustManager.checkClientTrusted(clientCertificateChain, authType)
} else {
// Handle the failure, for example by closing the channel.
}
}
super.userEventTriggered(ctx, evt)
}
companion object {
fun getTrustManager(trustStore: KeyStore?, certificateRevocationEnabled: Boolean): X509TrustManager {
return if (trustStore != null) {
val certificateFactory = CertificateFactory.getInstance("X.509")
val validator = CertPathValidator.getInstance("PKIX").apply {
val rc = revocationChecker as PKIXRevocationChecker
rc.options = EnumSet.of(
PKIXRevocationChecker.Option.NO_FALLBACK
)
}
val params = PKIXParameters(trustStore).apply {
isRevocationEnabled = certificateRevocationEnabled
}
object : X509TrustManager {
override fun checkClientTrusted(chain: Array<out X509Certificate>, authType: String) {
val clientCertificateChain = certificateFactory.generateCertPath(chain.toList())
try {
validator.validate(clientCertificateChain, params)
} catch (ex: CertPathValidatorException) {
throw CertificateException(ex)
}
}
override fun checkServerTrusted(chain: Array<out X509Certificate>, authType: String) {
throw NotImplementedError()
}
private val acceptedIssuers = trustStore.aliases().asSequence()
.filter(trustStore::isCertificateEntry)
.map(trustStore::getCertificate)
.map { it as X509Certificate }
.toList()
.toTypedArray()
override fun getAcceptedIssuers() = acceptedIssuers
}
} else {
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
trustManagerFactory.trustManagers.asSequence().filter { it is X509TrustManager }
.single() as X509TrustManager
}
}
fun of(
sslHandler: SslHandler,
trustStore: KeyStore?,
certificateRevocationEnabled: Boolean
): ClientCertificateValidator {
return ClientCertificateValidator(sslHandler, getTrustManager(trustStore, certificateRevocationEnabled))
}
}
}

View File

@@ -0,0 +1,23 @@
package net.woggioni.rbcs.server.auth
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpRequest
import net.woggioni.rbcs.api.Role
class RoleAuthorizer : Authorizer {
companion object {
private val METHOD_MAP = mapOf(
Role.Reader to setOf(HttpMethod.GET, HttpMethod.HEAD, HttpMethod.TRACE),
Role.Writer to setOf(HttpMethod.PUT, HttpMethod.POST)
)
}
override fun authorize(roles: Set<Role>, request: HttpRequest) : Boolean {
val allowedMethods = roles.asSequence()
.mapNotNull(METHOD_MAP::get)
.flatten()
.toSet()
return request.method() in allowedMethods
}
}

View File

@@ -0,0 +1,130 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.RBCS.digestString
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.jwo.JWO
import net.woggioni.jwo.LockFile
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.nio.file.StandardOpenOption
import java.nio.file.attribute.BasicFileAttributes
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterInputStream
class FileSystemCache(
val root: Path,
val maxAge: Duration,
val digestAlgorithm: String?,
val compressionEnabled: Boolean,
val compressionLevel: Int
) : Cache {
private companion object {
@JvmStatic
private val log = contextLogger()
}
init {
Files.createDirectories(root)
}
private var nextGc = AtomicReference(Instant.now().plus(maxAge))
override fun get(key: String) = (digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
root.resolve(digest).takeIf(Files::exists)
?.let { file ->
file.takeIf(Files::exists)?.let { file ->
if (compressionEnabled) {
val inflater = Inflater()
Channels.newChannel(
InflaterInputStream(
Channels.newInputStream(
FileChannel.open(
file,
StandardOpenOption.READ
)
), inflater
)
)
} else {
FileChannel.open(file, StandardOpenOption.READ)
}
}
}.also {
gc()
}.let {
CompletableFuture.completedFuture(it)
}
}
override fun put(key: String, content: ByteBuf): CompletableFuture<Void> {
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
val file = root.resolve(digest)
val tmpFile = Files.createTempFile(root, null, ".tmp")
try {
Files.newOutputStream(tmpFile).let {
if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
DeflaterOutputStream(it, deflater)
} else {
it
}
}.use {
JWO.copy(ByteBufInputStream(content), it)
}
Files.move(tmpFile, file, StandardCopyOption.ATOMIC_MOVE)
} catch (t: Throwable) {
Files.delete(tmpFile)
throw t
}
}.also {
gc()
}
return CompletableFuture.completedFuture(null)
}
private fun gc() {
val now = Instant.now()
val oldValue = nextGc.getAndSet(now.plus(maxAge))
if (oldValue < now) {
actualGc(now)
}
}
@Synchronized
private fun actualGc(now: Instant) {
Files.list(root).filter {
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
.creationTime()
.toInstant()
now > creationTimeStamp.plus(maxAge)
}.forEach { file ->
LockFile.acquire(file, false).use {
Files.delete(file)
}
}
}
override fun close() {}
}

View File

@@ -0,0 +1,27 @@
package net.woggioni.rbcs.server.cache
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS
import net.woggioni.jwo.Application
import java.nio.file.Path
import java.time.Duration
data class FileSystemCacheConfiguration(
val root: Path?,
val maxAge: Duration,
val digestAlgorithm : String?,
val compressionEnabled: Boolean,
val compressionLevel: Int,
) : Configuration.Cache {
override fun materialize() = FileSystemCache(
root ?: Application.builder("rbcs").build().computeCacheDirectory(),
maxAge,
digestAlgorithm,
compressionEnabled,
compressionLevel
)
override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI
override fun getTypeName() = "fileSystemCacheType"
}

View File

@@ -0,0 +1,63 @@
package net.woggioni.rbcs.server.cache
import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.rbcs.common.RBCS
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import org.w3c.dom.Element
import java.nio.file.Path
import java.time.Duration
import java.util.zip.Deflater
class FileSystemCacheProvider : CacheProvider<FileSystemCacheConfiguration> {
override fun getXmlSchemaLocation() = "classpath:net/woggioni/rbcs/server/schema/rbcs.xsd"
override fun getXmlType() = "fileSystemCacheType"
override fun getXmlNamespace() = "urn:net.woggioni.rbcs.server"
override fun deserialize(el: Element): FileSystemCacheConfiguration {
val path = el.renderAttribute("path")
?.let(Path::of)
val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse)
?: Duration.ofDays(1)
val enableCompression = el.renderAttribute("enable-compression")
?.let(String::toBoolean)
?: true
val compressionLevel = el.renderAttribute("compression-level")
?.let(String::toInt)
?: Deflater.DEFAULT_COMPRESSION
val digestAlgorithm = el.renderAttribute("digest") ?: "MD5"
return FileSystemCacheConfiguration(
path,
maxAge,
digestAlgorithm,
enableCompression,
compressionLevel
)
}
override fun serialize(doc: Document, cache : FileSystemCacheConfiguration) = cache.run {
val result = doc.createElement("cache")
Xml.of(doc, result) {
val prefix = doc.lookupPrefix(RBCS.RBCS_NAMESPACE_URI)
attr("xs:type", "${prefix}:fileSystemCacheType", RBCS.XML_SCHEMA_NAMESPACE_URI)
attr("path", root.toString())
attr("max-age", maxAge.toString())
digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm)
}
attr("enable-compression", compressionEnabled.toString())
compressionLevel.takeIf {
it != Deflater.DEFAULT_COMPRESSION
}?.let {
attr("compression-level", it.toString())
}
}
result
}
}

View File

@@ -0,0 +1,150 @@
package net.woggioni.rbcs.server.cache
import io.netty.buffer.ByteBuf
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.common.ByteBufInputStream
import net.woggioni.rbcs.common.ByteBufOutputStream
import net.woggioni.rbcs.common.RBCS.digestString
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.jwo.JWO
import java.nio.channels.Channels
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.atomic.AtomicLong
import java.util.zip.Deflater
import java.util.zip.DeflaterOutputStream
import java.util.zip.Inflater
import java.util.zip.InflaterInputStream
class InMemoryCache(
val maxAge: Duration,
val maxSize: Long,
val digestAlgorithm: String?,
val compressionEnabled: Boolean,
val compressionLevel: Int
) : Cache {
companion object {
@JvmStatic
private val log = contextLogger()
}
private val size = AtomicLong()
private val map = ConcurrentHashMap<String, ByteBuf>()
private class RemovalQueueElement(val key: String, val value : ByteBuf, val expiry : Instant) : Comparable<RemovalQueueElement> {
override fun compareTo(other: RemovalQueueElement) = expiry.compareTo(other.expiry)
}
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
private var running = true
private val garbageCollector = Thread {
while(true) {
val el = removalQueue.take()
val buf = el.value
val now = Instant.now()
if(now > el.expiry) {
val removed = map.remove(el.key, buf)
if(removed) {
updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
}
//Decrease the reference count for removalQueue
buf.release()
} else {
removalQueue.put(el)
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
}
}
}.apply {
start()
}
private fun removeEldest() : Long {
while(true) {
val el = removalQueue.take()
val buf = el.value
val removed = map.remove(el.key, buf)
//Decrease the reference count for removalQueue
buf.release()
if(removed) {
val newSize = updateSizeAfterRemoval(buf)
//Decrease the reference count for map
buf.release()
return newSize
}
}
}
private fun updateSizeAfterRemoval(removed: ByteBuf) : Long {
return size.updateAndGet { currentSize : Long ->
currentSize - removed.readableBytes()
}
}
override fun close() {
running = false
garbageCollector.join()
}
override fun get(key: String) =
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key
).let { digest ->
map[digest]
?.let { value ->
val copy = value.retainedDuplicate()
copy.touch("This has to be released by the caller of the cache")
if (compressionEnabled) {
val inflater = Inflater()
Channels.newChannel(InflaterInputStream(ByteBufInputStream(copy), inflater))
} else {
Channels.newChannel(ByteBufInputStream(copy))
}
}
}.let {
CompletableFuture.completedFuture(it)
}
override fun put(key: String, content: ByteBuf) =
(digestAlgorithm
?.let(MessageDigest::getInstance)
?.let { md ->
digestString(key.toByteArray(), md)
} ?: key).let { digest ->
content.retain()
val value = if (compressionEnabled) {
val deflater = Deflater(compressionLevel)
val buf = content.alloc().buffer()
buf.retain()
DeflaterOutputStream(ByteBufOutputStream(buf), deflater).use { outputStream ->
ByteBufInputStream(content).use { inputStream ->
JWO.copy(inputStream, outputStream)
}
}
buf
} else {
content
}
val old = map.put(digest, value)
val delta = value.readableBytes() - (old?.readableBytes() ?: 0)
var newSize = size.updateAndGet { currentSize : Long ->
currentSize + delta
}
removalQueue.put(RemovalQueueElement(digest, value.retain(), Instant.now().plus(maxAge)))
while(newSize > maxSize) {
newSize = removeEldest()
}
}.let {
CompletableFuture.completedFuture<Void>(null)
}
}

View File

@@ -0,0 +1,25 @@
package net.woggioni.rbcs.server.cache
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS
import java.time.Duration
data class InMemoryCacheConfiguration(
val maxAge: Duration,
val maxSize: Long,
val digestAlgorithm : String?,
val compressionEnabled: Boolean,
val compressionLevel: Int,
) : Configuration.Cache {
override fun materialize() = InMemoryCache(
maxAge,
maxSize,
digestAlgorithm,
compressionEnabled,
compressionLevel
)
override fun getNamespaceURI() = RBCS.RBCS_NAMESPACE_URI
override fun getTypeName() = "inMemoryCacheType"
}

View File

@@ -0,0 +1,63 @@
package net.woggioni.rbcs.server.cache
import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.rbcs.common.RBCS
import net.woggioni.rbcs.common.Xml
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import org.w3c.dom.Element
import java.time.Duration
import java.util.zip.Deflater
class InMemoryCacheProvider : CacheProvider<InMemoryCacheConfiguration> {
override fun getXmlSchemaLocation() = "classpath:net/woggioni/rbcs/server/schema/rbcs.xsd"
override fun getXmlType() = "inMemoryCacheType"
override fun getXmlNamespace() = "urn:net.woggioni.rbcs.server"
override fun deserialize(el: Element): InMemoryCacheConfiguration {
val maxAge = el.renderAttribute("max-age")
?.let(Duration::parse)
?: Duration.ofDays(1)
val maxSize = el.renderAttribute("max-size")
?.let(java.lang.Long::decode)
?: 0x1000000
val enableCompression = el.renderAttribute("enable-compression")
?.let(String::toBoolean)
?: true
val compressionLevel = el.renderAttribute("compression-level")
?.let(String::toInt)
?: Deflater.DEFAULT_COMPRESSION
val digestAlgorithm = el.renderAttribute("digest") ?: "MD5"
return InMemoryCacheConfiguration(
maxAge,
maxSize,
digestAlgorithm,
enableCompression,
compressionLevel
)
}
override fun serialize(doc: Document, cache : InMemoryCacheConfiguration) = cache.run {
val result = doc.createElement("cache")
Xml.of(doc, result) {
val prefix = doc.lookupPrefix(RBCS.RBCS_NAMESPACE_URI)
attr("xs:type", "${prefix}:inMemoryCacheType", RBCS.XML_SCHEMA_NAMESPACE_URI)
attr("max-age", maxAge.toString())
attr("max-size", maxSize.toString())
digestAlgorithm?.let { digestAlgorithm ->
attr("digest", digestAlgorithm)
}
attr("enable-compression", compressionEnabled.toString())
compressionLevel.takeIf {
it != Deflater.DEFAULT_COMPRESSION
}?.let {
attr("compression-level", it.toString())
}
}
result
}
}

View File

@@ -0,0 +1,15 @@
package net.woggioni.rbcs.server.configuration
import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.rbcs.api.Configuration
import java.util.ServiceLoader
object CacheSerializers {
val index = (Configuration::class.java.module.layer?.let { layer ->
ServiceLoader.load(layer, CacheProvider::class.java)
} ?: ServiceLoader.load(CacheProvider::class.java))
.asSequence()
.map {
(it.xmlNamespace to it.xmlType) to it
}.toMap()
}

View File

@@ -0,0 +1,298 @@
package net.woggioni.rbcs.server.configuration
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.api.Configuration.Authentication
import net.woggioni.rbcs.api.Configuration.BasicAuthentication
import net.woggioni.rbcs.api.Configuration.Cache
import net.woggioni.rbcs.api.Configuration.ClientCertificateAuthentication
import net.woggioni.rbcs.api.Configuration.Group
import net.woggioni.rbcs.api.Configuration.KeyStore
import net.woggioni.rbcs.api.Configuration.Tls
import net.woggioni.rbcs.api.Configuration.TlsCertificateExtractor
import net.woggioni.rbcs.api.Configuration.TrustStore
import net.woggioni.rbcs.api.Configuration.User
import net.woggioni.rbcs.api.Role
import net.woggioni.rbcs.api.exception.ConfigurationException
import net.woggioni.rbcs.common.Xml.Companion.asIterable
import net.woggioni.rbcs.common.Xml.Companion.renderAttribute
import org.w3c.dom.Document
import org.w3c.dom.Element
import org.w3c.dom.TypeInfo
import java.nio.file.Paths
import java.time.Duration
import java.time.temporal.ChronoUnit
object Parser {
fun parse(document: Document): Configuration {
val root = document.documentElement
val anonymousUser = User("", null, emptySet(), null)
var connection: Configuration.Connection = Configuration.Connection(
Duration.of(10, ChronoUnit.SECONDS),
Duration.of(10, ChronoUnit.SECONDS),
Duration.of(60, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS),
Duration.of(30, ChronoUnit.SECONDS),
67108864
)
var eventExecutor: Configuration.EventExecutor = Configuration.EventExecutor(true)
var cache: Cache? = null
var host = "127.0.0.1"
var port = 11080
var users: Map<String, User> = mapOf(anonymousUser.name to anonymousUser)
var groups = emptyMap<String, Group>()
var tls: Tls? = null
val serverPath = root.renderAttribute("path")
var incomingConnectionsBacklogSize = 1024
var authentication: Authentication? = null
for (child in root.asIterable()) {
val tagName = child.localName
when (tagName) {
"authentication" -> {
for (gchild in child.asIterable()) {
when (gchild.localName) {
"basic" -> {
authentication = BasicAuthentication()
}
"client-certificate" -> {
var tlsExtractorUser: TlsCertificateExtractor? = null
var tlsExtractorGroup: TlsCertificateExtractor? = null
for (ggchild in gchild.asIterable()) {
when (ggchild.localName) {
"group-extractor" -> {
val attrName = ggchild.renderAttribute("attribute-name")
val pattern = ggchild.renderAttribute("pattern")
tlsExtractorGroup = TlsCertificateExtractor(attrName, pattern)
}
"user-extractor" -> {
val attrName = ggchild.renderAttribute("attribute-name")
val pattern = ggchild.renderAttribute("pattern")
tlsExtractorUser = TlsCertificateExtractor(attrName, pattern)
}
}
}
authentication = ClientCertificateAuthentication(tlsExtractorUser, tlsExtractorGroup)
}
}
}
}
"authorization" -> {
var knownUsers = sequenceOf(anonymousUser)
for (gchild in child.asIterable()) {
when (gchild.localName) {
"users" -> {
knownUsers += parseUsers(gchild)
}
"groups" -> {
val pair = parseGroups(gchild, knownUsers)
users = pair.first
groups = pair.second
}
}
}
}
"bind" -> {
host = child.renderAttribute("host") ?: throw ConfigurationException("host attribute is required")
port = Integer.parseInt(child.renderAttribute("port"))
incomingConnectionsBacklogSize = child.renderAttribute("incoming-connections-backlog-size")
?.let(Integer::parseInt)
?: 1024
}
"cache" -> {
cache = (child as TypeInfo).let { tf ->
val typeNamespace = tf.typeNamespace
val typeName = tf.typeName
CacheSerializers.index[typeNamespace to typeName]
?: throw IllegalArgumentException("Cache provider for namespace '$typeNamespace' with name '$typeName' not found")
}.deserialize(child)
}
"connection" -> {
val writeTimeout = child.renderAttribute("write-timeout")
?.let(Duration::parse) ?: Duration.of(0, ChronoUnit.SECONDS)
val readTimeout = child.renderAttribute("read-timeout")
?.let(Duration::parse) ?: Duration.of(0, ChronoUnit.SECONDS)
val idleTimeout = child.renderAttribute("idle-timeout")
?.let(Duration::parse) ?: Duration.of(30, ChronoUnit.SECONDS)
val readIdleTimeout = child.renderAttribute("read-idle-timeout")
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
val writeIdleTimeout = child.renderAttribute("write-idle-timeout")
?.let(Duration::parse) ?: Duration.of(60, ChronoUnit.SECONDS)
val maxRequestSize = child.renderAttribute("max-request-size")
?.let(String::toInt) ?: 67108864
connection = Configuration.Connection(
readTimeout,
writeTimeout,
idleTimeout,
readIdleTimeout,
writeIdleTimeout,
maxRequestSize
)
}
"event-executor" -> {
val useVirtualThread = root.renderAttribute("use-virtual-threads")
?.let(String::toBoolean) ?: true
eventExecutor = Configuration.EventExecutor(useVirtualThread)
}
"tls" -> {
var keyStore: KeyStore? = null
var trustStore: TrustStore? = null
for (granChild in child.asIterable()) {
when (granChild.localName) {
"keystore" -> {
val keyStoreFile = Paths.get(granChild.renderAttribute("file"))
val keyStorePassword = granChild.renderAttribute("password")
val keyAlias = granChild.renderAttribute("key-alias")
val keyPassword = granChild.renderAttribute("key-password")
keyStore = KeyStore(
keyStoreFile,
keyStorePassword,
keyAlias,
keyPassword
)
}
"truststore" -> {
val trustStoreFile = Paths.get(granChild.renderAttribute("file"))
val trustStorePassword = granChild.renderAttribute("password")
val checkCertificateStatus = granChild.renderAttribute("check-certificate-status")
?.let(String::toBoolean)
?: false
val requireClientCertificate = child.renderAttribute("require-client-certificate")
?.let(String::toBoolean) ?: false
trustStore = TrustStore(
trustStoreFile,
trustStorePassword,
checkCertificateStatus,
requireClientCertificate
)
}
}
}
tls = Tls(keyStore, trustStore)
}
}
}
return Configuration.of(
host,
port,
incomingConnectionsBacklogSize,
serverPath,
eventExecutor,
connection,
users,
groups,
cache!!,
authentication,
tls,
)
}
private fun parseRoles(root: Element) = root.asIterable().asSequence().map {
when (it.localName) {
"reader" -> Role.Reader
"writer" -> Role.Writer
else -> throw UnsupportedOperationException("Illegal node '${it.localName}'")
}
}.toSet()
private fun parseUserRefs(root: Element) = root.asIterable().asSequence().map {
when (it.localName) {
"user" -> it.renderAttribute("ref")
"anonymous" -> ""
else -> ConfigurationException("Unrecognized tag '${it.localName}'")
}
}
private fun parseQuota(el: Element): Configuration.Quota {
val calls = el.renderAttribute("calls")
?.let(String::toLong)
?: throw ConfigurationException("Missing attribute 'calls'")
val maxAvailableCalls = el.renderAttribute("max-available-calls")
?.let(String::toLong)
?: calls
val initialAvailableCalls = el.renderAttribute("initial-available-calls")
?.let(String::toLong)
?: maxAvailableCalls
val period = el.renderAttribute("period")
?.let(Duration::parse)
?: throw ConfigurationException("Missing attribute 'period'")
return Configuration.Quota(calls, period, initialAvailableCalls, maxAvailableCalls)
}
private fun parseUsers(root: Element): Sequence<User> {
return root.asIterable().asSequence().mapNotNull { child ->
when (child.localName) {
"user" -> {
val username = child.renderAttribute("name")
val password = child.renderAttribute("password")
var quota: Configuration.Quota? = null
for (gchild in child.asIterable()) {
if (gchild.localName == "quota") {
quota = parseQuota(gchild)
}
}
User(username, password, emptySet(), quota)
}
"anonymous" -> {
var quota: Configuration.Quota? = null
for (gchild in child.asIterable()) {
if (gchild.localName == "quota") {
quota= parseQuota(gchild)
}
}
User("", null, emptySet(), quota)
}
else -> null
}
}
}
private fun parseGroups(root: Element, knownUsers: Sequence<User>): Pair<Map<String, User>, Map<String, Group>> {
val knownUsersMap = knownUsers.associateBy(User::getName)
val userGroups = mutableMapOf<String, MutableSet<String>>()
val groups = root.asIterable().asSequence().filter {
it.localName == "group"
}.map { el ->
val groupName = el.renderAttribute("name") ?: throw ConfigurationException("Group name is required")
var roles = emptySet<Role>()
var userQuota: Configuration.Quota? = null
var groupQuota: Configuration.Quota? = null
for (child in el.asIterable()) {
when (child.localName) {
"users" -> {
parseUserRefs(child).mapNotNull(knownUsersMap::get).forEach { user ->
userGroups.computeIfAbsent(user.name) {
mutableSetOf()
}.add(groupName)
}
}
"roles" -> {
roles = parseRoles(child)
}
"group-quota" -> {
userQuota = parseQuota(child)
}
"user-quota" -> {
groupQuota = parseQuota(child)
}
}
}
groupName to Group(groupName, roles, userQuota, groupQuota)
}.toMap()
val users = knownUsersMap.map { (name, user) ->
name to User(name, user.password, userGroups[name]?.mapNotNull { groups[it] }?.toSet() ?: emptySet(), user.quota)
}.toMap()
return users to groups
}
}

View File

@@ -0,0 +1,186 @@
package net.woggioni.rbcs.server.configuration
import net.woggioni.rbcs.api.CacheProvider
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.RBCS
import net.woggioni.rbcs.common.Xml
import org.w3c.dom.Document
object Serializer {
private fun Xml.serializeQuota(quota : Configuration.Quota) {
attr("calls", quota.calls.toString())
attr("period", quota.period.toString())
attr("max-available-calls", quota.maxAvailableCalls.toString())
attr("initial-available-calls", quota.initialAvailableCalls.toString())
}
fun serialize(conf : Configuration) : Document {
val schemaLocations = CacheSerializers.index.values.asSequence().map {
it.xmlNamespace to it.xmlSchemaLocation
}.toMap()
return Xml.of(RBCS.RBCS_NAMESPACE_URI, RBCS.RBCS_PREFIX + ":server") {
// attr("xmlns:xs", GradleBuildCacheServer.XML_SCHEMA_NAMESPACE_URI)
val value = schemaLocations.asSequence().map { (k, v) -> "$k $v" }.joinToString(" ")
attr("xs:schemaLocation", value , namespaceURI = RBCS.XML_SCHEMA_NAMESPACE_URI)
conf.serverPath
?.takeIf(String::isNotEmpty)
?.let { serverPath ->
attr("path", serverPath)
}
node("bind") {
attr("host", conf.host)
attr("port", conf.port.toString())
attr("incoming-connections-backlog-size", conf.incomingConnectionsBacklogSize.toString())
}
node("connection") {
conf.connection.let { connection ->
attr("read-timeout", connection.readTimeout.toString())
attr("write-timeout", connection.writeTimeout.toString())
attr("idle-timeout", connection.idleTimeout.toString())
attr("read-idle-timeout", connection.readIdleTimeout.toString())
attr("write-idle-timeout", connection.writeIdleTimeout.toString())
attr("max-request-size", connection.maxRequestSize.toString())
}
}
node("event-executor") {
attr("use-virtual-threads", conf.eventExecutor.isUseVirtualThreads.toString())
}
val cache = conf.cache
val serializer : CacheProvider<Configuration.Cache> =
(CacheSerializers.index[cache.namespaceURI to cache.typeName] as? CacheProvider<Configuration.Cache>) ?: throw NotImplementedError()
element.appendChild(serializer.serialize(doc, cache))
node("authorization") {
node("users") {
for(user in conf.users.values) {
if(user.name.isNotEmpty()) {
node("user") {
attr("name", user.name)
user.password?.let { password ->
attr("password", password)
}
user.quota?.let { quota ->
node("quota") {
serializeQuota(quota)
}
}
}
}
}
conf.users[""]
?.let { anonymousUser ->
anonymousUser.quota?.let { quota ->
node("anonymous") {
node("quota") {
serializeQuota(quota)
}
}
}
}
}
node("groups") {
val groups = conf.users.values.asSequence()
.flatMap {
user -> user.groups.map { it to user }
}.groupBy(Pair<Configuration.Group, Configuration.User>::first, Pair<Configuration.Group, Configuration.User>::second)
for(pair in groups) {
val group = pair.key
val users = pair.value
node("group") {
attr("name", group.name)
if(users.isNotEmpty()) {
node("users") {
var anonymousUser : Configuration.User? = null
for(user in users) {
if(user.name.isNotEmpty()) {
node("user") {
attr("ref", user.name)
}
} else {
anonymousUser = user
}
}
if(anonymousUser != null) {
node("anonymous")
}
}
}
if(group.roles.isNotEmpty()) {
node("roles") {
for(role in group.roles) {
node(role.toString().lowercase())
}
}
}
group.userQuota?.let { quota ->
node("user-quota") {
serializeQuota(quota)
}
}
group.groupQuota?.let { quota ->
node("group-quota") {
serializeQuota(quota)
}
}
}
}
}
}
conf.authentication?.let { authentication ->
node("authentication") {
when(authentication) {
is Configuration.BasicAuthentication -> {
node("basic")
}
is Configuration.ClientCertificateAuthentication -> {
node("client-certificate") {
authentication.groupExtractor?.let { extractor ->
node("group-extractor") {
attr("attribute-name", extractor.rdnType)
attr("pattern", extractor.pattern)
}
}
authentication.userExtractor?.let { extractor ->
node("user-extractor") {
attr("attribute-name", extractor.rdnType)
attr("pattern", extractor.pattern)
}
}
}
}
}
}
}
conf.tls?.let { tlsConfiguration ->
node("tls") {
tlsConfiguration.keyStore?.let { keyStore ->
node("keystore") {
attr("file", keyStore.file.toString())
keyStore.password?.let { keyStorePassword ->
attr("password", keyStorePassword)
}
attr("key-alias", keyStore.keyAlias)
keyStore.keyPassword?.let { keyPassword ->
attr("key-password", keyPassword)
}
}
}
tlsConfiguration.trustStore?.let { trustStore ->
node("truststore") {
attr("file", trustStore.file.toString())
trustStore.password?.let { password ->
attr("password", password)
}
attr("check-certificate-status", trustStore.isCheckCertificateStatus.toString())
attr("require-client-certificate", trustStore.isRequireClientCertificate.toString())
}
}
}
}
}
}
}

View File

@@ -0,0 +1,92 @@
package net.woggioni.rbcs.server.exception
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.timeout.ReadTimeoutException
import io.netty.handler.timeout.WriteTimeoutException
import net.woggioni.rbcs.api.exception.CacheException
import net.woggioni.rbcs.api.exception.ContentTooLargeException
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.common.debug
import javax.net.ssl.SSLPeerUnverifiedException
@ChannelHandler.Sharable
class ExceptionHandler : ChannelDuplexHandler() {
private val log = contextLogger()
private val NOT_AUTHORIZED: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN, Unpooled.EMPTY_BUFFER
).apply {
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
private val TOO_BIG: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, Unpooled.EMPTY_BUFFER
).apply {
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
private val NOT_AVAILABLE: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, Unpooled.EMPTY_BUFFER
).apply {
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
private val SERVER_ERROR: FullHttpResponse = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.EMPTY_BUFFER
).apply {
headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
when (cause) {
is DecoderException -> {
log.error(cause.message, cause)
ctx.close()
}
is SSLPeerUnverifiedException -> {
ctx.writeAndFlush(NOT_AUTHORIZED.retainedDuplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
is ContentTooLargeException -> {
ctx.writeAndFlush(TOO_BIG.retainedDuplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
is ReadTimeoutException -> {
log.debug {
val channelId = ctx.channel().id().asShortText()
"Read timeout on channel $channelId, closing the connection"
}
ctx.close()
}
is WriteTimeoutException -> {
log.debug {
val channelId = ctx.channel().id().asShortText()
"Write timeout on channel $channelId, closing the connection"
}
ctx.close()
}
is CacheException -> {
log.error(cause.message, cause)
ctx.writeAndFlush(NOT_AVAILABLE.retainedDuplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
else -> {
log.error(cause.message, cause)
ctx.writeAndFlush(SERVER_ERROR.retainedDuplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
}
}
}
}

View File

@@ -0,0 +1,161 @@
package net.woggioni.rbcs.server.handler
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.DefaultFileRegion
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.DefaultHttpResponse
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaderValues
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.LastHttpContent
import io.netty.handler.stream.ChunkedNioStream
import net.woggioni.rbcs.api.Cache
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.server.debug
import net.woggioni.rbcs.server.warn
import java.nio.channels.FileChannel
import java.nio.file.Path
@ChannelHandler.Sharable
class ServerHandler(private val cache: Cache, private val serverPrefix: Path) :
SimpleChannelInboundHandler<FullHttpRequest>() {
private val log = contextLogger()
override fun channelRead0(ctx: ChannelHandlerContext, msg: FullHttpRequest) {
val keepAlive: Boolean = HttpUtil.isKeepAlive(msg)
val method = msg.method()
if (method === HttpMethod.GET) {
val path = Path.of(msg.uri())
val prefix = path.parent
val key = path.fileName?.toString() ?: let {
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
return
}
if (serverPrefix == prefix) {
cache.get(key).thenApply { channel ->
if(channel != null) {
log.debug(ctx) {
"Cache hit for key '$key'"
}
val response = DefaultHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK)
response.headers()[HttpHeaderNames.CONTENT_TYPE] = HttpHeaderValues.APPLICATION_OCTET_STREAM
if (!keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.IDENTITY)
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
}
ctx.write(response)
when (channel) {
is FileChannel -> {
val content = DefaultFileRegion(channel, 0, channel.size())
if (keepAlive) {
ctx.write(content)
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
}
else -> {
val content = ChunkedNioStream(channel)
if (keepAlive) {
ctx.write(content).addListener {
content.close()
}
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT.retainedDuplicate())
} else {
ctx.writeAndFlush(content)
.addListener(ChannelFutureListener.CLOSE)
}
}
}
} else {
log.debug(ctx) {
"Cache miss for key '$key'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.NOT_FOUND)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
}.whenComplete { _, ex -> ex?.let(ctx::fireExceptionCaught) }
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
ctx.writeAndFlush(response)
}
} else if (method === HttpMethod.PUT) {
val path = Path.of(msg.uri())
val prefix = path.parent
val key = path.fileName.toString()
if (serverPrefix == prefix) {
log.debug(ctx) {
"Added value for key '$key' to build cache"
}
cache.put(key, msg.content()).thenRun {
val response = DefaultFullHttpResponse(
msg.protocolVersion(), HttpResponseStatus.CREATED,
Unpooled.copiedBuffer(key.toByteArray())
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = response.content().readableBytes()
ctx.writeAndFlush(response)
}.whenComplete { _, ex ->
ctx.fireExceptionCaught(ex)
}
} else {
log.warn(ctx) {
"Got request for unhandled path '${msg.uri()}'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.BAD_REQUEST)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
} else if(method == HttpMethod.TRACE) {
val replayedRequestHead = ctx.alloc().buffer()
replayedRequestHead.writeCharSequence("TRACE ${Path.of(msg.uri())} ${msg.protocolVersion().text()}\r\n", Charsets.US_ASCII)
msg.headers().forEach { (key, value) ->
replayedRequestHead.apply {
writeCharSequence(key, Charsets.US_ASCII)
writeCharSequence(": ", Charsets.US_ASCII)
writeCharSequence(value, Charsets.UTF_8)
writeCharSequence("\r\n", Charsets.US_ASCII)
}
}
replayedRequestHead.writeCharSequence("\r\n", Charsets.US_ASCII)
val requestBody = msg.content()
requestBody.retain()
val responseBody = ctx.alloc().compositeBuffer(2).apply {
addComponents(true, replayedRequestHead)
addComponents(true, requestBody)
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK, responseBody)
response.headers().apply {
set(HttpHeaderNames.CONTENT_TYPE, "message/http")
set(HttpHeaderNames.CONTENT_LENGTH, responseBody.readableBytes())
}
ctx.writeAndFlush(response)
} else {
log.warn(ctx) {
"Got request with unhandled method '${msg.method().name()}'"
}
val response = DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.METHOD_NOT_ALLOWED)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = "0"
ctx.writeAndFlush(response)
}
}
}

View File

@@ -0,0 +1,91 @@
package net.woggioni.rbcs.server.throttling
import net.woggioni.rbcs.api.Configuration
import net.woggioni.jwo.Bucket
import java.net.InetSocketAddress
import java.util.Arrays
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function
class BucketManager private constructor(
private val bucketsByUser: Map<Configuration.User, List<Bucket>> = HashMap(),
private val bucketsByGroup: Map<Configuration.Group, Bucket> = HashMap(),
loader: Function<InetSocketAddress, Bucket>?
) {
private class BucketsByAddress(
private val map: MutableMap<ByteArrayKey, Bucket>,
private val loader: Function<InetSocketAddress, Bucket>
) {
fun getBucket(socketAddress : InetSocketAddress) = map.computeIfAbsent(ByteArrayKey(socketAddress.address.address)) {
loader.apply(socketAddress)
}
}
private val bucketsByAddress: BucketsByAddress? = loader?.let {
BucketsByAddress(ConcurrentHashMap(), it)
}
private class ByteArrayKey(val array: ByteArray) {
override fun equals(other: Any?) = (other as? ByteArrayKey)?.let { bak ->
array contentEquals bak.array
} ?: false
override fun hashCode() = Arrays.hashCode(array)
}
fun getBucketByAddress(address : InetSocketAddress) : Bucket? {
return bucketsByAddress?.getBucket(address)
}
fun getBucketByUser(user : Configuration.User) = bucketsByUser[user]
fun getBucketByGroup(group : Configuration.Group) = bucketsByGroup[group]
companion object {
fun from(cfg : Configuration) : BucketManager {
val bucketsByUser = cfg.users.values.asSequence().map { user ->
val buckets = (
user.quota
?.let { quota ->
sequenceOf(quota)
} ?: user.groups.asSequence()
.mapNotNull(Configuration.Group::getUserQuota)
).map { quota ->
Bucket.local(
quota.maxAvailableCalls,
quota.calls,
quota.period,
quota.initialAvailableCalls
)
}.toList()
user to buckets
}.toMap()
val bucketsByGroup = cfg.groups.values.asSequence().filter {
it.groupQuota != null
}.map { group ->
val quota = group.groupQuota
val bucket = Bucket.local(
quota.maxAvailableCalls,
quota.calls,
quota.period,
quota.initialAvailableCalls
)
group to bucket
}.toMap()
return BucketManager(
bucketsByUser,
bucketsByGroup,
cfg.users[""]?.quota?.let { anonymousUserQuota ->
Function {
Bucket.local(
anonymousUserQuota.maxAvailableCalls,
anonymousUserQuota.calls,
anonymousUserQuota.period,
anonymousUserQuota.initialAvailableCalls
)
}
}
)
}
}
}

View File

@@ -0,0 +1,99 @@
package net.woggioni.rbcs.server.throttling
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion
import net.woggioni.rbcs.api.Configuration
import net.woggioni.rbcs.common.contextLogger
import net.woggioni.rbcs.server.RemoteBuildCacheServer
import net.woggioni.jwo.Bucket
import net.woggioni.jwo.LongMath
import java.net.InetSocketAddress
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
@Sharable
class ThrottlingHandler(cfg: Configuration) :
ChannelInboundHandlerAdapter() {
private val log = contextLogger()
private val bucketManager = BucketManager.from(cfg)
private val connectionConfiguration = cfg.connection
/**
* If the suggested waiting time from the bucket is lower than this
* amount, then the server will simply wait by itself before sending a response
* instead of replying with 429
*/
private val waitThreshold = minOf(
connectionConfiguration.idleTimeout,
connectionConfiguration.readIdleTimeout,
connectionConfiguration.writeIdleTimeout
).dividedBy(2)
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
val buckets = mutableListOf<Bucket>()
val user = ctx.channel().attr(RemoteBuildCacheServer.userAttribute).get()
if (user != null) {
bucketManager.getBucketByUser(user)?.let(buckets::addAll)
}
val groups = ctx.channel().attr(RemoteBuildCacheServer.groupAttribute).get() ?: emptySet()
if (groups.isNotEmpty()) {
groups.forEach { group ->
bucketManager.getBucketByGroup(group)?.let(buckets::add)
}
}
if (user == null && groups.isEmpty()) {
bucketManager.getBucketByAddress(ctx.channel().remoteAddress() as InetSocketAddress)?.let(buckets::add)
}
if (buckets.isEmpty()) {
return super.channelRead(ctx, msg)
} else {
handleBuckets(buckets, ctx, msg, true)
}
}
private fun handleBuckets(buckets : List<Bucket>, ctx : ChannelHandlerContext, msg : Any, delayResponse : Boolean) {
var nextAttempt = -1L
for (bucket in buckets) {
val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
if (bucketNextAttempt > nextAttempt) {
nextAttempt = bucketNextAttempt
}
}
if(nextAttempt < 0) {
super.channelRead(ctx, msg)
return
}
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
if (delayResponse && waitDuration < waitThreshold) {
ctx.executor().schedule({
handleBuckets(buckets, ctx, msg, false)
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
} else {
sendThrottledResponse(ctx, waitDuration)
}
}
private fun sendThrottledResponse(ctx: ChannelHandlerContext, retryAfter: Duration) {
val response = DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.TOO_MANY_REQUESTS
)
response.headers()[HttpHeaderNames.CONTENT_LENGTH] = 0
retryAfter.seconds.takeIf {
it > 0
}?.let {
response.headers()[HttpHeaderNames.RETRY_AFTER] = retryAfter.seconds
}
ctx.writeAndFlush(response)
}
}

View File

@@ -0,0 +1,2 @@
net.woggioni.rbcs.server.cache.FileSystemCacheProvider
net.woggioni.rbcs.server.cache.InMemoryCacheProvider

View File

@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<rbcs:server
xmlns:xs="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
xs:schemaLocation="urn:net.woggioni.rbcs.server jpms://net.woggioni.rbcs.server/net/woggioni/rbcs/server/schema/rbcs.xsd">
<bind host="127.0.0.1" port="8080" incoming-connections-backlog-size="1024"/>
<connection
max-request-size="67108864"
idle-timeout="PT30S"
read-timeout="PT10S"
write-timeout="PT10S"
read-idle-timeout="PT60S"
write-idle-timeout="PT60S"/>
<event-executor use-virtual-threads="true"/>
<cache xs:type="rbcs:fileSystemCacheType" path="/tmp/rbcs" max-age="P7D"/>
<authentication>
<none/>
</authentication>
</rbcs:server>

View File

@@ -0,0 +1,224 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<xs:schema targetNamespace="urn:net.woggioni.rbcs.server"
xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:rbcs="urn:net.woggioni.rbcs.server"
elementFormDefault="unqualified">
<xs:element name="server" type="rbcs:serverType"/>
<xs:complexType name="serverType">
<xs:sequence minOccurs="0">
<xs:element name="bind" type="rbcs:bindType" maxOccurs="1"/>
<xs:element name="connection" type="rbcs:connectionType" minOccurs="0" maxOccurs="1"/>
<xs:element name="event-executor" type="rbcs:eventExecutorType" minOccurs="0" maxOccurs="1"/>
<xs:element name="cache" type="rbcs:cacheType" maxOccurs="1"/>
<xs:element name="authorization" type="rbcs:authorizationType" minOccurs="0">
<xs:key name="userId">
<xs:selector xpath="users/user"/>
<xs:field xpath="@name"/>
</xs:key>
<xs:keyref name="userRef" refer="rbcs:userId">
<xs:selector xpath="groups/group/users/user"/>
<xs:field xpath="@ref"/>
</xs:keyref>
</xs:element>
<xs:element name="authentication" type="rbcs:authenticationType" minOccurs="0" maxOccurs="1"/>
<xs:element name="tls" type="rbcs:tlsType" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
<xs:attribute name="path" type="xs:string" use="optional"/>
</xs:complexType>
<xs:complexType name="bindType">
<xs:attribute name="host" type="xs:token" use="required"/>
<xs:attribute name="port" type="xs:unsignedShort" use="required"/>
<xs:attribute name="incoming-connections-backlog-size" type="xs:unsignedInt" use="optional" default="1024"/>
</xs:complexType>
<xs:complexType name="connectionType">
<xs:attribute name="read-timeout" type="xs:duration" use="optional" default="PT0S"/>
<xs:attribute name="write-timeout" type="xs:duration" use="optional" default="PT0S"/>
<xs:attribute name="idle-timeout" type="xs:duration" use="optional" default="PT30S"/>
<xs:attribute name="read-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
<xs:attribute name="write-idle-timeout" type="xs:duration" use="optional" default="PT60S"/>
<xs:attribute name="max-request-size" type="xs:unsignedInt" use="optional" default="67108864"/>
</xs:complexType>
<xs:complexType name="eventExecutorType">
<xs:attribute name="use-virtual-threads" type="xs:boolean" use="optional" default="true"/>
</xs:complexType>
<xs:complexType name="cacheType" abstract="true"/>
<xs:complexType name="inMemoryCacheType">
<xs:complexContent>
<xs:extension base="rbcs:cacheType">
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="max-size" type="xs:token" default="0x1000000"/>
<xs:attribute name="digest" type="xs:token" default="MD5"/>
<xs:attribute name="enable-compression" type="xs:boolean" default="true"/>
<xs:attribute name="compression-level" type="xs:byte" default="-1"/>
</xs:extension>
</xs:complexContent>
</xs:complexType>
<xs:complexType name="fileSystemCacheType">
<xs:complexContent>
<xs:extension base="rbcs:cacheType">
<xs:attribute name="path" type="xs:string" use="required"/>
<xs:attribute name="max-age" type="xs:duration" default="P1D"/>
<xs:attribute name="digest" type="xs:token" default="MD5"/>
<xs:attribute name="enable-compression" type="xs:boolean" default="true"/>
<xs:attribute name="compression-level" type="xs:byte" default="-1"/>
</xs:extension>
</xs:complexContent>
</xs:complexType>
<xs:complexType name="tlsCertificateAuthorizationType">
<xs:sequence>
<xs:element name="group-extractor" type="rbcs:X500NameExtractorType" minOccurs="0"/>
<xs:element name="user-extractor" type="rbcs:X500NameExtractorType" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="X500NameExtractorType">
<xs:attribute name="attribute-name" type="xs:token"/>
<xs:attribute name="pattern" type="xs:token"/>
</xs:complexType>
<xs:complexType name="authorizationType">
<xs:all>
<xs:element name="users" type="rbcs:usersType"/>
<xs:element name="groups" type="rbcs:groupsType">
<xs:unique name="groupKey">
<xs:selector xpath="group"/>
<xs:field xpath="@name"/>
</xs:unique>
</xs:element>
</xs:all>
</xs:complexType>
<xs:complexType name="authenticationType">
<xs:choice>
<xs:element name="basic"/>
<xs:element name="client-certificate" type="rbcs:tlsCertificateAuthorizationType"/>
<xs:element name="none"/>
</xs:choice>
</xs:complexType>
<xs:complexType name="quotaType">
<xs:attribute name="calls" type="xs:positiveInteger" use="required"/>
<xs:attribute name="period" type="xs:duration" use="required"/>
<xs:attribute name="max-available-calls" type="xs:positiveInteger" use="optional"/>
<xs:attribute name="initial-available-calls" type="xs:unsignedInt" use="optional"/>
</xs:complexType>
<xs:complexType name="anonymousUserType">
<xs:sequence>
<xs:element name="quota" type="rbcs:quotaType" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="userType">
<xs:sequence>
<xs:element name="quota" type="rbcs:quotaType" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
<xs:attribute name="name" type="xs:token" use="required"/>
<xs:attribute name="password" type="xs:string" use="optional"/>
</xs:complexType>
<xs:complexType name="usersType">
<xs:sequence>
<xs:element name="user" type="rbcs:userType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="anonymous" type="rbcs:anonymousUserType" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="groupsType">
<xs:sequence>
<xs:element name="group" type="rbcs:groupType" maxOccurs="unbounded" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="groupType">
<xs:sequence>
<xs:element name="users" type="rbcs:userRefsType" maxOccurs="1" minOccurs="0">
<xs:unique name="userRefWriterKey">
<xs:selector xpath="user"/>
<xs:field xpath="@ref"/>
</xs:unique>
</xs:element>
<xs:element name="roles" type="rbcs:rolesType" maxOccurs="1" minOccurs="0"/>
<xs:element name="user-quota" type="rbcs:quotaType" minOccurs="0" maxOccurs="1"/>
<xs:element name="group-quota" type="rbcs:quotaType" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
<xs:attribute name="name" type="xs:token"/>
</xs:complexType>
<xs:simpleType name="role" final="restriction" >
<xs:restriction base="xs:token">
<xs:enumeration value="READER" />
<xs:enumeration value="WRITER" />
</xs:restriction>
</xs:simpleType>
<xs:complexType name="rolesType">
<xs:sequence>
<xs:choice maxOccurs="unbounded">
<xs:element name="writer"/>
<xs:element name="reader"/>
</xs:choice>
</xs:sequence>
</xs:complexType>
<xs:complexType name="userRefsType">
<xs:sequence>
<xs:element name="user" type="rbcs:userRefType" maxOccurs="unbounded" minOccurs="0"/>
<xs:element name="anonymous" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="userRefType">
<xs:attribute name="ref" type="xs:string" use="required"/>
</xs:complexType>
<xs:complexType name="tlsType">
<xs:all>
<xs:element name="keystore" type="rbcs:keyStoreType" />
<xs:element name="truststore" type="rbcs:trustStoreType" minOccurs="0"/>
</xs:all>
</xs:complexType>
<xs:complexType name="keyStoreType">
<xs:attribute name="file" type="xs:string" use="required"/>
<xs:attribute name="password" type="xs:string"/>
<xs:attribute name="key-alias" type="xs:string" use="required"/>
<xs:attribute name="key-password" type="xs:string"/>
</xs:complexType>
<xs:complexType name="trustStoreType">
<xs:attribute name="file" type="xs:string" use="required"/>
<xs:attribute name="password" type="xs:string"/>
<xs:attribute name="check-certificate-status" type="xs:boolean"/>
<xs:attribute name="require-client-certificate" type="xs:boolean" use="optional" default="false"/>
</xs:complexType>
<xs:complexType name="propertiesType">
<xs:sequence>
<xs:element maxOccurs="unbounded" minOccurs="0" name="property" type="rbcs:propertyType"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="propertyType">
<xs:simpleContent>
<xs:extension base="xs:string">
<xs:attribute name="key" type="xs:string" use="required"/>
</xs:extension>
</xs:simpleContent>
</xs:complexType>
<xs:complexType name="hostAndPortType">
<xs:attribute name="host" type="xs:string" use="required"/>
<xs:attribute name="port" type="xs:unsignedShort" use="required"/>
</xs:complexType>
</xs:schema>