From d61cfc6ea79c1e9c04a01a2ea559c31e5fe93e6c Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Wed, 22 Jan 2025 15:06:39 +0800 Subject: [PATCH] added bucket class --- gradle.properties | 4 +- src/main/java/net/woggioni/jwo/Bucket.java | 72 ++++++++++++ .../java/net/woggioni/jwo/IntegerMath.java | 13 +++ src/main/java/net/woggioni/jwo/LRUCache.java | 106 +++--------------- src/main/java/net/woggioni/jwo/LongMath.java | 13 +++ .../woggioni/jwo/internal/LocalBucket.java | 99 ++++++++++++++++ .../jwo/internal/LocalBucketTest.java | 53 +++++++++ 7 files changed, 270 insertions(+), 90 deletions(-) create mode 100644 src/main/java/net/woggioni/jwo/Bucket.java create mode 100644 src/main/java/net/woggioni/jwo/IntegerMath.java create mode 100644 src/main/java/net/woggioni/jwo/LongMath.java create mode 100644 src/main/java/net/woggioni/jwo/internal/LocalBucket.java create mode 100644 src/test/java/net/woggioni/jwo/internal/LocalBucketTest.java diff --git a/gradle.properties b/gradle.properties index a2af0e9..2e66c3b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,6 +3,6 @@ org.gradle.parallel=true org.gradle.caching=true gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven -jwo.version = 2025.01.17 -lys.version = 2025.01.10 +jwo.version = 2025.01.22 +lys.version = 2025.01.17 guice.version = 5.0.1 diff --git a/src/main/java/net/woggioni/jwo/Bucket.java b/src/main/java/net/woggioni/jwo/Bucket.java new file mode 100644 index 0000000..e5c64a8 --- /dev/null +++ b/src/main/java/net/woggioni/jwo/Bucket.java @@ -0,0 +1,72 @@ +package net.woggioni.jwo; + +import net.woggioni.jwo.internal.LocalBucket; + +import java.time.Duration; + +/** + * This class implements the token bucket algorithm + * for throttling + * + * @see token bucket algorithm + */ +public interface Bucket { + /** + * Tries to consume nTokens from the bucket, returning true if the operation was successful, false otherwise. + * When this method returns false no tokens are actually consumed and the user is advised to retry after sometime + * + * @param nTokens numberOfTokens to consume + * @return false if the bucket did not contain enough token for the operation to complete, + * true otherwise (in the latter case nTokens are actually consumed) + */ + default boolean removeTokens(long nTokens) { + return removeTokens(nTokens, System.nanoTime()); + } + + /** + * Tries to consume nTokens from the bucket, returning true if the operation was successful, false otherwise. + * When this method returns false no tokens are actually consumed and the user is advised to retry after sometime + * + * @param nTokens numberOfTokens to consume + * @param currentTimestamp timestamp used to compute how much time has elapsed since the last bucket refill + * @return false if the bucket did not contain enough token for the operation to complete, + * true otherwise (in the latter case nTokens are actually consumed) + */ + boolean removeTokens(long nTokens, long currentTimestamp); + + /** + * Tries to consume nTokens from the bucket, returning -1 if the operation was successful or the amount of time + * to wait before calling the method again that guarantees a positive outcome + * + * @param nTokens numberOfTokens to consume + * @return -1 if the bucket did not contain enough token for the operation to complete, + * a positive number of nanoseconds to wait that guarantees that the next + * {@link #removeTokensWithEstimate(long)} or {@link #removeTokens(long)} invocation with the same {@param nTokens} parameter + * will succeed + */ + default long removeTokensWithEstimate(long nTokens) { + return removeTokensWithEstimate(nTokens, System.nanoTime()); + } + + /** + * Tries to consume nTokens from the bucket, returning -1 if the operation was successful or the amount of time + * to wait before calling the method again that guarantees a positive outcome + * + * @param nTokens numberOfTokens to consume + * @param currentTimestamp timestamp used to compute how much time has elapsed since the last bucket refill + * @return -1 if the bucket did not contain enough token for the operation to complete, + * a positive number of nanoseconds to wait that guarantees that the next + * {@link #removeTokensWithEstimate(long)} or {@link #removeTokens(long)} invocation with the same {@param nTokens} parameter + * will succeed + */ + long removeTokensWithEstimate(long nTokens, long currentTimestamp); + + static Bucket local(long maxCapacity, long fillAmount, Duration fillPeriod) { + return local(maxCapacity, fillAmount, fillPeriod, maxCapacity); + } + + static Bucket local(long maxCapacity, long fillAmount, Duration fillPeriod, long initialAmount) { + return new LocalBucket(maxCapacity, fillAmount, fillPeriod.toNanos(), initialAmount); + } +} + diff --git a/src/main/java/net/woggioni/jwo/IntegerMath.java b/src/main/java/net/woggioni/jwo/IntegerMath.java new file mode 100644 index 0000000..2fd66cd --- /dev/null +++ b/src/main/java/net/woggioni/jwo/IntegerMath.java @@ -0,0 +1,13 @@ +package net.woggioni.jwo; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class IntegerMath { + + public static int ceilDiv(int a, int b) { + if(a <= 0) throw new IllegalArgumentException("a must be positive"); + return 1 + (a - 1) / b; + } +} diff --git a/src/main/java/net/woggioni/jwo/LRUCache.java b/src/main/java/net/woggioni/jwo/LRUCache.java index 14d6446..c2d529a 100644 --- a/src/main/java/net/woggioni/jwo/LRUCache.java +++ b/src/main/java/net/woggioni/jwo/LRUCache.java @@ -1,19 +1,12 @@ package net.woggioni.jwo; -import lombok.RequiredArgsConstructor; - -import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; -import java.util.Set; import java.util.function.Function; -@RequiredArgsConstructor -public class LRUCache implements Map { - private final Map delegate; - - private LRUCache(final long maxSize, final Function loader, Class cls) { - delegate = new LinkedHashMap() { +public class LRUCache { + public static Map of(final long maxSize, final Function loader, Class cls) { + return new LinkedHashMap() { @Override protected boolean removeEldestEntry(Map.Entry eldest) { return size() >= maxSize; @@ -27,84 +20,21 @@ public class LRUCache implements Map { return null; } } + + @Override + public void putAll(Map m) { + throw new UnsupportedOperationException(); + } + + @Override + public V put(K key, V value) { + throw new UnsupportedOperationException(); + } + + @Override + public V putIfAbsent(K key, V value) { + throw new UnsupportedOperationException(); + } }; } - - @Override - public V get(Object key) { - return delegate.get(key); - } - - @Override - public boolean containsKey(Object key) { - return delegate.containsKey(key); - } - - @Override - public int size() { - return delegate.size(); - } - - @Override - public boolean isEmpty() { - return delegate.isEmpty(); - } - - @Override - public V put(K key, V value) { - return null; - } - - @Override - public boolean containsValue(Object value) { - return delegate.containsValue(value); - } - - @Override - public V remove(Object key) { - return delegate.remove(key); - } - - @Override - public void putAll(Map m) { - delegate.putAll(m); - } - - @Override - public void clear() { - delegate.clear(); - } - - @Override - public Set keySet() { - return delegate.keySet(); - } - - @Override - public Collection values() { - return delegate.values(); - } - - @Override - public Set> entrySet() { - return delegate.entrySet(); - } - - @Override - public boolean equals(Object o) { - if(o instanceof LRUCache) { - return delegate.equals(o); - } else { - return false; - } - } - - @Override - public int hashCode() { - return delegate.hashCode(); - } - - public static LRUCache of(final long maxSize, final Function loader, Class cls) { - return new LRUCache<>(maxSize, loader, cls); - } } diff --git a/src/main/java/net/woggioni/jwo/LongMath.java b/src/main/java/net/woggioni/jwo/LongMath.java new file mode 100644 index 0000000..dc407e3 --- /dev/null +++ b/src/main/java/net/woggioni/jwo/LongMath.java @@ -0,0 +1,13 @@ +package net.woggioni.jwo; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class LongMath { + + public static long ceilDiv(long a, long b) { + if(a <= 0) throw new IllegalArgumentException("a must be positive"); + return 1 + (a - 1) / b; + } +} diff --git a/src/main/java/net/woggioni/jwo/internal/LocalBucket.java b/src/main/java/net/woggioni/jwo/internal/LocalBucket.java new file mode 100644 index 0000000..865e76c --- /dev/null +++ b/src/main/java/net/woggioni/jwo/internal/LocalBucket.java @@ -0,0 +1,99 @@ +package net.woggioni.jwo.internal; + +import net.woggioni.jwo.Bucket; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongBinaryOperator; +import java.util.function.LongUnaryOperator; + +import static net.woggioni.jwo.LongMath.ceilDiv; + +public class LocalBucket implements Bucket { + private final long maxCapacity; + private final long fillAmount; + private final long fillPeriod; + + private final AtomicLong availableTokens; + private final AtomicLong lastFill; + + public LocalBucket(long maxCapacity, long fillAmount, long fillPeriod) { + this(maxCapacity, fillAmount, fillPeriod, maxCapacity); + } + + public LocalBucket(long maxCapacity, long fillAmount, long fillPeriod, long initialAmount) { + this(maxCapacity, fillAmount, fillPeriod, initialAmount, System.nanoTime()); + } + + public LocalBucket(long maxCapacity, long fillAmount, long fillPeriod, long initialAmount, long currentTimestamp) { + if (maxCapacity <= 0 || fillAmount <= 0 || fillPeriod <= 0) { + throw new IllegalArgumentException("maxCapacity, fillAmount and fillPeriod must all be positive"); + } + this.maxCapacity = maxCapacity; + this.fillAmount = fillAmount; + this.fillPeriod = fillPeriod; + this.availableTokens = new AtomicLong(initialAmount); + this.lastFill = new AtomicLong(currentTimestamp); + } + + long getTokenPrivate(long nTokens, long now) { + if(nTokens > maxCapacity) throw new IllegalArgumentException("The requested number of tokens exceeds the bucket max capacity"); + final LongBinaryOperator tickCalculator = (lf, currentTimestamp) -> (currentTimestamp - lf) / fillPeriod; + final LongBinaryOperator timestampCalculator = (lf, currentTimestamp) -> lf + tickCalculator.applyAsLong(lf, currentTimestamp) * fillPeriod; + final long previousFillTime = lastFill.getAndAccumulate(now, timestampCalculator); + final long currentFillTime = timestampCalculator.applyAsLong(previousFillTime, now); + if (currentFillTime != previousFillTime) { + final long ticks = tickCalculator.applyAsLong(previousFillTime, now); + final LongUnaryOperator filledAmountCalculator = currentTokens -> Math.min(currentTokens + ticks * fillAmount, maxCapacity); + final LongUnaryOperator tokenCalculator = currentTokens -> { + final long filledAmount = filledAmountCalculator.applyAsLong(currentTokens); + if (filledAmount >= nTokens) { + return filledAmount - nTokens; + } else { + return filledAmount; + } + }; + final long previousTokenAmount = availableTokens.getAndUpdate(tokenCalculator); + final long filledAmount = filledAmountCalculator.applyAsLong(previousTokenAmount); + return filledAmount; + } else { + final long previousTokenAmount = availableTokens.getAndUpdate(n -> { + if (n >= nTokens) { + return n - nTokens; + } else { + return n; + } + }); + return previousTokenAmount; + } + } + + @Override + public boolean removeTokens(long nTokens) { + return removeTokens(nTokens, System.nanoTime()); + } + + @Override + public boolean removeTokens(long nTokens, long currentTimestamp) { + return getTokenPrivate(nTokens, currentTimestamp) >= nTokens; + } + + @Override + public long removeTokensWithEstimate(long nTokens) { + final long previousTokenAmount = getTokenPrivate(nTokens, System.nanoTime()); + if(previousTokenAmount >= nTokens) { + return -1; + } else { + return ceilDiv((nTokens - previousTokenAmount) * fillPeriod, fillAmount); + } + } + + @Override + public long removeTokensWithEstimate(long nTokens, long currentTimestamp) { + final long previousTokenAmount = getTokenPrivate(nTokens, currentTimestamp); + if(previousTokenAmount >= nTokens) { + return -1; + } else { + return ceilDiv((nTokens - previousTokenAmount) * fillPeriod, fillAmount); + } + } +} diff --git a/src/test/java/net/woggioni/jwo/internal/LocalBucketTest.java b/src/test/java/net/woggioni/jwo/internal/LocalBucketTest.java new file mode 100644 index 0000000..6d139f5 --- /dev/null +++ b/src/test/java/net/woggioni/jwo/internal/LocalBucketTest.java @@ -0,0 +1,53 @@ +package net.woggioni.jwo.internal; + +import lombok.SneakyThrows; +import net.woggioni.jwo.Bucket; +import net.woggioni.jwo.Con; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +public class LocalBucketTest { + @Test + @SneakyThrows + void testRemoveTokens() { + try(final ExecutorService executorService = Executors.newFixedThreadPool(16)) { + final Bucket bucket = new LocalBucket(100, 1, Duration.of(62500, ChronoUnit.MICROS).toNanos(), 0); + final long now = System.nanoTime(); + IntStream.range(0, 16).mapToObj(n -> executorService.submit(() -> { + while (!bucket.removeTokens(1)) { + } + })).forEach((Con>) Future::get); + final double elapsedTime = System.nanoTime() - now; + System.out.printf("Elapsed time: %.3f s\n", elapsedTime / 1e9); + Assertions.assertTrue(0.1 > (1_000_000_000 - elapsedTime)); + executorService.shutdown(); + Assertions.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS)); + } + } + + @Test + @SneakyThrows + void testRemoveTokensWithEstimate() { + final long now = System.nanoTime(); + final long maxCapacity = 10000; + final long fillAmount = 1; + final long fillPeriod = Duration.of(62500, ChronoUnit.MICROS).toNanos(); + final long initialAmount = 0; + final Bucket bucket = new LocalBucket(maxCapacity, fillAmount, fillPeriod, initialAmount, now); + Assertions.assertThrows(IllegalArgumentException.class, () -> bucket.removeTokensWithEstimate(maxCapacity + 1, now)); + final long requestedTokens = 1000; + final long result = bucket.removeTokensWithEstimate(requestedTokens, now); + Assertions.assertEquals(result, (requestedTokens - initialAmount) * fillPeriod / fillAmount); + final long elapsedTime = Duration.ofSeconds(30).toNanos(); + final long result2 = bucket.removeTokensWithEstimate(requestedTokens, now + elapsedTime); + Assertions.assertEquals(result2, (requestedTokens - initialAmount) * fillPeriod / fillAmount - elapsedTime); + } +}