Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
c19bc9e91e
|
20
LICENSE
Normal file
20
LICENSE
Normal file
@@ -0,0 +1,20 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2017 Y. T. CHUNG <zonyitoo@gmail.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the "Software"), to deal in
|
||||
the Software without restriction, including without limitation the rights to
|
||||
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
110
README.md
110
README.md
@@ -0,0 +1,110 @@
|
||||
# Remote Build Cache Server
|
||||
Remote Build Cache Server (shortened to RBCS) allows you to share and reuse unchanged build
|
||||
and test outputs across the team. This speeds up local and CI builds since cycles are not wasted
|
||||
re-building components that are unaffected by new code changes. RBCS supports both Gradle and
|
||||
Maven build tool environments.
|
||||
|
||||
## Getting Started
|
||||
|
||||
### Downloading the jar file
|
||||
You can download the latest version from [this link](https://gitea.woggioni.net/woggioni/-/packages/maven/net.woggioni-rbcs-cli/)
|
||||
|
||||
If you want to use memcache as a storage backend you'll also need to download [the memcache plugin](https://gitea.woggioni.net/woggioni/-/packages/maven/net.woggioni-rbcs-server-memcache/)
|
||||
|
||||
### Using the Docker image
|
||||
You can pull the latest Docker image with
|
||||
```bash
|
||||
docker pull gitea.woggioni.net/woggioni/rbcs:latest
|
||||
```
|
||||
|
||||
## Usage
|
||||
## Configuration
|
||||
### Using RBCS with Gradle
|
||||
|
||||
```groovy
|
||||
buildCache {
|
||||
remote(HttpBuildCache) {
|
||||
url = 'https://rbcs.example.com/'
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Using RBCS with Maven
|
||||
|
||||
Read [here](https://maven.apache.org/extensions/maven-build-cache-extension/remote-cache.html)
|
||||
|
||||
## FAQ
|
||||
### Why should I use a build cache?
|
||||
|
||||
#### Build Caches Improve Build & Test Performance
|
||||
|
||||
Building software consists of a number of steps, like compiling sources, executing tests, and linking binaries. We’ve seen that a binary artifact repository helps when such a step requires an external component by downloading the artifact from the repository rather than building it locally.
|
||||
However, there are many additional steps in this build process which can be optimized to reduce the build time. An obvious strategy is to avoid executing build steps which dominate the total build time when these build steps are not needed.
|
||||
Most build times are dominated by the testing step.
|
||||
|
||||
While binary repositories cannot capture the outcome of a test build step (only the test reports
|
||||
when included in binary artifacts), build caches are designed to eliminate redundant executions
|
||||
for every build step. Moreover, it generalizes the concept of avoiding work associated with any
|
||||
incremental step of the build, including test execution, compilation and resource processing.
|
||||
The mechanism itself is comparable to a pure function. That is, given some inputs such as source
|
||||
files and environment parameters we know that the output is always going to be the same.
|
||||
As a result, we can cache it and retrieve it based on a simple cryptographic hash of the inputs.
|
||||
Build caching is supported natively by some build tools.
|
||||
|
||||
#### Improve CI builds with a remote build cache
|
||||
|
||||
When analyzing the role of a build cache it is important to take into account the granularity
|
||||
of the changes that it caches. Imagine a full build for a project with 40 to 50 modules
|
||||
which fails at the last step (deployment) because the staging environment is temporarily unavailable.
|
||||
Although the vast majority of the build steps (potentially thousands) succeed,
|
||||
the change can not be deployed to the staging environment.
|
||||
Without a build cache one typically relies on a very complex CI configuration to reuse build step outputs
|
||||
or would have to repeat the full build once the environment is available.
|
||||
|
||||
Some build tools don’t support incremental builds properly. For example, outputs of a build started
|
||||
from scratch may vary when compared to subsequent builds that rely on the initial build’s output.
|
||||
As a result, to preserve build integrity, it’s crucial to rebuild from scratch, or ‘cleanly,’ in this
|
||||
scenario.
|
||||
|
||||
With a build cache, only the last step needs to be executed and the build can be re-triggered
|
||||
when the environment is back online. This automatically saves all of the time and
|
||||
resources required across the different build steps which were successfully executed.
|
||||
Instead of executing the intermediate steps, the build tool pulls the outputs from the build cache,
|
||||
avoiding a lot of redundant work
|
||||
|
||||
#### Share outputs with a remote build cache
|
||||
|
||||
One of the most important advantages of a remote build cache is the ability to share build outputs.
|
||||
In most CI configurations, for example, a number of pipelines are created.
|
||||
These may include one for building the sources, one for testing, one for publishing the outcomes
|
||||
to a remote repository, and other pipelines to test on different platforms.
|
||||
There are even situations where CI builds partially build a project (i.e. some modules and not others).
|
||||
|
||||
Most of those pipelines share a lot of intermediate build steps. All builds which perform testing
|
||||
require the binaries to be ready. All publishing builds require all previous steps to be executed.
|
||||
And because modern CI infrastructure means executing everything in containerized (isolated) environments,
|
||||
significant resources are wasted by repeatedly building the same intermediate artifacts.
|
||||
|
||||
A remote build cache greatly reduces this overhead by orders of magnitudes because it provides a way
|
||||
for all those pipelines to share their outputs. After all, there is no point recreating an output that
|
||||
is already available in the cache.
|
||||
|
||||
Because there are inherent dependencies between software components of a build,
|
||||
introducing a build cache dramatically reduces the impact of exploding a component into multiple pieces,
|
||||
allowing for increased modularity without increased overhead.
|
||||
|
||||
#### Make local developers more efficient with remote build caches
|
||||
|
||||
It is common for different teams within a company to work on different modules of a single large
|
||||
application. In this case, most teams don’t care about building the other parts of the software.
|
||||
By introducing a remote cache developers immediately benefit from pre-built artifacts when checking out code.
|
||||
Because it has already been built on CI, they don’t have to do it locally.
|
||||
|
||||
Introducing a remote cache is a huge benefit for those developers. Consider that a typical developer’s
|
||||
day begins by performing a code checkout. Most likely the checked out code has already been built on CI.
|
||||
Therefore, no time is wasted running the first build of the day. The remote cache provides all of the
|
||||
intermediate artifacts needed. And, in the event local changes are made, the remote cache still leverages
|
||||
partial cache hits for projects which are independent. As other developers in the organization request
|
||||
CI builds, the remote cache continues to populate, increasing the likelihood of these remote cache hits
|
||||
across team members.
|
||||
|
||||
|
@@ -2,10 +2,11 @@ org.gradle.configuration-cache=false
|
||||
org.gradle.parallel=true
|
||||
org.gradle.caching=true
|
||||
|
||||
rbcs.version = 0.1.6
|
||||
rbcs.version = 0.1.4
|
||||
|
||||
lys.version = 2025.02.08
|
||||
lys.version = 2025.02.05
|
||||
|
||||
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
|
||||
docker.registry.url=gitea.woggioni.net
|
||||
|
||||
jpms-check.configurationName = runtimeClasspath
|
||||
|
@@ -44,6 +44,7 @@ envelopeJar {
|
||||
dependencies {
|
||||
implementation catalog.jwo
|
||||
implementation catalog.slf4j.api
|
||||
implementation catalog.netty.codec.http
|
||||
implementation catalog.picocli
|
||||
|
||||
implementation project(':rbcs-client')
|
||||
|
@@ -6,11 +6,9 @@ plugins {
|
||||
dependencies {
|
||||
implementation project(':rbcs-api')
|
||||
implementation project(':rbcs-common')
|
||||
implementation catalog.picocli
|
||||
implementation catalog.slf4j.api
|
||||
implementation catalog.netty.buffer
|
||||
implementation catalog.netty.handler
|
||||
implementation catalog.netty.transport
|
||||
implementation catalog.netty.common
|
||||
implementation catalog.netty.codec.http
|
||||
|
||||
testRuntimeOnly catalog.logback.classic
|
||||
|
@@ -45,9 +45,9 @@ import java.time.Duration
|
||||
import java.util.Base64
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import kotlin.random.Random
|
||||
import io.netty.util.concurrent.Future as NettyFuture
|
||||
|
||||
|
||||
class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoCloseable {
|
||||
private val group: NioEventLoopGroup
|
||||
private var sslContext: SslContext
|
||||
@@ -206,7 +206,6 @@ class RemoteBuildCacheClient(private val profile: Configuration.Profile) : AutoC
|
||||
retryPolicy.initialDelayMillis.toDouble(),
|
||||
retryPolicy.exp,
|
||||
outcomeHandler,
|
||||
Random.Default,
|
||||
operation
|
||||
)
|
||||
} else {
|
||||
|
@@ -3,8 +3,6 @@ package net.woggioni.rbcs.client
|
||||
import io.netty.util.concurrent.EventExecutorGroup
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.math.pow
|
||||
import kotlin.random.Random
|
||||
|
||||
sealed class OperationOutcome<T> {
|
||||
class Success<T>(val result: T) : OperationOutcome<T>()
|
||||
@@ -26,10 +24,8 @@ fun <T> executeWithRetry(
|
||||
initialDelay: Double,
|
||||
exp: Double,
|
||||
outcomeHandler: OutcomeHandler<T>,
|
||||
randomizer : Random?,
|
||||
cb: () -> CompletableFuture<T>
|
||||
): CompletableFuture<T> {
|
||||
|
||||
val finalResult = cb()
|
||||
var future = finalResult
|
||||
var shortCircuit = false
|
||||
@@ -50,7 +46,7 @@ fun <T> executeWithRetry(
|
||||
is OutcomeHandlerResult.Retry -> {
|
||||
val res = CompletableFuture<T>()
|
||||
val delay = run {
|
||||
val scheduledDelay = (initialDelay * exp.pow(i.toDouble()) * (1.0 + (randomizer?.nextDouble(-0.5, 0.5) ?: 0.0))).toLong()
|
||||
val scheduledDelay = (initialDelay * Math.pow(exp, i.toDouble())).toLong()
|
||||
outcomeHandlerResult.suggestedDelayMillis?.coerceAtMost(scheduledDelay) ?: scheduledDelay
|
||||
}
|
||||
eventExecutorGroup.schedule({
|
||||
|
@@ -89,7 +89,7 @@ class RetryTest {
|
||||
val random = Random(testArgs.seed)
|
||||
|
||||
val future =
|
||||
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler, null) {
|
||||
executeWithRetry(executor, testArgs.maxAttempt, testArgs.initialDelay, testArgs.exp, outcomeHandler) {
|
||||
val now = System.nanoTime()
|
||||
val result = CompletableFuture<Int>()
|
||||
executor.submit {
|
||||
@@ -129,7 +129,7 @@ class RetryTest {
|
||||
previousAttempt.first + testArgs.initialDelay * Math.pow(testArgs.exp, index.toDouble()) * 1e6
|
||||
val actualTimestamp = timestamp
|
||||
val err = Math.abs(expectedTimestamp - actualTimestamp) / expectedTimestamp
|
||||
Assertions.assertTrue(err < 1e-2)
|
||||
Assertions.assertTrue(err < 1e-3)
|
||||
}
|
||||
if (index == attempts.size - 1 && index < testArgs.maxAttempt - 1) {
|
||||
/*
|
||||
|
@@ -12,24 +12,6 @@ object RBCS {
|
||||
const val RBCS_PREFIX: String = "rbcs"
|
||||
const val XML_SCHEMA_NAMESPACE_URI = "http://www.w3.org/2001/XMLSchema-instance"
|
||||
|
||||
fun ByteArray.toInt(index : Int = 0) : Long {
|
||||
if(index + 4 > size) throw IllegalArgumentException("Not enough bytes to decode a 32 bits integer")
|
||||
var value : Long = 0
|
||||
for (b in index until index + 4) {
|
||||
value = (value shl 8) + (get(b).toInt() and 0xFF)
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
fun ByteArray.toLong(index : Int = 0) : Long {
|
||||
if(index + 8 > size) throw IllegalArgumentException("Not enough bytes to decode a 64 bits long integer")
|
||||
var value : Long = 0
|
||||
for (b in index until index + 8) {
|
||||
value = (value shl 8) + (get(b).toInt() and 0xFF)
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
fun digest(
|
||||
data: ByteArray,
|
||||
md: MessageDigest = MessageDigest.getInstance("MD5")
|
||||
|
@@ -9,9 +9,6 @@ dependencies {
|
||||
implementation catalog.jwo
|
||||
implementation catalog.slf4j.api
|
||||
implementation catalog.netty.codec.http
|
||||
implementation catalog.netty.handler
|
||||
implementation catalog.netty.buffer
|
||||
implementation catalog.netty.transport
|
||||
|
||||
api project(':rbcs-common')
|
||||
api project(':rbcs-api')
|
||||
@@ -39,4 +36,3 @@ publishing {
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@@ -30,13 +30,11 @@ import io.netty.handler.timeout.IdleStateHandler
|
||||
import io.netty.util.AttributeKey
|
||||
import io.netty.util.concurrent.DefaultEventExecutorGroup
|
||||
import io.netty.util.concurrent.EventExecutorGroup
|
||||
import net.woggioni.jwo.JWO
|
||||
import net.woggioni.jwo.Tuple2
|
||||
import net.woggioni.rbcs.api.Configuration
|
||||
import net.woggioni.rbcs.api.exception.ConfigurationException
|
||||
import net.woggioni.rbcs.common.RBCS.toUrl
|
||||
import net.woggioni.rbcs.common.PasswordSecurity.decodePasswordHash
|
||||
import net.woggioni.rbcs.common.PasswordSecurity.hashPassword
|
||||
import net.woggioni.rbcs.common.RBCS.toUrl
|
||||
import net.woggioni.rbcs.common.Xml
|
||||
import net.woggioni.rbcs.common.contextLogger
|
||||
import net.woggioni.rbcs.common.debug
|
||||
@@ -50,6 +48,8 @@ import net.woggioni.rbcs.server.configuration.Serializer
|
||||
import net.woggioni.rbcs.server.exception.ExceptionHandler
|
||||
import net.woggioni.rbcs.server.handler.ServerHandler
|
||||
import net.woggioni.rbcs.server.throttling.ThrottlingHandler
|
||||
import net.woggioni.jwo.JWO
|
||||
import net.woggioni.jwo.Tuple2
|
||||
import java.io.OutputStream
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.file.Files
|
||||
@@ -59,7 +59,6 @@ import java.security.PrivateKey
|
||||
import java.security.cert.X509Certificate
|
||||
import java.util.Arrays
|
||||
import java.util.Base64
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.regex.Matcher
|
||||
import java.util.regex.Pattern
|
||||
@@ -129,12 +128,11 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
val clientCertificate = peerCertificates.first() as X509Certificate
|
||||
val user = userExtractor?.extract(clientCertificate)
|
||||
val group = groupExtractor?.extract(clientCertificate)
|
||||
val allGroups =
|
||||
((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
|
||||
val allGroups = ((user?.groups ?: emptySet()).asSequence() + sequenceOf(group).filterNotNull()).toSet()
|
||||
AuthenticationResult(user, allGroups)
|
||||
} ?: anonymousUserGroups?.let { AuthenticationResult(null, it) }
|
||||
} ?: anonymousUserGroups?.let{ AuthenticationResult(null, it) }
|
||||
} catch (es: SSLPeerUnverifiedException) {
|
||||
anonymousUserGroups?.let { AuthenticationResult(null, it) }
|
||||
anonymousUserGroups?.let{ AuthenticationResult(null, it) }
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -193,7 +191,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
private class ServerInitializer(
|
||||
private val cfg: Configuration,
|
||||
private val eventExecutorGroup: EventExecutorGroup
|
||||
) : ChannelInitializer<Channel>(), AutoCloseable {
|
||||
) : ChannelInitializer<Channel>() {
|
||||
|
||||
companion object {
|
||||
private fun createSslCtx(tls: Configuration.Tls): SslContext {
|
||||
@@ -215,7 +213,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
trustManager(
|
||||
ClientCertificateValidator.getTrustManager(ts, trustStore.isCheckCertificateStatus)
|
||||
)
|
||||
if (trustStore.isRequireClientCertificate) ClientAuth.REQUIRE
|
||||
if(trustStore.isRequireClientCertificate) ClientAuth.REQUIRE
|
||||
else ClientAuth.OPTIONAL
|
||||
} ?: ClientAuth.NONE
|
||||
clientAuth(clientAuth)
|
||||
@@ -247,11 +245,10 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
|
||||
private val log = contextLogger()
|
||||
|
||||
private val cache = cfg.cache.materialize()
|
||||
|
||||
private val serverHandler = let {
|
||||
val cacheImplementation = cfg.cache.materialize()
|
||||
val prefix = Path.of("/").resolve(Path.of(cfg.serverPath ?: "/"))
|
||||
ServerHandler(cache, prefix)
|
||||
ServerHandler(cacheImplementation, prefix)
|
||||
}
|
||||
|
||||
private val exceptionHandler = ExceptionHandler()
|
||||
@@ -314,7 +311,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
cfg.connection.also { conn ->
|
||||
val readTimeout = conn.readTimeout.toMillis()
|
||||
val writeTimeout = conn.writeTimeout.toMillis()
|
||||
if (readTimeout > 0 || writeTimeout > 0) {
|
||||
if(readTimeout > 0 || writeTimeout > 0) {
|
||||
pipeline.addLast(
|
||||
IdleStateHandler(
|
||||
false,
|
||||
@@ -328,7 +325,7 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
val readIdleTimeout = conn.readIdleTimeout.toMillis()
|
||||
val writeIdleTimeout = conn.writeIdleTimeout.toMillis()
|
||||
val idleTimeout = conn.idleTimeout.toMillis()
|
||||
if (readIdleTimeout > 0 || writeIdleTimeout > 0 || idleTimeout > 0) {
|
||||
if(readIdleTimeout > 0 || writeIdleTimeout > 0 || idleTimeout > 0) {
|
||||
pipeline.addLast(
|
||||
IdleStateHandler(
|
||||
true,
|
||||
@@ -343,19 +340,16 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
pipeline.addLast(object : ChannelInboundHandlerAdapter() {
|
||||
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
|
||||
if (evt is IdleStateEvent) {
|
||||
when (evt.state()) {
|
||||
when(evt.state()) {
|
||||
IdleState.READER_IDLE -> log.debug {
|
||||
"Read timeout reached on channel ${ch.id().asShortText()}, closing the connection"
|
||||
}
|
||||
|
||||
IdleState.WRITER_IDLE -> log.debug {
|
||||
"Write timeout reached on channel ${ch.id().asShortText()}, closing the connection"
|
||||
}
|
||||
|
||||
IdleState.ALL_IDLE -> log.debug {
|
||||
"Idle timeout reached on channel ${ch.id().asShortText()}, closing the connection"
|
||||
}
|
||||
|
||||
null -> throw IllegalStateException("This should never happen")
|
||||
}
|
||||
ctx.close()
|
||||
@@ -376,41 +370,26 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
pipeline.addLast(eventExecutorGroup, serverHandler)
|
||||
pipeline.addLast(exceptionHandler)
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
cache.close()
|
||||
}
|
||||
}
|
||||
|
||||
class ServerHandle(
|
||||
httpChannelFuture: ChannelFuture,
|
||||
private val executorGroups: Iterable<EventExecutorGroup>,
|
||||
private val serverInitializer: AutoCloseable
|
||||
private val executorGroups: Iterable<EventExecutorGroup>
|
||||
) : AutoCloseable {
|
||||
private val httpChannel: Channel = httpChannelFuture.channel()
|
||||
private val closeFuture: ChannelFuture = httpChannel.closeFuture()
|
||||
private val log = contextLogger()
|
||||
|
||||
fun shutdown(): Future<Void> {
|
||||
fun shutdown(): ChannelFuture {
|
||||
return httpChannel.close()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
try {
|
||||
closeFuture.sync()
|
||||
} catch (ex: Throwable) {
|
||||
log.error(ex.message, ex)
|
||||
}
|
||||
try {
|
||||
serverInitializer.close()
|
||||
} catch (ex: Throwable) {
|
||||
log.error(ex.message, ex)
|
||||
}
|
||||
executorGroups.forEach {
|
||||
try {
|
||||
} finally {
|
||||
executorGroups.forEach {
|
||||
it.shutdownGracefully().sync()
|
||||
} catch (ex: Throwable) {
|
||||
log.error(ex.message, ex)
|
||||
}
|
||||
}
|
||||
log.info {
|
||||
@@ -432,12 +411,11 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
}
|
||||
DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors(), threadFactory)
|
||||
}
|
||||
val serverInitializer = ServerInitializer(cfg, eventExecutorGroup)
|
||||
val bootstrap = ServerBootstrap().apply {
|
||||
// Configure the server
|
||||
group(bossGroup, workerGroup)
|
||||
channel(serverSocketChannel)
|
||||
childHandler(serverInitializer)
|
||||
childHandler(ServerInitializer(cfg, eventExecutorGroup))
|
||||
option(ChannelOption.SO_BACKLOG, cfg.incomingConnectionsBacklogSize)
|
||||
childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
}
|
||||
@@ -449,6 +427,6 @@ class RemoteBuildCacheServer(private val cfg: Configuration) {
|
||||
log.info {
|
||||
"RemoteBuildCacheServer is listening on ${cfg.host}:${cfg.port}"
|
||||
}
|
||||
return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup), serverInitializer)
|
||||
return ServerHandle(httpChannel, setOf(bossGroup, workerGroup, eventExecutorGroup))
|
||||
}
|
||||
}
|
||||
|
@@ -1,11 +1,12 @@
|
||||
package net.woggioni.rbcs.server.cache
|
||||
|
||||
import io.netty.buffer.ByteBuf
|
||||
import net.woggioni.jwo.JWO
|
||||
import net.woggioni.rbcs.api.Cache
|
||||
import net.woggioni.rbcs.common.ByteBufInputStream
|
||||
import net.woggioni.rbcs.common.RBCS.digestString
|
||||
import net.woggioni.rbcs.common.contextLogger
|
||||
import net.woggioni.jwo.JWO
|
||||
import net.woggioni.jwo.LockFile
|
||||
import java.nio.channels.Channels
|
||||
import java.nio.channels.FileChannel
|
||||
import java.nio.file.Files
|
||||
@@ -17,6 +18,7 @@ import java.security.MessageDigest
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.zip.Deflater
|
||||
import java.util.zip.DeflaterOutputStream
|
||||
import java.util.zip.Inflater
|
||||
@@ -39,10 +41,7 @@ class FileSystemCache(
|
||||
Files.createDirectories(root)
|
||||
}
|
||||
|
||||
@Volatile
|
||||
private var running = true
|
||||
|
||||
private var nextGc = Instant.now()
|
||||
private var nextGc = AtomicReference(Instant.now().plus(maxAge))
|
||||
|
||||
override fun get(key: String) = (digestAlgorithm
|
||||
?.let(MessageDigest::getInstance)
|
||||
@@ -68,6 +67,8 @@ class FileSystemCache(
|
||||
FileChannel.open(file, StandardOpenOption.READ)
|
||||
}
|
||||
}
|
||||
}.also {
|
||||
gc()
|
||||
}.let {
|
||||
CompletableFuture.completedFuture(it)
|
||||
}
|
||||
@@ -97,51 +98,33 @@ class FileSystemCache(
|
||||
Files.delete(tmpFile)
|
||||
throw t
|
||||
}
|
||||
}.also {
|
||||
gc()
|
||||
}
|
||||
return CompletableFuture.completedFuture(null)
|
||||
}
|
||||
|
||||
private val garbageCollector = Thread.ofVirtual().name("file-system-cache-gc").start {
|
||||
while (running) {
|
||||
gc()
|
||||
}
|
||||
}
|
||||
|
||||
private fun gc() {
|
||||
val now = Instant.now()
|
||||
if (nextGc < now) {
|
||||
val oldestEntry = actualGc(now)
|
||||
nextGc = (oldestEntry ?: now).plus(maxAge)
|
||||
val oldValue = nextGc.getAndSet(now.plus(maxAge))
|
||||
if (oldValue < now) {
|
||||
actualGc(now)
|
||||
}
|
||||
Thread.sleep(minOf(Duration.between(now, nextGc), Duration.ofSeconds(1)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the creation timestamp of the oldest cache entry (if any)
|
||||
*/
|
||||
private fun actualGc(now: Instant) : Instant? {
|
||||
var result :Instant? = null
|
||||
Files.list(root)
|
||||
.filter { path ->
|
||||
JWO.splitExtension(path)
|
||||
.map { it._2 }
|
||||
.map { it != ".tmp" }
|
||||
.orElse(true)
|
||||
@Synchronized
|
||||
private fun actualGc(now: Instant) {
|
||||
Files.list(root).filter {
|
||||
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
|
||||
.creationTime()
|
||||
.toInstant()
|
||||
now > creationTimeStamp.plus(maxAge)
|
||||
}.forEach { file ->
|
||||
LockFile.acquire(file, false).use {
|
||||
Files.delete(file)
|
||||
}
|
||||
.filter {
|
||||
val creationTimeStamp = Files.readAttributes(it, BasicFileAttributes::class.java)
|
||||
.creationTime()
|
||||
.toInstant()
|
||||
if(result == null || creationTimeStamp < result) {
|
||||
result = creationTimeStamp
|
||||
}
|
||||
now > creationTimeStamp.plus(maxAge)
|
||||
}.forEach(Files::delete)
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
running = false
|
||||
garbageCollector.join()
|
||||
}
|
||||
override fun close() {}
|
||||
}
|
@@ -1,12 +1,12 @@
|
||||
package net.woggioni.rbcs.server.cache
|
||||
|
||||
import io.netty.buffer.ByteBuf
|
||||
import net.woggioni.jwo.JWO
|
||||
import net.woggioni.rbcs.api.Cache
|
||||
import net.woggioni.rbcs.common.ByteBufInputStream
|
||||
import net.woggioni.rbcs.common.ByteBufOutputStream
|
||||
import net.woggioni.rbcs.common.RBCS.digestString
|
||||
import net.woggioni.rbcs.common.contextLogger
|
||||
import net.woggioni.jwo.JWO
|
||||
import java.nio.channels.Channels
|
||||
import java.security.MessageDigest
|
||||
import java.time.Duration
|
||||
@@ -42,11 +42,9 @@ class InMemoryCache(
|
||||
|
||||
private val removalQueue = PriorityBlockingQueue<RemovalQueueElement>()
|
||||
|
||||
@Volatile
|
||||
private var running = true
|
||||
|
||||
private val garbageCollector = Thread.ofVirtual().name("in-memory-cache-gc").start {
|
||||
while(running) {
|
||||
private val garbageCollector = Thread {
|
||||
while(true) {
|
||||
val el = removalQueue.take()
|
||||
val buf = el.value
|
||||
val now = Instant.now()
|
||||
@@ -64,6 +62,8 @@ class InMemoryCache(
|
||||
Thread.sleep(minOf(Duration.between(now, el.expiry), Duration.ofSeconds(1)))
|
||||
}
|
||||
}
|
||||
}.apply {
|
||||
start()
|
||||
}
|
||||
|
||||
private fun removeEldest() : Long {
|
||||
|
@@ -17,8 +17,6 @@ import net.woggioni.rbcs.api.exception.CacheException
|
||||
import net.woggioni.rbcs.api.exception.ContentTooLargeException
|
||||
import net.woggioni.rbcs.common.contextLogger
|
||||
import net.woggioni.rbcs.common.debug
|
||||
import java.net.SocketException
|
||||
import javax.net.ssl.SSLException
|
||||
import javax.net.ssl.SSLPeerUnverifiedException
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
@@ -52,12 +50,7 @@ class ExceptionHandler : ChannelDuplexHandler() {
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
when (cause) {
|
||||
is DecoderException -> {
|
||||
log.debug(cause.message, cause)
|
||||
ctx.close()
|
||||
}
|
||||
|
||||
is SocketException -> {
|
||||
log.debug(cause.message, cause)
|
||||
log.error(cause.message, cause)
|
||||
ctx.close()
|
||||
}
|
||||
|
||||
@@ -66,16 +59,10 @@ class ExceptionHandler : ChannelDuplexHandler() {
|
||||
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
||||
}
|
||||
|
||||
is SSLException -> {
|
||||
log.debug(cause.message, cause)
|
||||
ctx.close()
|
||||
}
|
||||
|
||||
is ContentTooLargeException -> {
|
||||
ctx.writeAndFlush(TOO_BIG.retainedDuplicate())
|
||||
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
||||
}
|
||||
|
||||
is ReadTimeoutException -> {
|
||||
log.debug {
|
||||
val channelId = ctx.channel().id().asShortText()
|
||||
@@ -83,7 +70,6 @@ class ExceptionHandler : ChannelDuplexHandler() {
|
||||
}
|
||||
ctx.close()
|
||||
}
|
||||
|
||||
is WriteTimeoutException -> {
|
||||
log.debug {
|
||||
val channelId = ctx.channel().id().asShortText()
|
||||
@@ -91,13 +77,11 @@ class ExceptionHandler : ChannelDuplexHandler() {
|
||||
}
|
||||
ctx.close()
|
||||
}
|
||||
|
||||
is CacheException -> {
|
||||
log.error(cause.message, cause)
|
||||
ctx.writeAndFlush(NOT_AVAILABLE.retainedDuplicate())
|
||||
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
|
||||
}
|
||||
|
||||
else -> {
|
||||
log.error(cause.message, cause)
|
||||
ctx.writeAndFlush(SERVER_ERROR.retainedDuplicate())
|
||||
|
@@ -19,13 +19,10 @@ import java.util.concurrent.TimeUnit
|
||||
|
||||
|
||||
@Sharable
|
||||
class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() {
|
||||
|
||||
private companion object {
|
||||
@JvmStatic
|
||||
private val log = contextLogger()
|
||||
}
|
||||
class ThrottlingHandler(cfg: Configuration) :
|
||||
ChannelInboundHandlerAdapter() {
|
||||
|
||||
private val log = contextLogger()
|
||||
private val bucketManager = BucketManager.from(cfg)
|
||||
|
||||
private val connectionConfiguration = cfg.connection
|
||||
@@ -63,7 +60,7 @@ class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() {
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleBuckets(buckets: List<Bucket>, ctx: ChannelHandlerContext, msg: Any, delayResponse: Boolean) {
|
||||
private fun handleBuckets(buckets : List<Bucket>, ctx : ChannelHandlerContext, msg : Any, delayResponse : Boolean) {
|
||||
var nextAttempt = -1L
|
||||
for (bucket in buckets) {
|
||||
val bucketNextAttempt = bucket.removeTokensWithEstimate(1)
|
||||
@@ -71,17 +68,17 @@ class ThrottlingHandler(cfg: Configuration) : ChannelInboundHandlerAdapter() {
|
||||
nextAttempt = bucketNextAttempt
|
||||
}
|
||||
}
|
||||
if (nextAttempt < 0) {
|
||||
if(nextAttempt < 0) {
|
||||
super.channelRead(ctx, msg)
|
||||
return
|
||||
}
|
||||
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
|
||||
if (delayResponse && waitDuration < waitThreshold) {
|
||||
ctx.executor().schedule({
|
||||
handleBuckets(buckets, ctx, msg, false)
|
||||
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
|
||||
} else {
|
||||
val waitDuration = Duration.of(LongMath.ceilDiv(nextAttempt, 100_000_000L) * 100L, ChronoUnit.MILLIS)
|
||||
if (delayResponse && waitDuration < waitThreshold) {
|
||||
ctx.executor().schedule({
|
||||
handleBuckets(buckets, ctx, msg, false)
|
||||
}, waitDuration.toMillis(), TimeUnit.MILLISECONDS)
|
||||
} else {
|
||||
sendThrottledResponse(ctx, waitDuration)
|
||||
}
|
||||
sendThrottledResponse(ctx, waitDuration)
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user