9 Commits

Author SHA1 Message Date
6ef7f8107e added Try
All checks were successful
CI / build (push) Successful in 18s
2024-11-03 22:42:41 +08:00
e0e763170f small fixes to type annotations
All checks were successful
CI / build (push) Successful in 18s
2024-11-01 09:21:01 +08:00
40791a9c6e updated dependencies
All checks were successful
CI / build (push) Successful in 15s
added topic class
2024-10-24 03:18:20 +08:00
4bddc35633 fixed bug in Maybe.value
All checks were successful
CI / build (push) Successful in 11s
2024-09-10 00:04:30 +08:00
e89e306e96 improved @tmpdir decorator
All checks were successful
CI / build (push) Successful in 12s
2024-06-24 21:24:48 +08:00
436bd737fa added decorator_with_kwargs
All checks were successful
CI / build (push) Successful in 12s
2024-06-24 21:18:08 +08:00
29dd81159c added tmpdir
All checks were successful
CI / build (push) Successful in 12s
2024-06-24 20:35:38 +08:00
07e511ae2c exported Maybe
All checks were successful
CI / build (push) Successful in 13s
2024-06-24 20:22:41 +08:00
8cf62ff3d2 added __or__ operator to pwo.maybe.Maybe 2024-06-24 20:21:48 +08:00
11 changed files with 619 additions and 85 deletions

View File

@@ -1,25 +1,31 @@
name: CI
on:
push:
branches: [ master ]
tags:
- '*'
jobs:
build:
runs-on: woryzen
steps:
- name: Checkout sources
uses: actions/checkout@v4
with:
fetch-depth: 0
fetch-tags: true
- uses: actions/setup-python@v5
with:
cache: 'pip'
- name: Create virtualenv
run: |
python -m venv .venv
.venv/bin/pip install -r requirements.txt
.venv/bin/pip install -r requirements.txt .
- name: Run mypy
run: .venv/bin/python -m mypy -p src
- name: Run unit tests
run: .venv/bin/python -m unittest discover -s tests
- name: Execute build
run: |
.venv/bin/python -m build
.venv/bin/pyproject-build
- name: Publish artifacts
env:
TWINE_REPOSITORY_URL: ${{ vars.PYPI_REGISTRY_URL }}

5
.gitignore vendored
View File

@@ -1,7 +1,8 @@
dist
.idea
__pycache__
.venv
*.pyc
*.egg-info
/build
/dist
src/pwo/_version.py

View File

@@ -1,10 +1,10 @@
[build-system]
requires = ["setuptools>=61.0"]
requires = ["setuptools>=61.0", "setuptools-scm>=8"]
build-backend = "setuptools.build_meta"
[project]
name = "pwo"
version = "0.0.2"
dynamic = ["version"]
authors = [
{ name="Walter Oggioni", email="oggioni.walter@gmail.com" },
]
@@ -17,7 +17,12 @@ classifiers = [
"Operating System :: OS Independent",
]
dependencies = [
'typing_extensions==4.7.1'
'typing_extensions'
]
[project.optional-dependencies]
dev = [
"pip-tools", "build", "mypy", "ipdb", "twine"
]
[project.urls]
@@ -36,3 +41,6 @@ warn_return_any = true
warn_unused_ignores = true
exclude = ["scripts", "docs", "test"]
strict = true
[tool.setuptools_scm]
version_file = "src/pwo/_version.py"

View File

