diff --git a/src/pwo/__init__.py b/src/pwo/__init__.py index 5680529..c88c400 100644 --- a/src/pwo/__init__.py +++ b/src/pwo/__init__.py @@ -9,6 +9,7 @@ from .private import ( classproperty, AsyncQueueIterator, aenumerate, + index_of_with_escape ) from .maybe import Maybe from .notification import TopicManager, Subscriber @@ -28,5 +29,6 @@ __all__ = [ 'Subscriber', 'AsyncQueueIterator', 'aenumerate', - 'Try' + 'Try', + 'index_of_with_escape' ] diff --git a/src/pwo/private.py b/src/pwo/private.py index 72252e1..3101516 100644 --- a/src/pwo/private.py +++ b/src/pwo/private.py @@ -252,3 +252,32 @@ class aenumerate[T](AsyncIterator[Tuple[int, T]]): val = await self._ait.__anext__() self._i += 1 return self._i, val + + +def index_of_with_escape(haystack: str, needle: str, escape: str, begin: int, end: int = 0) -> int: + result = -1 + cursor = begin + if end == 0: + end = len(haystack) + escape_count = 0 + + while cursor < end: + c = haystack[cursor] + + if escape_count > 0: + escape_count -= 1 + if c == escape: + result = -1 + elif escape_count == 0: + if c == escape: + escape_count += 1 + if c == needle: + result = cursor + + if result >= 0 and escape_count == 0: + break + + cursor += 1 + + return result + diff --git a/tests/test_private.py b/tests/test_private.py index 2b62b37..0558205 100644 --- a/tests/test_private.py +++ b/tests/test_private.py @@ -1,6 +1,6 @@ import unittest -from pwo import retry, async_retry, async_test, AsyncQueueIterator, aenumerate +from pwo import retry, async_retry, async_test, AsyncQueueIterator, aenumerate, index_of_with_escape from asyncio import Queue @@ -93,3 +93,35 @@ class PrivateTest(unittest.TestCase): self.assertEqual(queue_size, processed) +class TestIndexOfWithEscape(unittest.TestCase): + + def run_test_case(self, haystack, needle, escape, expected_solution): + solution = [] + i = 0 + while True: + i = index_of_with_escape(haystack, needle, escape, i, len(haystack)) + if i < 0: + break + solution.append(i) + i += 1 + self.assertEqual(solution, expected_solution) + + def test_simple(self): + self.run_test_case(" dsds $sdsa \\$dfivbdsf \\\\$sdgsga", '$', '\\', [6, 25]) + + def test_simple2(self): + self.run_test_case("asdasd$$vdfv$", '$', '$', [12]) + + def test_no_needle(self): + self.run_test_case("asdasd$$vdfv$", '#', '\\', []) + + def test_escaped_needle(self): + self.run_test_case("asdasd$$vdfv$#sdfs", '#', '$', []) + + def test_not_escaped_needle(self): + self.run_test_case("asdasd$$#vdfv$#sdfs", '#', '$', [8]) + + def test_special_case(self): + self.run_test_case("\n${sys:user.home}${env:HOME}", ':', '\\', [6, 22]) + +