diff --git a/gradle.properties b/gradle.properties index a1c52ab..d362c9a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -jwo.version = 2023.05 -lys.version = 2023.03 +jwo.version = 2023.06.22 +lys.version = 2023.06.14 guice.version = 5.0.1 diff --git a/src/main/java/net/woggioni/jwo/JWO.java b/src/main/java/net/woggioni/jwo/JWO.java index 6550694..34f303a 100644 --- a/src/main/java/net/woggioni/jwo/JWO.java +++ b/src/main/java/net/woggioni/jwo/JWO.java @@ -47,6 +47,10 @@ import java.util.Objects; import java.util.Optional; import java.util.ServiceConfigurationError; import java.util.ServiceLoader; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -928,4 +932,56 @@ public class JWO { if (!opt1.isPresent() || !opt2.isPresent()) return Optional.empty(); else return Optional.ofNullable(cb.apply(opt1.get(), opt2.get())); } + + + public static Executor delayedExecutor(long delay, TimeUnit unit, + Executor executor) { + if (unit == null || executor == null) + throw new NullPointerException(); + return new DelayedExecutor(delay, unit, executor); + } + private static final class Delayer { + private static ScheduledFuture delay(Runnable command, long delay, + TimeUnit unit) { + return delayer.schedule(command, delay, unit); + } + + private static final class DaemonThreadFactory implements ThreadFactory { + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("CompletableFutureDelayScheduler"); + return t; + } + } + + private static final ScheduledThreadPoolExecutor delayer; + static { + (delayer = new ScheduledThreadPoolExecutor( + 1, new DaemonThreadFactory())). + setRemoveOnCancelPolicy(true); + } + } + + private static final class DelayedExecutor implements Executor { + final long delay; + final TimeUnit unit; + final Executor executor; + private DelayedExecutor(long delay, TimeUnit unit, Executor executor) { + this.delay = delay; this.unit = unit; this.executor = executor; + } + public void execute(Runnable r) { + Delayer.delay(new TaskSubmitter(executor, r), delay, unit); + } + } + + private static final class TaskSubmitter implements Runnable { + final Executor executor; + final Runnable action; + TaskSubmitter(Executor executor, Runnable action) { + this.executor = executor; + this.action = action; + } + public void run() { executor.execute(action); } + } } diff --git a/src/main/java/net/woggioni/jwo/RetryExecutor.java b/src/main/java/net/woggioni/jwo/RetryExecutor.java new file mode 100644 index 0000000..3303e99 --- /dev/null +++ b/src/main/java/net/woggioni/jwo/RetryExecutor.java @@ -0,0 +1,167 @@ +package net.woggioni.jwo; + +import lombok.Builder; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static net.woggioni.jwo.JWO.delayedExecutor; +import static net.woggioni.jwo.JWO.newThrowable; + +@Builder +public class RetryExecutor { + @Builder.Default + private final int maxAttempts = 5; + + @Builder.Default + private final Duration initialDelay = Duration.ofSeconds(1); + + @Builder.Default + private final Double exp = 1.0; + + @Builder.Default + private final ExceptionHandler exceptionHandler = err -> ExceptionHandlerOutcome.CONTINUE; + + @Builder.Default + private final ExecutorService executorService = ForkJoinPool.commonPool(); + + public enum ExceptionHandlerOutcome { + THROW, CONTINUE + } + + public interface ExceptionHandler { + ExceptionHandlerOutcome handleError(Throwable t); + } + + public CompletableFuture submit(Runnable cb) { + return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService); + } + + public CompletableFuture submit( + Runnable cb, + ExecutorService executorService) { + return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService); + } + + public CompletableFuture submit( + Runnable cb, + ExceptionHandler exceptionHandler, + ExecutorService executorService) { + return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService); + } + + public CompletableFuture submit( + Runnable cb, + Double exp, + ExceptionHandler exceptionHandler, + ExecutorService executorService) { + return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService); + } + + public CompletableFuture submit( + Runnable cb, + Duration initialDelay, + Double exp, + ExceptionHandler exceptionHandler, + ExecutorService executorService) { + return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService); + } + public static CompletableFuture submit( + Runnable cb, + int maxAttempts, + Duration initialDelay, + Double exp, + ExceptionHandler exceptionHandler, + ExecutorService executorService) { + return submit(() -> { + cb.run(); + return null; + }, maxAttempts, initialDelay, exp, exceptionHandler, executorService); + } + + public CompletableFuture submit(Callable cb) { + return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService); + } + + public CompletableFuture submit( + Callable cb, + ExecutorService executorService) { + return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService); + } + + public CompletableFuture submit( + Callable cb, + ExceptionHandler exceptionHandler, + ExecutorService executorService) { + return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService); + } + + public CompletableFuture submit( + Callable cb, + Double exp, + ExceptionHandler exceptionHandler, + ExecutorService executorService) { + return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService); + } + + public CompletableFuture submit( + Callable cb, + Duration initialDelay, + Double exp, + ExceptionHandler exceptionHandler, + ExecutorService executorService) { + return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService); + } + + public static CompletableFuture submit( + Callable cb, + int maxAttempts, + Duration initialDelay, + Double exp, + ExceptionHandler exceptionHandler, + ExecutorService executorService) { + CompletableFuture result = CompletableFuture.supplyAsync((Sup) cb::call, executorService); + double delay = initialDelay.toMillis(); + for(int i = 1; i <= maxAttempts; i++) { + int attempt = i; + double thisAttemptDelay = delay; + result = result.handleAsync((BiFun>) (value, err) -> { + Optional causeOpt = Optional.ofNullable(err).map(Throwable::getCause); + if(!causeOpt.isPresent()) { + return CompletableFuture.completedFuture(value); + } else if(attempt == maxAttempts) { + throw causeOpt.get(); + } else { + Throwable cause = causeOpt.get(); + ExceptionHandlerOutcome eho = exceptionHandler.handleError(cause); + switch (eho) { + case THROW: + throw cause; + case CONTINUE: + Executor delayedExecutor = delayedExecutor( + (long) thisAttemptDelay, + TimeUnit.MILLISECONDS, + executorService + ); + return CompletableFuture.supplyAsync((Sup) cb::call, delayedExecutor); + default: + throw newThrowable(UnsupportedOperationException.class, + "Unsupported value for enum %s: '%s'", + ExceptionHandlerOutcome.class.getName(), + eho + ); + } + } + }, executorService).thenComposeAsync(Function.identity(), executorService); + delay *= exp; + } + return result; + } +} diff --git a/src/main/java9/module-info.java b/src/main/java9/module-info.java index 1a7a9cb..1ad081d 100644 --- a/src/main/java9/module-info.java +++ b/src/main/java9/module-info.java @@ -1,4 +1,5 @@ module net.woggioni.jwo { + requires static lombok; requires org.slf4j; exports net.woggioni.jwo; exports net.woggioni.jwo.exception;