This commit is contained in:
@@ -4,7 +4,8 @@ from .private import (
|
|||||||
retry,
|
retry,
|
||||||
async_test,
|
async_test,
|
||||||
ExceptionHandlerOutcome,
|
ExceptionHandlerOutcome,
|
||||||
tmpdir
|
tmpdir,
|
||||||
|
decorator_with_kwargs
|
||||||
)
|
)
|
||||||
from .maybe import Maybe
|
from .maybe import Maybe
|
||||||
|
|
||||||
@@ -15,5 +16,6 @@ __all__ = [
|
|||||||
'async_test',
|
'async_test',
|
||||||
'ExceptionHandlerOutcome',
|
'ExceptionHandlerOutcome',
|
||||||
'Maybe',
|
'Maybe',
|
||||||
'tmpdir'
|
'tmpdir',
|
||||||
|
'decorator_with_kwargs'
|
||||||
]
|
]
|
||||||
|
@@ -3,8 +3,76 @@ from tempfile import TemporaryDirectory
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from enum import Enum, auto
|
from enum import Enum, auto
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
from inspect import signature
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from asyncio import sleep as async_sleep, Runner
|
from asyncio import sleep as async_sleep, Runner
|
||||||
|
from functools import wraps, partial
|
||||||
|
|
||||||
|
|
||||||
|
def decorator_with_kwargs(decorator: Callable) -> Callable:
|
||||||
|
"""Decorator factory to give decorated decorators the skill to receive
|
||||||
|
optional keyword arguments.
|
||||||
|
|
||||||
|
If a decorator "some_decorator" is decorated with this function:
|
||||||
|
|
||||||
|
@decorator_with_kwargs
|
||||||
|
def some_decorator(decorated_function, kwarg1=1, kwarg2=2):
|
||||||
|
def wrapper(*decorated_function_args, **decorated_function_kwargs):
|
||||||
|
'''Modifies the behavior of decorated_function according
|
||||||
|
to the value of kwarg1 and kwarg2'''
|
||||||
|
...
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
It will be usable in the following ways:
|
||||||
|
|
||||||
|
@some_decorator
|
||||||
|
def func(x):
|
||||||
|
...
|
||||||
|
|
||||||
|
@some_decorator()
|
||||||
|
def func(x):
|
||||||
|
...
|
||||||
|
|
||||||
|
@some_decorator(kwarg1=3) # or other combinations of kwargs
|
||||||
|
def func(x, y):
|
||||||
|
...
|
||||||
|
|
||||||
|
:param decorator: decorator to be given optional kwargs-handling skills
|
||||||
|
:type decorator: Callable
|
||||||
|
:raises TypeError: if the decorator does not receive a single Callable or
|
||||||
|
keyword arguments
|
||||||
|
:raises TypeError: if the signature of the decorated decorator does not
|
||||||
|
conform to: Callable, **keyword_arguments
|
||||||
|
:return: modified decorator
|
||||||
|
:rtype: Callable
|
||||||
|
"""
|
||||||
|
|
||||||
|
@wraps(decorator)
|
||||||
|
def decorator_wrapper(*args, **kwargs):
|
||||||
|
if (len(kwargs) == 0) and (len(args) == 1) and callable(args[0]):
|
||||||
|
return decorator(args[0])
|
||||||
|
if len(args) == 0:
|
||||||
|
return partial(decorator, **kwargs)
|
||||||
|
raise TypeError(
|
||||||
|
f'{decorator.__name__} expects either a single Callable '
|
||||||
|
'or keyword arguments'
|
||||||
|
)
|
||||||
|
|
||||||
|
signature_values = signature(decorator).parameters.values()
|
||||||
|
signature_args = [
|
||||||
|
param.name for param in signature_values
|
||||||
|
if param.default == param.empty
|
||||||
|
]
|
||||||
|
|
||||||
|
if len(signature_args) != 1:
|
||||||
|
raise TypeError(
|
||||||
|
f'{decorator.__name__} signature should be of the form:\n'
|
||||||
|
f'{decorator.__name__}(function: typing.Callable, '
|
||||||
|
'kwarg_1=default_1, kwarg_2=default_2, ...) -> Callable'
|
||||||
|
)
|
||||||
|
|
||||||
|
return decorator_wrapper
|
||||||
|
|
||||||
|
|
||||||
_size_uoms = ('B', 'KiB', 'MiB', 'GiB', 'KiB')
|
_size_uoms = ('B', 'KiB', 'MiB', 'GiB', 'KiB')
|
||||||
|
|
||||||
@@ -24,60 +92,61 @@ class ExceptionHandlerOutcome(Enum):
|
|||||||
CONTINUE = auto()
|
CONTINUE = auto()
|
||||||
|
|
||||||
|
|
||||||
|
@decorator_with_kwargs
|
||||||
def retry(
|
def retry(
|
||||||
|
function,
|
||||||
max_attempts: int = 3,
|
max_attempts: int = 3,
|
||||||
multiplier: float = 2,
|
multiplier: float = 2,
|
||||||
initial_delay: float = 1.0,
|
initial_delay: float = 1.0,
|
||||||
exception_handler: Callable[[Exception], ExceptionHandlerOutcome] =
|
exception_handler: Callable[[Exception], ExceptionHandlerOutcome] =
|
||||||
lambda _: ExceptionHandlerOutcome.CONTINUE
|
lambda _: ExceptionHandlerOutcome.CONTINUE
|
||||||
):
|
):
|
||||||
def wrapper(function):
|
@wraps(function)
|
||||||
def result(*args, **kwargs):
|
def result(*args, **kwargs):
|
||||||
attempts = 0
|
attempts = 0
|
||||||
delay = initial_delay
|
delay = initial_delay
|
||||||
while True:
|
while True:
|
||||||
attempts += 1
|
attempts += 1
|
||||||
try:
|
try:
|
||||||
return function(*args, **kwargs)
|
return function(*args, **kwargs)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
if attempts < max_attempts and exception_handler(ex) == ExceptionHandlerOutcome.CONTINUE:
|
if attempts < max_attempts and exception_handler(ex) == ExceptionHandlerOutcome.CONTINUE:
|
||||||
sleep(delay)
|
sleep(delay)
|
||||||
delay *= multiplier
|
delay *= multiplier
|
||||||
else:
|
else:
|
||||||
raise ex
|
raise ex
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
return wrapper
|
|
||||||
|
|
||||||
|
|
||||||
|
@decorator_with_kwargs
|
||||||
def async_retry(
|
def async_retry(
|
||||||
|
function,
|
||||||
max_attempts: int = 3,
|
max_attempts: int = 3,
|
||||||
multiplier: float = 2,
|
multiplier: float = 2,
|
||||||
initial_delay: float = 1.0,
|
initial_delay: float = 1.0,
|
||||||
exception_handler=lambda _: ExceptionHandlerOutcome.CONTINUE
|
exception_handler=lambda _: ExceptionHandlerOutcome.CONTINUE
|
||||||
):
|
):
|
||||||
def wrapper(function):
|
@wraps(function)
|
||||||
async def result(*args, **kwargs):
|
async def result(*args, **kwargs):
|
||||||
attempts = 0
|
attempts = 0
|
||||||
delay = initial_delay
|
delay = initial_delay
|
||||||
while True:
|
while True:
|
||||||
attempts += 1
|
attempts += 1
|
||||||
try:
|
try:
|
||||||
return await function(*args, **kwargs)
|
return await function(*args, **kwargs)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
if attempts < max_attempts and exception_handler(ex) == ExceptionHandlerOutcome.CONTINUE:
|
if attempts < max_attempts and exception_handler(ex) == ExceptionHandlerOutcome.CONTINUE:
|
||||||
await async_sleep(delay)
|
await async_sleep(delay)
|
||||||
delay *= multiplier
|
delay *= multiplier
|
||||||
else:
|
else:
|
||||||
raise ex
|
raise ex
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
return wrapper
|
|
||||||
|
|
||||||
|
|
||||||
def async_test(coro):
|
def async_test(coro):
|
||||||
|
@wraps(coro)
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
with Runner() as runner:
|
with Runner() as runner:
|
||||||
runner.run(coro(*args, **kwargs))
|
runner.run(coro(*args, **kwargs))
|
||||||
@@ -85,14 +154,13 @@ def async_test(coro):
|
|||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
def tmpdir(argument_name='tmpdir'):
|
@decorator_with_kwargs
|
||||||
def wrapper(fun):
|
def tmpdir(f, argument_name='tmpdir'):
|
||||||
def result(*args, **kwargs):
|
@wraps(f)
|
||||||
with TemporaryDirectory() as temp_dir:
|
def result(*args, **kwargs):
|
||||||
fun(*args, **kwargs, **{
|
with TemporaryDirectory() as temp_dir:
|
||||||
argument_name: Path(temp_dir)
|
f(*args, **kwargs, **{
|
||||||
})
|
argument_name: Path(temp_dir)
|
||||||
|
})
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
return wrapper
|
|
||||||
|
Reference in New Issue
Block a user