diff --git a/src/pwo/notification.py b/src/pwo/notification.py index 16de0f8..524e76e 100644 --- a/src/pwo/notification.py +++ b/src/pwo/notification.py @@ -1,5 +1,5 @@ from .private import AsyncQueueIterator -from asyncio import Queue, AbstractEventLoop, Future, CancelledError +from asyncio import Queue, AbstractEventLoop, Future, CancelledError, timeout from typing import Callable, Optional from logging import getLogger @@ -23,25 +23,16 @@ class Subscriber: self._unsubscribe_callback(self) log.debug('Deleted subscriber %s', id(self)) - async def wait(self, tout: float) -> bool: - self._event = self._loop.create_future() + async def wait(self, tout: Optional[float]) -> bool: - def callback() -> None: - evt = self._event - if evt is None: - raise ValueError('Event is None') - evt.cancel() - if not evt.done(): - evt.set_result(False) - - handle = self._loop.call_later(tout, callback) + future = self._loop.create_future() + self._event = future try: - log.debug('Subscriber %s is waiting for an event', id(self)) - return await self._event - except CancelledError: + async with timeout(tout): + log.debug('Subscriber %s is waiting for an event', id(self)) + await future + except TimeoutError: return False - finally: - handle.cancel() def notify(self) -> None: log.debug('Subscriber %s notified', id(self))