5 Commits

Author SHA1 Message Date
a417c7484b fixed mypy
All checks were successful
CI / build (push) Successful in 42s
2024-12-01 15:38:54 +08:00
04aae1f976 switched builder to hostinger
Some checks failed
CI / build (push) Failing after 40s
2024-12-01 15:29:34 +08:00
79246f70c4 improved index_of_with_escape
Some checks are pending
CI / build (push) Waiting to run
2024-11-18 09:00:23 +08:00
36f7031fea added index_of_with_escape function 2024-11-11 06:06:22 +08:00
37591f78d9 simplified notification code 2024-11-03 23:02:01 +08:00
5 changed files with 78 additions and 20 deletions

View File

@@ -5,7 +5,7 @@ on:
- '*' - '*'
jobs: jobs:
build: build:
runs-on: woryzen runs-on: hostinger
steps: steps:
- name: Checkout sources - name: Checkout sources
uses: actions/checkout@v4 uses: actions/checkout@v4

View File

@@ -9,6 +9,7 @@ from .private import (
classproperty, classproperty,
AsyncQueueIterator, AsyncQueueIterator,
aenumerate, aenumerate,
index_of_with_escape
) )
from .maybe import Maybe from .maybe import Maybe
from .notification import TopicManager, Subscriber from .notification import TopicManager, Subscriber
@@ -28,5 +29,6 @@ __all__ = [
'Subscriber', 'Subscriber',
'AsyncQueueIterator', 'AsyncQueueIterator',
'aenumerate', 'aenumerate',
'Try' 'Try',
'index_of_with_escape'
] ]

View File

@@ -1,5 +1,5 @@
from .private import AsyncQueueIterator from .private import AsyncQueueIterator
from asyncio import Queue, AbstractEventLoop, Future, CancelledError from asyncio import Queue, AbstractEventLoop, Future, CancelledError, timeout
from typing import Callable, Optional from typing import Callable, Optional
from logging import getLogger from logging import getLogger
@@ -23,25 +23,16 @@ class Subscriber:
self._unsubscribe_callback(self) self._unsubscribe_callback(self)
log.debug('Deleted subscriber %s', id(self)) log.debug('Deleted subscriber %s', id(self))
async def wait(self, tout: float) -> bool: async def wait(self, tout: Optional[float]) -> bool:
self._event = self._loop.create_future()
def callback() -> None: future: Future[bool] = self._loop.create_future()
evt = self._event self._event = future
if evt is None:
raise ValueError('Event is None')
evt.cancel()
if not evt.done():
evt.set_result(False)
handle = self._loop.call_later(tout, callback)
try: try:
log.debug('Subscriber %s is waiting for an event', id(self)) async with timeout(tout):
return await self._event log.debug('Subscriber %s is waiting for an event', id(self))
except CancelledError: return await future
except TimeoutError:
return False return False
finally:
handle.cancel()
def notify(self) -> None: def notify(self) -> None:
log.debug('Subscriber %s notified', id(self)) log.debug('Subscriber %s notified', id(self))

View File

@@ -252,3 +252,34 @@ class aenumerate[T](AsyncIterator[Tuple[int, T]]):
val = await self._ait.__anext__() val = await self._ait.__anext__()
self._i += 1 self._i += 1
return self._i, val return self._i, val
def index_of_with_escape(haystack: str, needle: str, escape: str, begin: int, end: int = 0) -> int:
result = -1
cursor = begin
if end == 0:
end = len(haystack)
escape_count = 0
while cursor < end:
c = haystack[cursor]
if escape_count > 0:
escape_count -= 1
if c[0] == escape:
result = -1
elif escape_count == 0:
if c[0] == escape:
escape_count += 1
if cursor + len(needle) <= len(haystack):
test = haystack[cursor:cursor + len(needle)]
if test == needle:
result = cursor
if result >= 0 and escape_count == 0:
break
cursor += 1
return result

View File

@@ -1,6 +1,6 @@
import unittest import unittest
from pwo import retry, async_retry, async_test, AsyncQueueIterator, aenumerate from pwo import retry, async_retry, async_test, AsyncQueueIterator, aenumerate, index_of_with_escape
from asyncio import Queue from asyncio import Queue
@@ -93,3 +93,37 @@ class PrivateTest(unittest.TestCase):
self.assertEqual(queue_size, processed) self.assertEqual(queue_size, processed)
class TestIndexOfWithEscape(unittest.TestCase):
def run_test_case(self, haystack, needle, escape, expected_solution):
solution = []
i = 0
while True:
i = index_of_with_escape(haystack, needle, escape, i, len(haystack))
if i < 0:
break
solution.append(i)
i += 1
self.assertEqual(expected_solution, solution)
def test_simple(self):
self.run_test_case(" dsds $sdsa \\$dfivbdsf \\\\$sdgsga", '$', '\\', [6, 25])
def test_simple2(self):
self.run_test_case("asdasd$$vdfv$", '$', '$', [12])
def test_no_needle(self):
self.run_test_case("asdasd$$vdfv$", '#', '\\', [])
def test_escaped_needle(self):
self.run_test_case("asdasd$$vdfv$#sdfs", '#', '$', [])
def test_not_escaped_needle(self):
self.run_test_case("asdasd$$#vdfv$#sdfs", '#', '$', [8])
def test_special_case(self):
self.run_test_case("\n${sys:user.home}${env:HOME}", ':', '\\', [6, 22])
def test_wide_needle(self):
self.run_test_case("asdasd\\${#vdfv|${#sdfs}", '${', '\\', [15])