simplified notification code

This commit is contained in:
2024-11-03 23:02:01 +08:00
parent 6ef7f8107e
commit 37591f78d9

View File

@@ -1,5 +1,5 @@
from .private import AsyncQueueIterator
from asyncio import Queue, AbstractEventLoop, Future, CancelledError
from asyncio import Queue, AbstractEventLoop, Future, CancelledError, timeout
from typing import Callable, Optional
from logging import getLogger
@@ -23,25 +23,16 @@ class Subscriber:
self._unsubscribe_callback(self)
log.debug('Deleted subscriber %s', id(self))
async def wait(self, tout: float) -> bool:
self._event = self._loop.create_future()
async def wait(self, tout: Optional[float]) -> bool:
def callback() -> None:
evt = self._event
if evt is None:
raise ValueError('Event is None')
evt.cancel()
if not evt.done():
evt.set_result(False)
handle = self._loop.call_later(tout, callback)
future = self._loop.create_future()
self._event = future
try:
log.debug('Subscriber %s is waiting for an event', id(self))
return await self._event
except CancelledError:
async with timeout(tout):
log.debug('Subscriber %s is waiting for an event', id(self))
await future
except TimeoutError:
return False
finally:
handle.cancel()
def notify(self) -> None:
log.debug('Subscriber %s notified', id(self))