added bucket class
All checks were successful
CI / build (push) Successful in 1m39s

This commit is contained in:
2025-01-22 15:06:39 +08:00
parent 8436da536f
commit d61cfc6ea7
7 changed files with 270 additions and 90 deletions

View File

@@ -3,6 +3,6 @@ org.gradle.parallel=true
org.gradle.caching=true
gitea.maven.url = https://gitea.woggioni.net/api/packages/woggioni/maven
jwo.version = 2025.01.17
lys.version = 2025.01.10
jwo.version = 2025.01.22
lys.version = 2025.01.17
guice.version = 5.0.1

View File

@@ -0,0 +1,72 @@
package net.woggioni.jwo;
import net.woggioni.jwo.internal.LocalBucket;
import java.time.Duration;
/**
* This class implements the <a href="https://en.wikipedia.org/wiki/Token_bucket">token bucket algorithm</a>
* for throttling
*
* @see <a href="https://en.wikipedia.org/wiki/Token_bucket">token bucket algorithm</a>
*/
public interface Bucket {
/**
* Tries to consume nTokens from the bucket, returning true if the operation was successful, false otherwise.
* When this method returns false no tokens are actually consumed and the user is advised to retry after sometime
*
* @param nTokens numberOfTokens to consume
* @return false if the bucket did not contain enough token for the operation to complete,
* true otherwise (in the latter case nTokens are actually consumed)
*/
default boolean removeTokens(long nTokens) {
return removeTokens(nTokens, System.nanoTime());
}
/**
* Tries to consume nTokens from the bucket, returning true if the operation was successful, false otherwise.
* When this method returns false no tokens are actually consumed and the user is advised to retry after sometime
*
* @param nTokens numberOfTokens to consume
* @param currentTimestamp timestamp used to compute how much time has elapsed since the last bucket refill
* @return false if the bucket did not contain enough token for the operation to complete,
* true otherwise (in the latter case nTokens are actually consumed)
*/
boolean removeTokens(long nTokens, long currentTimestamp);
/**
* Tries to consume nTokens from the bucket, returning -1 if the operation was successful or the amount of time
* to wait before calling the method again that guarantees a positive outcome
*
* @param nTokens numberOfTokens to consume
* @return -1 if the bucket did not contain enough token for the operation to complete,
* a positive number of nanoseconds to wait that guarantees that the next
* {@link #removeTokensWithEstimate(long)} or {@link #removeTokens(long)} invocation with the same {@param nTokens} parameter
* will succeed
*/
default long removeTokensWithEstimate(long nTokens) {
return removeTokensWithEstimate(nTokens, System.nanoTime());
}
/**
* Tries to consume nTokens from the bucket, returning -1 if the operation was successful or the amount of time
* to wait before calling the method again that guarantees a positive outcome
*
* @param nTokens numberOfTokens to consume
* @param currentTimestamp timestamp used to compute how much time has elapsed since the last bucket refill
* @return -1 if the bucket did not contain enough token for the operation to complete,
* a positive number of nanoseconds to wait that guarantees that the next
* {@link #removeTokensWithEstimate(long)} or {@link #removeTokens(long)} invocation with the same {@param nTokens} parameter
* will succeed
*/
long removeTokensWithEstimate(long nTokens, long currentTimestamp);
static Bucket local(long maxCapacity, long fillAmount, Duration fillPeriod) {
return local(maxCapacity, fillAmount, fillPeriod, maxCapacity);
}
static Bucket local(long maxCapacity, long fillAmount, Duration fillPeriod, long initialAmount) {
return new LocalBucket(maxCapacity, fillAmount, fillPeriod.toNanos(), initialAmount);
}
}

View File

@@ -0,0 +1,13 @@
package net.woggioni.jwo;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class IntegerMath {
public static int ceilDiv(int a, int b) {
if(a <= 0) throw new IllegalArgumentException("a must be positive");
return 1 + (a - 1) / b;
}
}

View File

