forked from woggioni/rbcs
updated Netty version
This commit is contained in:
@@ -12,7 +12,6 @@ import io.netty.channel.ChannelInitializer
|
||||
import io.netty.channel.ChannelOption
|
||||
import io.netty.channel.ChannelPromise
|
||||
import io.netty.channel.DefaultFileRegion
|
||||
import io.netty.channel.EventLoopGroup
|
||||
import io.netty.channel.SimpleChannelInboundHandler
|
||||
import io.netty.channel.nio.NioEventLoopGroup
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel
|
||||
@@ -69,7 +68,6 @@ import java.security.PrivateKey
|
||||
import java.security.cert.X509Certificate
|
||||
import java.util.Arrays
|
||||
import java.util.Base64
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.regex.Matcher
|
||||
import java.util.regex.Pattern
|
||||
import javax.naming.ldap.LdapName
|
||||
@@ -178,40 +176,39 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
}
|
||||
}
|
||||
|
||||
private class ServerInitializer(private val cfg: Configuration) : ChannelInitializer<Channel>() {
|
||||
|
||||
private fun createSslCtx(tls: Configuration.Tls): SslContext {
|
||||
val keyStore = tls.keyStore
|
||||
return if (keyStore == null) {
|
||||
throw IllegalArgumentException("No keystore configured")
|
||||
} else {
|
||||
val javaKeyStore = loadKeystore(keyStore.file, keyStore.password)
|
||||
val serverKey = javaKeyStore.getKey(
|
||||
keyStore.keyAlias, keyStore.keyPassword?.let(String::toCharArray)
|
||||
) as PrivateKey
|
||||
val serverCert: Array<X509Certificate> =
|
||||
Arrays.stream(javaKeyStore.getCertificateChain(keyStore.keyAlias))
|
||||
.map { it as X509Certificate }
|
||||
.toArray { size -> Array<X509Certificate?>(size) { null } }
|
||||
SslContextBuilder.forServer(serverKey, *serverCert).apply {
|
||||
if (tls.isVerifyClients) {
|
||||
clientAuth(ClientAuth.OPTIONAL)
|
||||
val trustStore = tls.trustStore
|
||||
if (trustStore != null) {
|
||||
val ts = loadKeystore(trustStore.file, trustStore.password)
|
||||
trustManager(
|
||||
ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus)
|
||||
)
|
||||
}
|
||||
}
|
||||
}.build()
|
||||
}
|
||||
}
|
||||
|
||||
private val sslContext: SslContext? = cfg.tls?.let(this::createSslCtx)
|
||||
private val group: EventExecutorGroup = DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors())
|
||||
private class ServerInitializer(
|
||||
private val cfg: Configuration,
|
||||
private val eventExecutorGroup: EventExecutorGroup
|
||||
) : ChannelInitializer<Channel>() {
|
||||
|
||||
companion object {
|
||||
private fun createSslCtx(tls: Configuration.Tls): SslContext {
|
||||
val keyStore = tls.keyStore
|
||||
return if (keyStore == null) {
|
||||
throw IllegalArgumentException("No keystore configured")
|
||||
} else {
|
||||
val javaKeyStore = loadKeystore(keyStore.file, keyStore.password)
|
||||
val serverKey = javaKeyStore.getKey(
|
||||
keyStore.keyAlias, keyStore.keyPassword?.let(String::toCharArray)
|
||||
) as PrivateKey
|
||||
val serverCert: Array<X509Certificate> =
|
||||
Arrays.stream(javaKeyStore.getCertificateChain(keyStore.keyAlias))
|
||||
.map { it as X509Certificate }
|
||||
.toArray { size -> Array<X509Certificate?>(size) { null } }
|
||||
SslContextBuilder.forServer(serverKey, *serverCert).apply {
|
||||
if (tls.isVerifyClients) {
|
||||
clientAuth(ClientAuth.OPTIONAL)
|
||||
val trustStore = tls.trustStore
|
||||
if (trustStore != null) {
|
||||
val ts = loadKeystore(trustStore.file, trustStore.password)
|
||||
trustManager(
|
||||
ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus)
|
||||
)
|
||||
}
|
||||
}
|
||||
}.build()
|
||||
}
|
||||
}
|
||||
|
||||
fun loadKeystore(file: Path, password: String?): KeyStore {
|
||||
val ext = JWO.splitExtension(file)
|
||||
@@ -235,6 +232,8 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
}
|
||||
}
|
||||
|
||||
private val sslContext: SslContext? = cfg.tls?.let(Companion::createSslCtx)
|
||||
|
||||
private fun userExtractor(authentication: Configuration.ClientCertificateAuthentication) =
|
||||
authentication.userExtractor?.let { extractor ->
|
||||
val pattern = Pattern.compile(extractor.pattern)
|
||||
@@ -294,7 +293,7 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
}
|
||||
val cacheImplementation = cfg.cache.materialize()
|
||||
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
|
||||
pipeline.addLast(group, ServerHandler(cacheImplementation, prefix))
|
||||
pipeline.addLast(eventExecutorGroup, ServerHandler(cacheImplementation, prefix))
|
||||
pipeline.addLast(ExceptionHandler())
|
||||
}
|
||||
}
|
||||
@@ -446,8 +445,7 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
|
||||
class ServerHandle(
|
||||
httpChannelFuture: ChannelFuture,
|
||||
private val bossGroup: EventLoopGroup,
|
||||
private val workerGroup: EventLoopGroup
|
||||
private val executorGroups : Iterable<EventExecutorGroup>
|
||||
) : AutoCloseable {
|
||||
private val httpChannel: Channel = httpChannelFuture.channel()
|
||||
|
||||
@@ -461,31 +459,32 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
try {
|
||||
closeFuture.sync()
|
||||
} finally {
|
||||
val fut1 = workerGroup.shutdownGracefully()
|
||||
val fut2 = if (bossGroup !== workerGroup) {
|
||||
bossGroup.shutdownGracefully()
|
||||
} else null
|
||||
fut1.sync()
|
||||
fut2?.sync()
|
||||
executorGroups.forEach {
|
||||
it.shutdownGracefully().sync()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun run(): ServerHandle {
|
||||
// Create the multithreaded event loops for the server
|
||||
val bossGroup = NioEventLoopGroup(0, Executors.defaultThreadFactory())
|
||||
val bossGroup = NioEventLoopGroup(0)
|
||||
val serverSocketChannel = NioServerSocketChannel::class.java
|
||||
val workerGroup = if (cfg.isUseVirtualThread) {
|
||||
NioEventLoopGroup(0, Executors.newVirtualThreadPerTaskExecutor())
|
||||
} else {
|
||||
bossGroup
|
||||
val workerGroup = bossGroup
|
||||
val eventExecutorGroup = run {
|
||||
val threadFactory = if(cfg.isUseVirtualThread) {
|
||||
Thread.ofVirtual().factory()
|
||||
} else {
|
||||
null
|
||||
}
|
||||
DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), threadFactory)
|
||||
}
|
||||
// A helper class that simplifies server configuration
|
||||
val bootstrap = ServerBootstrap().apply {
|
||||
// Configure the server
|
||||
group(bossGroup, workerGroup)
|
||||
channel(serverSocketChannel)
|
||||
childHandler(ServerInitializer(cfg))
|
||||
childHandler(ServerInitializer(cfg, eventExecutorGroup))
|
||||
option(ChannelOption.SO_BACKLOG, 128)
|
||||
childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
}
|
||||
@@ -494,7 +493,7 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
// Bind and start to accept incoming connections.
|
||||
val bindAddress = InetSocketAddress(cfg.host, cfg.port)
|
||||
val httpChannel = bootstrap.bind(bindAddress).sync()
|
||||
return ServerHandle(httpChannel, bossGroup, workerGroup)
|
||||
return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup))
|
||||
}
|
||||
|
||||
companion object {
|
||||
@@ -508,7 +507,7 @@ class GradleBuildCacheServer(private val cfg: Configuration) {
|
||||
return Parser.parse(doc)
|
||||
}
|
||||
|
||||
fun dumpConfiguration(conf : Configuration, outputStream: OutputStream) {
|
||||
fun dumpConfiguration(conf: Configuration, outputStream: OutputStream) {
|
||||
Xml.write(Serializer.serialize(conf), outputStream)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user