added retry executor

This commit is contained in:
2023-06-22 08:39:38 +08:00
parent 14a5af82c8
commit 76175afdf8
4 changed files with 226 additions and 2 deletions

View File

@@ -1,3 +1,3 @@
jwo.version = 2023.05 jwo.version = 2023.06.22
lys.version = 2023.03 lys.version = 2023.06.14
guice.version = 5.0.1 guice.version = 5.0.1

View File

@@ -47,6 +47,10 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.ServiceConfigurationError; import java.util.ServiceConfigurationError;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Consumer; import java.util.function.Consumer;
@@ -928,4 +932,56 @@ public class JWO {
if (!opt1.isPresent() || !opt2.isPresent()) return Optional.empty(); if (!opt1.isPresent() || !opt2.isPresent()) return Optional.empty();
else return Optional.ofNullable(cb.apply(opt1.get(), opt2.get())); else return Optional.ofNullable(cb.apply(opt1.get(), opt2.get()));
} }
public static Executor delayedExecutor(long delay, TimeUnit unit,
Executor executor) {
if (unit == null || executor == null)
throw new NullPointerException();
return new DelayedExecutor(delay, unit, executor);
}
private static final class Delayer {
private static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}
private static final class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureDelayScheduler");
return t;
}
}
private static final ScheduledThreadPoolExecutor delayer;
static {
(delayer = new ScheduledThreadPoolExecutor(
1, new DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
}
private static final class DelayedExecutor implements Executor {
final long delay;
final TimeUnit unit;
final Executor executor;
private DelayedExecutor(long delay, TimeUnit unit, Executor executor) {
this.delay = delay; this.unit = unit; this.executor = executor;
}
public void execute(Runnable r) {
Delayer.delay(new TaskSubmitter(executor, r), delay, unit);
}
}
private static final class TaskSubmitter implements Runnable {
final Executor executor;
final Runnable action;
TaskSubmitter(Executor executor, Runnable action) {
this.executor = executor;
this.action = action;
}
public void run() { executor.execute(action); }
}
} }

View File

@@ -0,0 +1,167 @@
package net.woggioni.jwo;
import lombok.Builder;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static net.woggioni.jwo.JWO.delayedExecutor;
import static net.woggioni.jwo.JWO.newThrowable;
@Builder
public class RetryExecutor {
@Builder.Default
private final int maxAttempts = 5;
@Builder.Default
private final Duration initialDelay = Duration.ofSeconds(1);
@Builder.Default
private final Double exp = 1.0;
@Builder.Default
private final ExceptionHandler exceptionHandler = err -> ExceptionHandlerOutcome.CONTINUE;
@Builder.Default
private final ExecutorService executorService = ForkJoinPool.commonPool();
public enum ExceptionHandlerOutcome {
THROW, CONTINUE
}
public interface ExceptionHandler {
ExceptionHandlerOutcome handleError(Throwable t);
}
public CompletableFuture<Void> submit(Runnable cb) {
return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService);
}
public CompletableFuture<Void> submit(
Runnable cb,
ExecutorService executorService) {
return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService);
}
public CompletableFuture<Void> submit(
Runnable cb,
ExceptionHandler exceptionHandler,
ExecutorService executorService) {
return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService);
}
public CompletableFuture<Void> submit(
Runnable cb,
Double exp,
ExceptionHandler exceptionHandler,
ExecutorService executorService) {
return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService);
}
public CompletableFuture<Void> submit(
Runnable cb,
Duration initialDelay,
Double exp,
ExceptionHandler exceptionHandler,
ExecutorService executorService) {
return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService);
}
public static CompletableFuture<Void> submit(
Runnable cb,
int maxAttempts,
Duration initialDelay,
Double exp,
ExceptionHandler exceptionHandler,
ExecutorService executorService) {
return submit(() -> {
cb.run();
return null;
}, maxAttempts, initialDelay, exp, exceptionHandler, executorService);
}
public <T> CompletableFuture<T> submit(Callable<T> cb) {
return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService);
}
public <T> CompletableFuture<T> submit(
Callable<T> cb,
ExecutorService executorService) {
return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService);
}
public <T> CompletableFuture<T> submit(
Callable<T> cb,
ExceptionHandler exceptionHandler,
ExecutorService executorService) {
return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService);
}
public <T> CompletableFuture<T> submit(
Callable<T> cb,
Double exp,
ExceptionHandler exceptionHandler,
ExecutorService executorService) {
return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService);
}
public <T> CompletableFuture<T> submit(
Callable<T> cb,
Duration initialDelay,
Double exp,
ExceptionHandler exceptionHandler,
ExecutorService executorService) {
return submit(cb, maxAttempts, initialDelay, exp, exceptionHandler, executorService);
}
public static <T> CompletableFuture<T> submit(
Callable<T> cb,
int maxAttempts,
Duration initialDelay,
Double exp,
ExceptionHandler exceptionHandler,
ExecutorService executorService) {
CompletableFuture<T> result = CompletableFuture.supplyAsync((Sup<T>) cb::call, executorService);
double delay = initialDelay.toMillis();
for(int i = 1; i <= maxAttempts; i++) {
int attempt = i;
double thisAttemptDelay = delay;
result = result.handleAsync((BiFun<T, Throwable, CompletableFuture<T>>) (value, err) -> {
Optional<Throwable> causeOpt = Optional.ofNullable(err).map(Throwable::getCause);
if(!causeOpt.isPresent()) {
return CompletableFuture.completedFuture(value);
} else if(attempt == maxAttempts) {
throw causeOpt.get();
} else {
Throwable cause = causeOpt.get();
ExceptionHandlerOutcome eho = exceptionHandler.handleError(cause);
switch (eho) {
case THROW:
throw cause;
case CONTINUE:
Executor delayedExecutor = delayedExecutor(
(long) thisAttemptDelay,
TimeUnit.MILLISECONDS,
executorService
);
return CompletableFuture.supplyAsync((Sup<T>) cb::call, delayedExecutor);
default:
throw newThrowable(UnsupportedOperationException.class,
"Unsupported value for enum %s: '%s'",
ExceptionHandlerOutcome.class.getName(),
eho
);
}
}
}, executorService).thenComposeAsync(Function.identity(), executorService);
delay *= exp;
}
return result;
}
}

View File

@@ -1,4 +1,5 @@
module net.woggioni.jwo { module net.woggioni.jwo {
requires static lombok;
requires org.slf4j; requires org.slf4j;
exports net.woggioni.jwo; exports net.woggioni.jwo;
exports net.woggioni.jwo.exception; exports net.woggioni.jwo.exception;