@@ -1,34 +1,140 @@
build==1.2.1
certifi==2024.6.2
cffi==1.16.0
charset-normalizer==3.3.2
cryptography==42.0.8
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile --extra=dev --output-file=requirements.txt
#
--index-url https://gitea.woggioni.net/api/packages/woggioni/pypi/simple
--extra-index-url https://pypi.org/simple
asttokens==2.4.1
# via stack-data
build==1.2.2.post1
# via
# pip-tools
# pwo (pyproject.toml)
certifi==2024.8.30
# via requests
cffi==1.17.1
# via cryptography
charset-normalizer==3.4.0
# via requests
click==8.1.7
# via pip-tools
cryptography==43.0.3
# via secretstorage
decorator==5.1.1
# via
# ipdb
# ipython
docutils==0.21.2
idna==3.7
importlib_metadata==7.2.0
jaraco.classes==3.4.0
jaraco.context==5.3.0
jaraco.functools==4.0.1
# via readme-renderer
executing==2.1.0
# via stack-data
idna==3.10
# via requests
importlib-metadata==8.5.0
# via twine
ipdb==0.13.13
# via pwo (pyproject.toml)
ipython==8.29.0
# via ipdb
jaraco-classes==3.4.0
# via keyring
jaraco-context==6.0.1
# via keyring
jaraco-functools==4.1.0
# via keyring
jedi==0.19.1
# via ipython
jeepney==0.8.0
keyring==25.2.1
# via
# keyring
# secretstorage
keyring==25.5.0
# via twine
markdown-it-py==3.0.0
# via rich
matplotlib-inline==0.1.7
# via ipython
mdurl==0.1.2
more-itertools==10.3.0
mypy==1.10.0
# via markdown-it-py
more-itertools==10.5.0
# via
# jaraco-classes
# jaraco-functools
mypy==1.13.0
# via pwo (pyproject.toml)
mypy-extensions==1.0.0
nh3==0.2.17
# via mypy
nh3==0.2.18
# via readme-renderer
packaging==24.1
pkginfo==1.11.1
# via build
parso==0.8.4
# via jedi
pexpect==4.9.0
# via ipython
pip-tools==7.4.1
# via pwo (pyproject.toml)
pkginfo==1.10.0
# via twine
prompt-toolkit==3.0.48
# via ipython
ptyprocess==0.7.0
# via pexpect
pure-eval==0.2.3
# via stack-data
pycparser==2.22
Pygments==2.18.0
pyproject_hooks==1.1.0
readme_renderer==43.0
# via cffi
pygments==2.18.0
# via
# ipython
# readme-renderer
# rich
pyproject-hooks==1.2.0
# via
# build
# pip-tools
readme-renderer==44.0
# via twine
requests==2.32.3
# via
# requests-toolbelt
# twine
requests-toolbelt==1.0.0
# via twine
rfc3986==2.0.0
rich==13.7.1
SecretStorage==3.3.3
twine==5.1.0
typing_extensions==4.12.2
urllib3==2.2.2
zipp==3.19.2
# via twine
rich==13.9.4
# via twine
secretstorage==3.3.3
# via keyring
six==1.16.0
# via asttokens
stack-data==0.6.3
# via ipython
traitlets==5.14.3
# via
# ipython
# matplotlib-inline
twine==5.1.1
# via pwo (pyproject.toml)
typing-extensions==4.12.2
# via
# mypy
# pwo (pyproject.toml)
urllib3==2.2.3
# via
# requests
# twine
wcwidth==0.2.13
# via prompt-toolkit
wheel==0.44.0
# via pip-tools
zipp==3.20.2
# via importlib-metadata
# The following packages are considered to be unsafe in a requirements file:
# pip
# setuptools

View File

@@ -1,9 +1,32 @@
from .private import format_filesize, async_retry, retry, async_test, ExceptionHandlerOutcome
from .private import (
format_filesize,
async_retry,
retry,
async_test,
ExceptionHandlerOutcome,
tmpdir,
decorator_with_kwargs,
classproperty,
AsyncQueueIterator,
aenumerate,
)
from .maybe import Maybe
from .notification import TopicManager, Subscriber
from ._try import Try
__all__ = [
'format_filesize',
'async_retry',
'retry',
'async_test',
'ExceptionHandlerOutcome'
'ExceptionHandlerOutcome',
'Maybe',
'tmpdir',
'decorator_with_kwargs',
'classproperty',
'TopicManager',
'Subscriber',
'AsyncQueueIterator',
'aenumerate',
'Try'
]

52
src/pwo/_try.py Normal file
View File

@@ -0,0 +1,52 @@
from typing import (
Callable,
TypeVar,
Optional
)
ERR = TypeVar("ERR", bound=Exception)
class Try[T]:
value: T | Exception
def __init__(self, value: T | Exception):
self.value = value
def handle[U](self, cb: Callable[[Optional[T], Optional[Exception]], U]) -> 'Try[U]':
value = self.value
if isinstance(value, Exception):
return Try.of(lambda: cb(None, value))
else:
return Try.of(lambda: cb(value, None))
def get(self, alternative: Optional[T] = None) -> T:
if isinstance(self.value, Exception):
if alternative is None:
raise self.value
else:
return alternative
else:
return self.value
def then_try[U](self, cb: Callable[[T], U]) -> 'Try[U]':
value = self.value
if isinstance(value, Exception):
return Try.failure(value)
else:
return Try.of(lambda: cb(value))
@staticmethod
def success[U](value: U) -> 'Try[U]':
return Try(value)
@staticmethod
def failure[U](ex: Exception) -> 'Try[U]':
return Try(ex)
@staticmethod
def of[U](cb: Callable[[], U]) -> 'Try[U]':
try:
return Try(cb())
except Exception as ex:
return Try(ex)