@@ -1,19 +1,12 @@
package net.woggioni.jwo;
import lombok.RequiredArgsConstructor;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@RequiredArgsConstructor
public class LRUCache<K, V> implements Map<K, V> {
private final Map<K, V> delegate;
private LRUCache(final long maxSize, final Function<K, V> loader, Class<K> cls) {
delegate = new LinkedHashMap<K, V>() {
public class LRUCache {
public static <K, V> Map<K, V> of(final long maxSize, final Function<K, V> loader, Class<K> cls) {
return new LinkedHashMap<K, V>() {
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() >= maxSize;
@@ -27,84 +20,21 @@ public class LRUCache<K, V> implements Map<K, V> {
return null;
}
}
@Override
public void putAll(Map<? extends K, ? extends V> m) {
throw new UnsupportedOperationException();
}
@Override
public V put(K key, V value) {
throw new UnsupportedOperationException();
}
@Override
public V putIfAbsent(K key, V value) {
throw new UnsupportedOperationException();
}
};
}
@Override
public V get(Object key) {
return delegate.get(key);
}
@Override
public boolean containsKey(Object key) {
return delegate.containsKey(key);
}
@Override
public int size() {
return delegate.size();
}
@Override
public boolean isEmpty() {
return delegate.isEmpty();
}
@Override
public V put(K key, V value) {
return null;
}
@Override
public boolean containsValue(Object value) {
return delegate.containsValue(value);
}
@Override
public V remove(Object key) {
return delegate.remove(key);
}
@Override
public void putAll(Map<? extends K, ? extends V> m) {
delegate.putAll(m);
}
@Override
public void clear() {
delegate.clear();
}
@Override
public Set<K> keySet() {
return delegate.keySet();
}
@Override
public Collection<V> values() {
return delegate.values();
}
@Override
public Set<Entry<K, V>> entrySet() {
return delegate.entrySet();
}
@Override
public boolean equals(Object o) {
if(o instanceof LRUCache) {
return delegate.equals(o);
} else {
return false;
}
}
@Override
public int hashCode() {
return delegate.hashCode();
}
public static <K, V> LRUCache<K, V> of(final long maxSize, final Function<K, V> loader, Class<K> cls) {
return new LRUCache<>(maxSize, loader, cls);
}
}

View File

@@ -0,0 +1,13 @@
package net.woggioni.jwo;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class LongMath {
public static long ceilDiv(long a, long b) {
if(a <= 0) throw new IllegalArgumentException("a must be positive");
return 1 + (a - 1) / b;
}
}

View File

@@ -0,0 +1,99 @@
package net.woggioni.jwo.internal;
import net.woggioni.jwo.Bucket;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongBinaryOperator;
import java.util.function.LongUnaryOperator;
import static net.woggioni.jwo.LongMath.ceilDiv;
public class LocalBucket implements Bucket {
private final long maxCapacity;
private final long fillAmount;
private final long fillPeriod;
private final AtomicLong availableTokens;
private final AtomicLong lastFill;
public LocalBucket(long maxCapacity, long fillAmount, long fillPeriod) {
this(maxCapacity, fillAmount, fillPeriod, maxCapacity);
}
public LocalBucket(long maxCapacity, long fillAmount, long fillPeriod, long initialAmount) {
this(maxCapacity, fillAmount, fillPeriod, initialAmount, System.nanoTime());
}
public LocalBucket(long maxCapacity, long fillAmount, long fillPeriod, long initialAmount, long currentTimestamp) {
if (maxCapacity <= 0 || fillAmount <= 0 || fillPeriod <= 0) {
throw new IllegalArgumentException("maxCapacity, fillAmount and fillPeriod must all be positive");
}
this.maxCapacity = maxCapacity;
this.fillAmount = fillAmount;
this.fillPeriod = fillPeriod;
this.availableTokens = new AtomicLong(initialAmount);
this.lastFill = new AtomicLong(currentTimestamp);
}
long getTokenPrivate(long nTokens, long now) {
if(nTokens > maxCapacity) throw new IllegalArgumentException("The requested number of tokens exceeds the bucket max capacity");
final LongBinaryOperator tickCalculator = (lf, currentTimestamp) -> (currentTimestamp - lf) / fillPeriod;
final LongBinaryOperator timestampCalculator = (lf, currentTimestamp) -> lf + tickCalculator.applyAsLong(lf, currentTimestamp) * fillPeriod;
final long previousFillTime = lastFill.getAndAccumulate(now, timestampCalculator);
final long currentFillTime = timestampCalculator.applyAsLong(previousFillTime, now);
if (currentFillTime != previousFillTime) {
final long ticks = tickCalculator.applyAsLong(previousFillTime, now);
final LongUnaryOperator filledAmountCalculator = currentTokens -> Math.min(currentTokens + ticks * fillAmount, maxCapacity);
final LongUnaryOperator tokenCalculator = currentTokens -> {
final long filledAmount = filledAmountCalculator.applyAsLong(currentTokens);
if (filledAmount >= nTokens) {
return filledAmount - nTokens;
} else {
return filledAmount;
}
};
final long previousTokenAmount = availableTokens.getAndUpdate(tokenCalculator);
final long filledAmount = filledAmountCalculator.applyAsLong(previousTokenAmount);
return filledAmount;
} else {
final long previousTokenAmount = availableTokens.getAndUpdate(n -> {
if (n >= nTokens) {
return n - nTokens;
} else {
return n;
}
});
return previousTokenAmount;
}
}
@Override
public boolean removeTokens(long nTokens) {
return removeTokens(nTokens, System.nanoTime());
}
@Override
public boolean removeTokens(long nTokens, long currentTimestamp) {
return getTokenPrivate(nTokens, currentTimestamp) >= nTokens;
}
@Override
public long removeTokensWithEstimate(long nTokens) {
final long previousTokenAmount = getTokenPrivate(nTokens, System.nanoTime());
if(previousTokenAmount >= nTokens) {
return -1;
} else {
return ceilDiv((nTokens - previousTokenAmount) * fillPeriod, fillAmount);
}
}
@Override
public long removeTokensWithEstimate(long nTokens, long currentTimestamp) {
final long previousTokenAmount = getTokenPrivate(nTokens, currentTimestamp);
if(previousTokenAmount >= nTokens) {
return -1;
} else {
return ceilDiv((nTokens - previousTokenAmount) * fillPeriod, fillAmount);
}
}
}

View File

@@ -0,0 +1,53 @@
package net.woggioni.jwo.internal;
import lombok.SneakyThrows;
import net.woggioni.jwo.Bucket;
import net.woggioni.jwo.Con;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class LocalBucketTest {
@Test
@SneakyThrows
void testRemoveTokens() {
try(final ExecutorService executorService = Executors.newFixedThreadPool(16)) {
final Bucket bucket = new LocalBucket(100, 1, Duration.of(62500, ChronoUnit.MICROS).toNanos(), 0);
final long now = System.nanoTime();
IntStream.range(0, 16).mapToObj(n -> executorService.submit(() -> {
while (!bucket.removeTokens(1)) {
}
})).forEach((Con<Future<?>>) Future::get);
final double elapsedTime = System.nanoTime() - now;
System.out.printf("Elapsed time: %.3f s\n", elapsedTime / 1e9);
Assertions.assertTrue(0.1 > (1_000_000_000 - elapsedTime));
executorService.shutdown();
Assertions.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
}
}
@Test
@SneakyThrows
void testRemoveTokensWithEstimate() {
final long now = System.nanoTime();
final long maxCapacity = 10000;
final long fillAmount = 1;
final long fillPeriod = Duration.of(62500, ChronoUnit.MICROS).toNanos();
final long initialAmount = 0;
final Bucket bucket = new LocalBucket(maxCapacity, fillAmount, fillPeriod, initialAmount, now);
Assertions.assertThrows(IllegalArgumentException.class, () -> bucket.removeTokensWithEstimate(maxCapacity + 1, now));
final long requestedTokens = 1000;
final long result = bucket.removeTokensWithEstimate(requestedTokens, now);
Assertions.assertEquals(result, (requestedTokens - initialAmount) * fillPeriod / fillAmount);
final long elapsedTime = Duration.ofSeconds(30).toNanos();
final long result2 = bucket.removeTokensWithEstimate(requestedTokens, now + elapsedTime);
Assertions.assertEquals(result2, (requestedTokens - initialAmount) * fillPeriod / fillAmount - elapsedTime);
}
}