added retry decorators
Some checks failed
CI / build (push) Failing after 3s

This commit is contained in:
2024-06-22 20:35:37 +08:00
parent 514962d9e3
commit 2914050465
6 changed files with 163 additions and 1 deletions

View File

@@ -11,6 +11,8 @@ jobs:
- uses: actions/setup-python@v5
with:
cache: 'pip'
- name: Run unit tests
run: .venv/bin/python -m unittest discover -s tests
- name: Execute build
env:
TWINE_REPOSITORY_URL: ${{ vars.PYPI_REGISTRY_URL }}

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
__pycache__
.venv
*.pyc

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "pwo"
version = "0.0.1"
version = "0.0.2"
authors = [
{ name="Walter Oggioni", email="oggioni.walter@gmail.com" },
]

1
src/pwo/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .private import format_filesize, async_retry, retry, async_test

82
src/pwo/private.py Normal file
View File

@@ -0,0 +1,82 @@
import math
from enum import Enum, auto
from typing import Callable
from time import sleep
from asyncio import sleep as async_sleep, Runner
_size_uoms = ('B', 'KiB', 'MiB', 'GiB', 'KiB')
def format_filesize(size: int) -> str:
counter = 0
tmp_size = size
while tmp_size > 0:
tmp_size //= 1024
counter += 1
counter -= 1
return '%.2f ' % (size / math.pow(1024, counter)) + _size_uoms[counter]
class ExceptionHandlerOutcome(Enum):
THROW = auto()
CONTINUE = auto()
def retry(
max_attempts: int = 3,
multiplier: float = 2,
initial_delay: float = 1.0,
exception_handler: Callable[[Exception], ExceptionHandlerOutcome] =
lambda _: ExceptionHandlerOutcome.CONTINUE
):
def wrapper(function):
def result(*args, **kwargs):
attempts = 0
delay = initial_delay
while True:
attempts += 1
try:
return function(*args, **kwargs)
except Exception as ex:
if attempts < max_attempts and exception_handler(ex) == ExceptionHandlerOutcome.CONTINUE:
sleep(delay)
delay *= multiplier
else:
raise ex
return result
return wrapper
def async_retry(
max_attempts: int = 3,
multiplier: float = 2,
initial_delay: float = 1.0,
exception_handler=lambda _: ExceptionHandlerOutcome.CONTINUE
):
def wrapper(function):
async def result(*args, **kwargs):
attempts = 0
delay = initial_delay
while True:
attempts += 1
try:
return await function(*args, **kwargs)
except Exception as ex:
if attempts < max_attempts and exception_handler(ex) == ExceptionHandlerOutcome.CONTINUE:
await async_sleep(delay)
delay *= multiplier
else:
raise ex
return result
return wrapper
def async_test(coro):
def wrapper(*args, **kwargs):
with Runner() as runner:
runner.run(coro(*args, **kwargs))
return wrapper

74
tests/test_private.py Normal file
View File

@@ -0,0 +1,74 @@
import unittest
from src.pwo import retry, async_retry, async_test
class PrivateTest(unittest.TestCase):
def test_retry_until_success(self):
max_attempts = 20
attempt = 0
expected_result = object()
@retry(max_attempts=max_attempts, initial_delay=0)
def foo():
nonlocal attempt
attempt += 1
if attempt < 10:
raise Exception()
else:
return expected_result
self.assertEqual(expected_result, foo())
self.assertEqual(10, attempt)
def test_retry_until_max_attempt(self):
max_attempts = 20
attempt = 0
@retry(max_attempts=max_attempts, initial_delay=0)
def bar():
nonlocal attempt
attempt += 1
raise Exception()
with self.assertRaises(Exception):
bar()
self.assertEqual(max_attempts, attempt)
@async_test
async def test_async_retry_until_success(self):
max_attempts = 20
attempt = 0
expected_result = object()
@async_retry(max_attempts=max_attempts, initial_delay=0)
async def foo():
nonlocal attempt
attempt += 1
if attempt < 10:
raise Exception()
else:
return expected_result
self.assertEqual(expected_result, await foo())
self.assertEqual(10, attempt)
@async_test
async def test_async_retry_until_max_attempt(self):
max_attempts = 20
attempt = 0
@async_retry(max_attempts=max_attempts, initial_delay=0)
async def bar():
nonlocal attempt
attempt += 1
raise Exception()
with self.assertRaises(Exception):
await bar()
self.assertEqual(max_attempts, attempt)
if __name__ == '__main__':
unittest.main()