5 Commits

Author SHA1 Message Date
a417c7484b fixed mypy
All checks were successful
CI / build (push) Successful in 42s
2024-12-01 15:38:54 +08:00
04aae1f976 switched builder to hostinger
Some checks failed
CI / build (push) Failing after 40s
2024-12-01 15:29:34 +08:00
79246f70c4 improved index_of_with_escape
Some checks are pending
CI / build (push) Waiting to run
2024-11-18 09:00:23 +08:00
36f7031fea added index_of_with_escape function 2024-11-11 06:06:22 +08:00
37591f78d9 simplified notification code 2024-11-03 23:02:01 +08:00
5 changed files with 78 additions and 20 deletions

View File

@@ -5,7 +5,7 @@ on:
- '*'
jobs:
build:
runs-on: woryzen
runs-on: hostinger
steps:
- name: Checkout sources
uses: actions/checkout@v4

View File

@@ -9,6 +9,7 @@ from .private import (
classproperty,
AsyncQueueIterator,
aenumerate,
index_of_with_escape
)
from .maybe import Maybe
from .notification import TopicManager, Subscriber
@@ -28,5 +29,6 @@ __all__ = [
'Subscriber',
'AsyncQueueIterator',
'aenumerate',
'Try'
'Try',
'index_of_with_escape'
]

View File

@@ -1,5 +1,5 @@
from .private import AsyncQueueIterator
from asyncio import Queue, AbstractEventLoop, Future, CancelledError
from asyncio import Queue, AbstractEventLoop, Future, CancelledError, timeout
from typing import Callable, Optional
from logging import getLogger
@@ -23,25 +23,16 @@ class Subscriber:
self._unsubscribe_callback(self)
log.debug('Deleted subscriber %s', id(self))
async def wait(self, tout: float) -> bool:
self._event = self._loop.create_future()
async def wait(self, tout: Optional[float]) -> bool:
def callback() -> None:
evt = self._event
if evt is None:
raise ValueError('Event is None')
evt.cancel()
if not evt.done():
evt.set_result(False)
handle = self._loop.call_later(tout, callback)
future: Future[bool] = self._loop.create_future()
self._event = future
try:
log.debug('Subscriber %s is waiting for an event', id(self))
return await self._event
except CancelledError:
async with timeout(tout):
log.debug('Subscriber %s is waiting for an event', id(self))
return await future
except TimeoutError:
return False
finally:
handle.cancel()
def notify(self) -> None:
log.debug('Subscriber %s notified', id(self))

View File

@@ -252,3 +252,34 @@ class aenumerate[T](AsyncIterator[Tuple[int, T]]):
val = await self._ait.__anext__()
self._i += 1
return self._i, val
def index_of_with_escape(haystack: str, needle: str, escape: str, begin: int, end: int = 0) -> int:
result = -1
cursor = begin
if end == 0:
end = len(haystack)
escape_count = 0
while cursor < end:
c = haystack[cursor]
if escape_count > 0:
escape_count -= 1
if c[0] == escape:
result = -1
elif escape_count == 0:
if c[0] == escape:
escape_count += 1
if cursor + len(needle) <= len(haystack):
test = haystack[cursor:cursor + len(needle)]
if test == needle:
result = cursor
if result >= 0 and escape_count == 0:
break
cursor += 1
return result

View File

@@ -1,6 +1,6 @@
import unittest
from pwo import retry, async_retry, async_test, AsyncQueueIterator, aenumerate
from pwo import retry, async_retry, async_test, AsyncQueueIterator, aenumerate, index_of_with_escape
from asyncio import Queue
@@ -93,3 +93,37 @@ class PrivateTest(unittest.TestCase):
self.assertEqual(queue_size, processed)
class TestIndexOfWithEscape(unittest.TestCase):
def run_test_case(self, haystack, needle, escape, expected_solution):
solution = []
i = 0
while True:
i = index_of_with_escape(haystack, needle, escape, i, len(haystack))
if i < 0:
break
solution.append(i)
i += 1
self.assertEqual(expected_solution, solution)
def test_simple(self):
self.run_test_case(" dsds $sdsa \\$dfivbdsf \\\\$sdgsga", '$', '\\', [6, 25])
def test_simple2(self):
self.run_test_case("asdasd$$vdfv$", '$', '$', [12])
def test_no_needle(self):
self.run_test_case("asdasd$$vdfv$", '#', '\\', [])
def test_escaped_needle(self):
self.run_test_case("asdasd$$vdfv$#sdfs", '#', '$', [])
def test_not_escaped_needle(self):
self.run_test_case("asdasd$$#vdfv$#sdfs", '#', '$', [8])
def test_special_case(self):
self.run_test_case("\n${sys:user.home}${env:HOME}", ':', '\\', [6, 22])
def test_wide_needle(self):
self.run_test_case("asdasd\\${#vdfv|${#sdfs}", '${', '\\', [15])