diff --git a/gradle.properties b/gradle.properties
index a2af0e9..2e66c3b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -3,6 +3,6 @@ org.gradle.parallel=true
org.gradle.caching=true
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
-jwo.version = 2025.01.17
-lys.version = 2025.01.10
+jwo.version = 2025.01.22
+lys.version = 2025.01.17
guice.version = 5.0.1
diff --git a/src/main/java/net/woggioni/jwo/Bucket.java b/src/main/java/net/woggioni/jwo/Bucket.java
new file mode 100644
index 0000000..e5c64a8
--- /dev/null
+++ b/src/main/java/net/woggioni/jwo/Bucket.java
@@ -0,0 +1,72 @@
+package net.woggioni.jwo;
+
+import net.woggioni.jwo.internal.LocalBucket;
+
+import java.time.Duration;
+
+/**
+ * This class implements the token bucket algorithm
+ * for throttling
+ *
+ * @see token bucket algorithm
+ */
+public interface Bucket {
+ /**
+ * Tries to consume nTokens from the bucket, returning true if the operation was successful, false otherwise.
+ * When this method returns false no tokens are actually consumed and the user is advised to retry after sometime
+ *
+ * @param nTokens numberOfTokens to consume
+ * @return false if the bucket did not contain enough token for the operation to complete,
+ * true otherwise (in the latter case nTokens are actually consumed)
+ */
+ default boolean removeTokens(long nTokens) {
+ return removeTokens(nTokens, System.nanoTime());
+ }
+
+ /**
+ * Tries to consume nTokens from the bucket, returning true if the operation was successful, false otherwise.
+ * When this method returns false no tokens are actually consumed and the user is advised to retry after sometime
+ *
+ * @param nTokens numberOfTokens to consume
+ * @param currentTimestamp timestamp used to compute how much time has elapsed since the last bucket refill
+ * @return false if the bucket did not contain enough token for the operation to complete,
+ * true otherwise (in the latter case nTokens are actually consumed)
+ */
+ boolean removeTokens(long nTokens, long currentTimestamp);
+
+ /**
+ * Tries to consume nTokens from the bucket, returning -1 if the operation was successful or the amount of time
+ * to wait before calling the method again that guarantees a positive outcome
+ *
+ * @param nTokens numberOfTokens to consume
+ * @return -1 if the bucket did not contain enough token for the operation to complete,
+ * a positive number of nanoseconds to wait that guarantees that the next
+ * {@link #removeTokensWithEstimate(long)} or {@link #removeTokens(long)} invocation with the same {@param nTokens} parameter
+ * will succeed
+ */
+ default long removeTokensWithEstimate(long nTokens) {
+ return removeTokensWithEstimate(nTokens, System.nanoTime());
+ }
+
+ /**
+ * Tries to consume nTokens from the bucket, returning -1 if the operation was successful or the amount of time
+ * to wait before calling the method again that guarantees a positive outcome
+ *
+ * @param nTokens numberOfTokens to consume
+ * @param currentTimestamp timestamp used to compute how much time has elapsed since the last bucket refill
+ * @return -1 if the bucket did not contain enough token for the operation to complete,
+ * a positive number of nanoseconds to wait that guarantees that the next
+ * {@link #removeTokensWithEstimate(long)} or {@link #removeTokens(long)} invocation with the same {@param nTokens} parameter
+ * will succeed
+ */
+ long removeTokensWithEstimate(long nTokens, long currentTimestamp);
+
+ static Bucket local(long maxCapacity, long fillAmount, Duration fillPeriod) {
+ return local(maxCapacity, fillAmount, fillPeriod, maxCapacity);
+ }
+
+ static Bucket local(long maxCapacity, long fillAmount, Duration fillPeriod, long initialAmount) {
+ return new LocalBucket(maxCapacity, fillAmount, fillPeriod.toNanos(), initialAmount);
+ }
+}
+
diff --git a/src/main/java/net/woggioni/jwo/IntegerMath.java b/src/main/java/net/woggioni/jwo/IntegerMath.java
new file mode 100644
index 0000000..2fd66cd
--- /dev/null
+++ b/src/main/java/net/woggioni/jwo/IntegerMath.java
@@ -0,0 +1,13 @@
+package net.woggioni.jwo;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class IntegerMath {
+
+ public static int ceilDiv(int a, int b) {
+ if(a <= 0) throw new IllegalArgumentException("a must be positive");
+ return 1 + (a - 1) / b;
+ }
+}
diff --git a/src/main/java/net/woggioni/jwo/LRUCache.java b/src/main/java/net/woggioni/jwo/LRUCache.java
index 14d6446..c2d529a 100644
--- a/src/main/java/net/woggioni/jwo/LRUCache.java
+++ b/src/main/java/net/woggioni/jwo/LRUCache.java
@@ -1,19 +1,12 @@
package net.woggioni.jwo;
-import lombok.RequiredArgsConstructor;
-
-import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Set;
import java.util.function.Function;
-@RequiredArgsConstructor
-public class LRUCache implements Map {
- private final Map delegate;
-
- private LRUCache(final long maxSize, final Function loader, Class cls) {
- delegate = new LinkedHashMap() {
+public class LRUCache {
+ public static Map of(final long maxSize, final Function loader, Class cls) {
+ return new LinkedHashMap() {
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() >= maxSize;
@@ -27,84 +20,21 @@ public class LRUCache implements Map {
return null;
}
}
+
+ @Override
+ public void putAll(Map extends K, ? extends V> m) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public V put(K key, V value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public V putIfAbsent(K key, V value) {
+ throw new UnsupportedOperationException();
+ }
};
}
-
- @Override
- public V get(Object key) {
- return delegate.get(key);
- }
-
- @Override
- public boolean containsKey(Object key) {
- return delegate.containsKey(key);
- }
-
- @Override
- public int size() {
- return delegate.size();
- }
-
- @Override
- public boolean isEmpty() {
- return delegate.isEmpty();
- }
-
- @Override
- public V put(K key, V value) {
- return null;
- }
-
- @Override
- public boolean containsValue(Object value) {
- return delegate.containsValue(value);
- }
-
- @Override
- public V remove(Object key) {
- return delegate.remove(key);
- }
-
- @Override
- public void putAll(Map extends K, ? extends V> m) {
- delegate.putAll(m);
- }
-
- @Override
- public void clear() {
- delegate.clear();
- }
-
- @Override
- public Set keySet() {
- return delegate.keySet();
- }
-
- @Override
- public Collection values() {
- return delegate.values();
- }
-
- @Override
- public Set> entrySet() {
- return delegate.entrySet();
- }
-
- @Override
- public boolean equals(Object o) {
- if(o instanceof LRUCache) {
- return delegate.equals(o);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return delegate.hashCode();
- }
-
- public static LRUCache of(final long maxSize, final Function loader, Class cls) {
- return new LRUCache<>(maxSize, loader, cls);
- }
}
diff --git a/src/main/java/net/woggioni/jwo/LongMath.java b/src/main/java/net/woggioni/jwo/LongMath.java
new file mode 100644
index 0000000..dc407e3
--- /dev/null
+++ b/src/main/java/net/woggioni/jwo/LongMath.java
@@ -0,0 +1,13 @@
+package net.woggioni.jwo;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class LongMath {
+
+ public static long ceilDiv(long a, long b) {
+ if(a <= 0) throw new IllegalArgumentException("a must be positive");
+ return 1 + (a - 1) / b;
+ }
+}
diff --git a/src/main/java/net/woggioni/jwo/internal/LocalBucket.java b/src/main/java/net/woggioni/jwo/internal/LocalBucket.java
new file mode 100644
index 0000000..865e76c
--- /dev/null
+++ b/src/main/java/net/woggioni/jwo/internal/LocalBucket.java
@@ -0,0 +1,99 @@
+package net.woggioni.jwo.internal;
+
+import net.woggioni.jwo.Bucket;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongBinaryOperator;
+import java.util.function.LongUnaryOperator;
+
+import static net.woggioni.jwo.LongMath.ceilDiv;
+
+public class LocalBucket implements Bucket {
+ private final long maxCapacity;
+ private final long fillAmount;
+ private final long fillPeriod;
+
+ private final AtomicLong availableTokens;
+ private final AtomicLong lastFill;
+
+ public LocalBucket(long maxCapacity, long fillAmount, long fillPeriod) {
+ this(maxCapacity, fillAmount, fillPeriod, maxCapacity);
+ }
+
+ public LocalBucket(long maxCapacity, long fillAmount, long fillPeriod, long initialAmount) {
+ this(maxCapacity, fillAmount, fillPeriod, initialAmount, System.nanoTime());
+ }
+
+ public LocalBucket(long maxCapacity, long fillAmount, long fillPeriod, long initialAmount, long currentTimestamp) {
+ if (maxCapacity <= 0 || fillAmount <= 0 || fillPeriod <= 0) {
+ throw new IllegalArgumentException("maxCapacity, fillAmount and fillPeriod must all be positive");
+ }
+ this.maxCapacity = maxCapacity;
+ this.fillAmount = fillAmount;
+ this.fillPeriod = fillPeriod;
+ this.availableTokens = new AtomicLong(initialAmount);
+ this.lastFill = new AtomicLong(currentTimestamp);
+ }
+
+ long getTokenPrivate(long nTokens, long now) {
+ if(nTokens > maxCapacity) throw new IllegalArgumentException("The requested number of tokens exceeds the bucket max capacity");
+ final LongBinaryOperator tickCalculator = (lf, currentTimestamp) -> (currentTimestamp - lf) / fillPeriod;
+ final LongBinaryOperator timestampCalculator = (lf, currentTimestamp) -> lf + tickCalculator.applyAsLong(lf, currentTimestamp) * fillPeriod;
+ final long previousFillTime = lastFill.getAndAccumulate(now, timestampCalculator);
+ final long currentFillTime = timestampCalculator.applyAsLong(previousFillTime, now);
+ if (currentFillTime != previousFillTime) {
+ final long ticks = tickCalculator.applyAsLong(previousFillTime, now);
+ final LongUnaryOperator filledAmountCalculator = currentTokens -> Math.min(currentTokens + ticks * fillAmount, maxCapacity);
+ final LongUnaryOperator tokenCalculator = currentTokens -> {
+ final long filledAmount = filledAmountCalculator.applyAsLong(currentTokens);
+ if (filledAmount >= nTokens) {
+ return filledAmount - nTokens;
+ } else {
+ return filledAmount;
+ }
+ };
+ final long previousTokenAmount = availableTokens.getAndUpdate(tokenCalculator);
+ final long filledAmount = filledAmountCalculator.applyAsLong(previousTokenAmount);
+ return filledAmount;
+ } else {
+ final long previousTokenAmount = availableTokens.getAndUpdate(n -> {
+ if (n >= nTokens) {
+ return n - nTokens;
+ } else {
+ return n;
+ }
+ });
+ return previousTokenAmount;
+ }
+ }
+
+ @Override
+ public boolean removeTokens(long nTokens) {
+ return removeTokens(nTokens, System.nanoTime());
+ }
+
+ @Override
+ public boolean removeTokens(long nTokens, long currentTimestamp) {
+ return getTokenPrivate(nTokens, currentTimestamp) >= nTokens;
+ }
+
+ @Override
+ public long removeTokensWithEstimate(long nTokens) {
+ final long previousTokenAmount = getTokenPrivate(nTokens, System.nanoTime());
+ if(previousTokenAmount >= nTokens) {
+ return -1;
+ } else {
+ return ceilDiv((nTokens - previousTokenAmount) * fillPeriod, fillAmount);
+ }
+ }
+
+ @Override
+ public long removeTokensWithEstimate(long nTokens, long currentTimestamp) {
+ final long previousTokenAmount = getTokenPrivate(nTokens, currentTimestamp);
+ if(previousTokenAmount >= nTokens) {
+ return -1;
+ } else {
+ return ceilDiv((nTokens - previousTokenAmount) * fillPeriod, fillAmount);
+ }
+ }
+}
diff --git a/src/test/java/net/woggioni/jwo/internal/LocalBucketTest.java b/src/test/java/net/woggioni/jwo/internal/LocalBucketTest.java
new file mode 100644
index 0000000..6d139f5
--- /dev/null
+++ b/src/test/java/net/woggioni/jwo/internal/LocalBucketTest.java
@@ -0,0 +1,53 @@
+package net.woggioni.jwo.internal;
+
+import lombok.SneakyThrows;
+import net.woggioni.jwo.Bucket;
+import net.woggioni.jwo.Con;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+public class LocalBucketTest {
+ @Test
+ @SneakyThrows
+ void testRemoveTokens() {
+ try(final ExecutorService executorService = Executors.newFixedThreadPool(16)) {
+ final Bucket bucket = new LocalBucket(100, 1, Duration.of(62500, ChronoUnit.MICROS).toNanos(), 0);
+ final long now = System.nanoTime();
+ IntStream.range(0, 16).mapToObj(n -> executorService.submit(() -> {
+ while (!bucket.removeTokens(1)) {
+ }
+ })).forEach((Con>) Future::get);
+ final double elapsedTime = System.nanoTime() - now;
+ System.out.printf("Elapsed time: %.3f s\n", elapsedTime / 1e9);
+ Assertions.assertTrue(0.1 > (1_000_000_000 - elapsedTime));
+ executorService.shutdown();
+ Assertions.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ @SneakyThrows
+ void testRemoveTokensWithEstimate() {
+ final long now = System.nanoTime();
+ final long maxCapacity = 10000;
+ final long fillAmount = 1;
+ final long fillPeriod = Duration.of(62500, ChronoUnit.MICROS).toNanos();
+ final long initialAmount = 0;
+ final Bucket bucket = new LocalBucket(maxCapacity, fillAmount, fillPeriod, initialAmount, now);
+ Assertions.assertThrows(IllegalArgumentException.class, () -> bucket.removeTokensWithEstimate(maxCapacity + 1, now));
+ final long requestedTokens = 1000;
+ final long result = bucket.removeTokensWithEstimate(requestedTokens, now);
+ Assertions.assertEquals(result, (requestedTokens - initialAmount) * fillPeriod / fillAmount);
+ final long elapsedTime = Duration.ofSeconds(30).toNanos();
+ final long result2 = bucket.removeTokensWithEstimate(requestedTokens, now + elapsedTime);
+ Assertions.assertEquals(result2, (requestedTokens - initialAmount) * fillPeriod / fillAmount - elapsedTime);
+ }
+}