Files
pwo/src/pwo/topic.py
Walter Oggioni 28543921c3
All checks were successful
CI / build (push) Successful in 18s
updated dependencies
added topic class
2024-10-24 02:50:00 +08:00

92 lines
3.0 KiB
Python

from .private import AsyncQueueIterator
from asyncio import Queue, AbstractEventLoop, Future, CancelledError
from typing import Callable, Optional
from logging import getLogger
log = getLogger(__name__)
class Subscriber:
_unsubscribe_callback: Callable[['Subscriber'], None]
_event: Optional[Future]
_loop: AbstractEventLoop
def __init__(self, unsubscribe: Callable[['Subscriber'], None], loop: AbstractEventLoop):
self._unsubscribe_callback = unsubscribe
self._event: Optional[Future] = None
self._loop = loop
def unsubscribe(self) -> None:
self._event.cancel()
self._unsubscribe_callback(self)
log.debug('Deleted subscriber %s', id(self))
async def wait(self, tout: float) -> bool:
self._event = self._loop.create_future()
def callback():
if not self._event.done():
self._event.set_result(False)
handle = self._loop.call_later(tout, callback)
try:
log.debug('Subscriber %s is waiting for an event', id(self))
return await self._event
except CancelledError:
return False
finally:
handle.cancel()
def notify(self) -> None:
log.debug('Subscriber %s notified', id(self))
if not self._event.done():
self._event.set_result(True)
def reset(self) -> None:
self._event = self._loop.create_future()
class Topic:
_loop: AbstractEventLoop
_queue: Queue
_subscriptions: dict[str, set[Subscriber]]
def __init__(self, loop: AbstractEventLoop):
self._subscriptions: dict[str, set[Subscriber]] = dict()
self._loop = loop
self._queue = Queue()
def subscribe(self, path: str) -> Subscriber:
subscriptions = self._subscriptions
subscriptions_per_path = subscriptions.setdefault(path, set())
def unsubscribe_callback(subscription):
subscriptions_per_path.remove(subscription)
log.debug('Unsubscribed %s from topic %s', id(result), path)
result = Subscriber(unsubscribe_callback, self._loop)
log.debug('Created subscriber %s to topic %s', id(result), path)
subscriptions_per_path.add(result)
return result
def _notify_subscriptions(self, path):
subscriptions = self._subscriptions
subscriptions_per_path = subscriptions.get(path, None)
if subscriptions_per_path:
log.debug(f"Subscribers on '{path}': {len(subscriptions_per_path)}")
for s in subscriptions_per_path:
s.notify()
async def process_events(self):
async for evt in AsyncQueueIterator(self._queue):
log.debug(f"Processed event for path '{evt}'")
self._notify_subscriptions(evt)
log.debug(f"Event processor has completed")
def post_event(self, path):
def callback():
self._queue.put_nowait(path)
log.debug(f"Posted event for topic '{path}', queue size: {self._queue.qsize()}")
self._loop.call_soon_threadsafe(callback)