View File

@@ -24,11 +24,11 @@ class Maybe(Generic[T]):
@property
def value(self) -> T:
value = self._value
if not value:
result = self._value
if result is None:
raise ValueError('Empty Maybe')
else:
return value
return result
@property
def is_present(self) -> bool:
@@ -52,6 +52,9 @@ class Maybe(Generic[T]):
def flat_map(self, transformer: Callable[[T], Maybe[U]]) -> Maybe[U]:
return transformer(self.value) if self.is_present else Maybe.empty()
def __or__(self, alt: Maybe[T]) -> Maybe[T]:
return self if self.is_present else alt
def or_else(self, alt: T) -> T:
return self.value if self.is_present else alt
@@ -64,7 +67,7 @@ class Maybe(Generic[T]):
else:
raise supplier()
def or_else_get(self, supplier: Callable[[], T]) -> Maybe[T]:
def or_else_get(self, supplier: Callable[[], Optional[T]]) -> Maybe[T]:
return self if self.is_present else Maybe.of_nullable(supplier())
def if_present(self, callback: Callable[[T], U]) -> None:

100
src/pwo/notification.py Normal file
View File

@@ -0,0 +1,100 @@
from .private import AsyncQueueIterator
from asyncio import Queue, AbstractEventLoop, Future, CancelledError
from typing import Callable, Optional
from logging import getLogger
log = getLogger(__name__)
class Subscriber:
_unsubscribe_callback: Callable[['Subscriber'], None]
_event: Optional[Future[bool]]
_loop: AbstractEventLoop
def __init__(self, unsubscribe: Callable[['Subscriber'], None], loop: AbstractEventLoop):
self._unsubscribe_callback = unsubscribe
self._event: Optional[Future[bool]] = None
self._loop = loop
def unsubscribe(self) -> None:
evt = self._event
if evt is not None:
evt.cancel()
self._unsubscribe_callback(self)
log.debug('Deleted subscriber %s', id(self))
async def wait(self, tout: float) -> bool:
self._event = self._loop.create_future()
def callback() -> None:
evt = self._event
if evt is None:
raise ValueError('Event is None')
evt.cancel()
if not evt.done():
evt.set_result(False)
handle = self._loop.call_later(tout, callback)
try:
log.debug('Subscriber %s is waiting for an event', id(self))
return await self._event
except CancelledError:
return False
finally:
handle.cancel()
def notify(self) -> None:
log.debug('Subscriber %s notified', id(self))
evt = self._event
if evt is None:
raise ValueError('Event is None')
if not evt.done():
evt.set_result(True)
def reset(self) -> None:
self._event = self._loop.create_future()
class TopicManager:
_loop: AbstractEventLoop
_queue: Queue[Optional[str]]
_subscribers: dict[str, set[Subscriber]]
def __init__(self, loop: AbstractEventLoop):
self._subscribers: dict[str, set[Subscriber]] = dict()
self._loop = loop
self._queue = Queue()
def subscribe(self, topic: str) -> Subscriber:
subscriptions = self._subscribers
subscriptions_per_topic = subscriptions.setdefault(topic, set())
def unsubscribe_callback(subscription: Subscriber) -> None:
subscriptions_per_topic.remove(subscription)
log.debug('Unsubscribed %s from topic %s', id(result), topic)
result = Subscriber(unsubscribe_callback, self._loop)
log.debug('Created subscriber %s to topic %s', id(result), topic)
subscriptions_per_topic.add(result)
return result
def _notify_subscriptions(self, topic: str) -> None:
subscriptions = self._subscribers
subscriptions_per_topic = subscriptions.get(topic, None)
if subscriptions_per_topic:
log.debug(f"Subscribers on '{topic}': {len(subscriptions_per_topic)}")
for s in subscriptions_per_topic:
s.notify()
async def process_events(self) -> None:
async for evt in AsyncQueueIterator(self._queue):
log.debug(f"Processed event for topic '{evt}'")
self._notify_subscriptions(evt)
log.debug(f"Event processor has completed")
def post_event(self, topic: str) -> None:
def callback() -> None:
self._queue.put_nowait(topic)
log.debug(f"Posted event for topic '{topic}', queue size: {self._queue.qsize()}")
self._loop.call_soon_threadsafe(callback)

View File

@@ -1,8 +1,89 @@
import math
from asyncio import sleep as async_sleep, Runner, Queue
from enum import Enum, auto
from typing import Callable
from functools import wraps, partial
from inspect import signature
from pathlib import Path
from tempfile import TemporaryDirectory
from time import sleep
from asyncio import sleep as async_sleep, Runner
from typing import (
Callable,
AsyncIterator,
Optional,
Self,
AsyncIterable,
Awaitable,
Any,
Coroutine,
Never,
Tuple
)
def decorator_with_kwargs(decorator: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator factory to give decorated decorators the skill to receive
optional keyword arguments.
If a decorator "some_decorator" is decorated with this function:
@decorator_with_kwargs
def some_decorator(decorated_function, kwarg1=1, kwarg2=2):
def wrapper(*decorated_function_args, **decorated_function_kwargs):
'''Modifies the behavior of decorated_function according
to the value of kwarg1 and kwarg2'''
...
return wrapper
It will be usable in the following ways:
@some_decorator
def func(x):
...
@some_decorator()
def func(x):
...
@some_decorator(kwarg1=3) # or other combinations of kwargs
def func(x, y):
...
:param decorator: decorator to be given optional kwargs-handling skills
:type decorator: Callable
:raises TypeError: if the decorator does not receive a single Callable or
keyword arguments
:raises TypeError: if the signature of the decorated decorator does not
conform to: Callable, **keyword_arguments
:return: modified decorator
:rtype: Callable
"""
@wraps(decorator)
def decorator_wrapper(*args, **kwargs) -> Any: # type: ignore
if (len(kwargs) == 0) and (len(args) == 1) and callable(args[0]):
return decorator(args[0])
if len(args) == 0:
return partial(decorator, **kwargs)
raise TypeError(
f'{decorator.__name__} expects either a single Callable '
'or keyword arguments'
)
signature_values = signature(decorator).parameters.values()
signature_args = [
param.name for param in signature_values
if param.default == param.empty
]
if len(signature_args) != 1:
raise TypeError(
f'{decorator.__name__} signature should be of the form:\n'
f'{decorator.__name__}(function: typing.Callable, '
'kwarg_1=default_1, kwarg_2=default_2, ...) -> Callable'
)
return decorator_wrapper
_size_uoms = ('B', 'KiB', 'MiB', 'GiB', 'KiB')
@@ -17,66 +98,157 @@ def format_filesize(size: int) -> str:
return '%.2f ' % (size / math.pow(1024, counter)) + _size_uoms[counter]
class ExceptionHandlerOutcome(Enum):
THROW = auto()
CONTINUE = auto()
@decorator_with_kwargs
def retry(
function: Callable[..., Any],
max_attempts: int = 3,
multiplier: float = 2,
initial_delay: float = 1.0,
exception_handler: Callable[[Exception], ExceptionHandlerOutcome] =
lambda _: ExceptionHandlerOutcome.CONTINUE
):
def wrapper(function):
def result(*args, **kwargs):
attempts = 0
delay = initial_delay
while True:
attempts += 1
try:
return function(*args, **kwargs)
except Exception as ex:
if attempts < max_attempts and exception_handler(ex) == ExceptionHandlerOutcome.CONTINUE:
sleep(delay)
delay *= multiplier
else:
raise ex
) -> Callable[..., Any]:
@wraps(function)
def result(*args: Any, **kwargs: Any) -> Any:
attempts = 0
delay = initial_delay
while True:
attempts += 1
try:
return function(*args, **kwargs)
except Exception as ex:
if attempts < max_attempts and exception_handler(ex) == ExceptionHandlerOutcome.CONTINUE:
sleep(delay)
delay *= multiplier
else:
raise ex
return result
return wrapper
return result
@decorator_with_kwargs
def async_retry(
function: Callable[..., Any],
max_attempts: int = 3,
multiplier: float = 2,
initial_delay: float = 1.0,
exception_handler=lambda _: ExceptionHandlerOutcome.CONTINUE
):
def wrapper(function):
async def result(*args, **kwargs):
attempts = 0
delay = initial_delay
while True:
attempts += 1
try:
return await function(*args, **kwargs)
except Exception as ex:
if attempts < max_attempts and exception_handler(ex) == ExceptionHandlerOutcome.CONTINUE:
await async_sleep(delay)
delay *= multiplier
else:
raise ex
exception_handler: Callable[[Exception], ExceptionHandlerOutcome] = lambda _: ExceptionHandlerOutcome.CONTINUE
) -> Callable[..., Any]:
@wraps(function)
async def result(*args: Any, **kwargs: Any) -> Any:
attempts = 0
delay = initial_delay
while True:
attempts += 1
try:
return await function(*args, **kwargs)
except Exception as ex:
if attempts < max_attempts and exception_handler(ex) == ExceptionHandlerOutcome.CONTINUE:
await async_sleep(delay)
delay *= multiplier
else:
raise ex
return result
return result
return wrapper
def async_test(coro):
def wrapper(*args, **kwargs):
def async_test(coro: Callable[..., Coroutine[Never, Never, None]]) -> Callable[..., None]:
@wraps(coro)
def wrapper(*args: Any, **kwargs: Any) -> None:
with Runner() as runner:
runner.run(coro(*args, **kwargs))
return wrapper
@decorator_with_kwargs # type: ignore
def tmpdir(f,
argument_name='temp_dir',
suffix=None,
prefix=None,
dir=None,
ignore_cleanup_errors=False,
delete=True):
@wraps(f) # type: ignore
def result(*args, **kwargs):
with TemporaryDirectory(
suffix=suffix,
prefix=prefix,
dir=dir,
ignore_cleanup_errors=ignore_cleanup_errors,
delete=delete) as temp_dir:
f(*args, **kwargs, **{
argument_name: Path(temp_dir)
})
return result
class ClassPropertyDescriptor[T]:
def __init__(self, fget: Callable[[], T], fset: Optional[Callable[[T], None]]=None):
self.fget = fget
self.fset = fset
def __get__(self, obj, klass=None): # type: ignore
if klass is None:
klass = type(obj)
return self.fget.__get__(obj, klass)()
def __set__(self, obj, value): # type: ignore
if not self.fset:
raise AttributeError("can't set attribute")
type_ = type(obj)
return self.fset.__get__(obj, type_)(value)
def setter(self, func): # type: ignore
if not isinstance(func, (classmethod, staticmethod)):
func = classmethod(func)
self.fset = func
return self
def classproperty(func): # type: ignore
if not isinstance(func, (classmethod, staticmethod)):
func = classmethod(func)
return ClassPropertyDescriptor(func)
class AsyncQueueIterator[T](AsyncIterator[T]):
_queue: Queue[Optional[T]]
def __init__(self, queue: Queue[Optional[T]]):
self._queue = queue
def __aiter__(self) -> AsyncIterator[T]:
return self
async def __anext__(self) -> T:
item = await self._queue.get()
if item is None:
raise StopAsyncIteration
return item
class aenumerate[T](AsyncIterator[Tuple[int, T]]):
"""enumerate for async for"""
_aiterable: AsyncIterable[T]
_i: int
def __init__(self, aiterable: AsyncIterable[T], start: int = 0):
self._aiterable = aiterable
self._i = start - 1
def __aiter__(self) -> Self:
self._ait = self._aiterable.__aiter__()
return self
async def __anext__(self) -> Tuple[int, T]:
val = await self._ait.__anext__()
self._i += 1
return self._i, val

View File

@@ -1,6 +1,7 @@
import unittest
from src.pwo import retry, async_retry, async_test
from pwo import retry, async_retry, async_test, AsyncQueueIterator, aenumerate
from asyncio import Queue
class PrivateTest(unittest.TestCase):
@@ -70,5 +71,25 @@ class PrivateTest(unittest.TestCase):
await bar()
self.assertEqual(max_attempts, attempt)
if __name__ == '__main__':
unittest.main()
@async_test
async def test_async_queue_iterator(self):
queue = Queue()
queue_size = 10
objects = [object() for _ in range(queue_size)]
async def poll() -> int:
completed = 0
async for i, obj in aenumerate(AsyncQueueIterator(queue)):
self.assertIs(objects[i], obj)
completed += 1
return completed
handle = poll()
for o in objects:
queue.put_nowait(o)
queue.put_nowait(None)
processed = await handle
self.assertEqual(queue_size, processed)

42
tests/test_try.py Normal file
View File

@@ -0,0 +1,42 @@
import unittest
from pwo import Try
class TestException(Exception):
def __init__(self, msg: str):
super().__init__(msg)
class TryTest(unittest.TestCase):
def setUp(self):
pass
def test_try(self):
with self.subTest("Test failure"):
def throw_test_exception():
raise TestException("error")
t = Try.of(throw_test_exception)
with self.assertRaises(TestException):
t.get()
t = Try.failure(TestException("error"))
with self.assertRaises(TestException):
t.get()
with self.subTest("Test success"):
def complete_successfully():
return 42
t = Try.of(complete_successfully)
self.assertEqual(42, t.get())
t2 = t.handle(lambda value, err: value * 2)
self.assertEqual(84, t2.get())