From 016b104d5b50b103f171a41cd8c68faf1c296b5d Mon Sep 17 00:00:00 2001 From: Walter Oggioni Date: Tue, 22 Oct 2024 23:11:18 +0800 Subject: [PATCH] markdown rendering offloaded to the default threadpool --- src/bugis/asgi.py | 45 ++++++++++--- src/bugis/async_watchdog.py | 17 +++-- src/bugis/default-conf/logging.yaml | 8 ++- src/bugis/file_watch.py | 98 ----------------------------- src/bugis/md2html.py | 23 +++++-- src/bugis/server.py | 71 ++++++++++++--------- 6 files changed, 112 insertions(+), 150 deletions(-) delete mode 100644 src/bugis/file_watch.py diff --git a/src/bugis/asgi.py b/src/bugis/asgi.py index e047c29..afa506a 100644 --- a/src/bugis/asgi.py +++ b/src/bugis/asgi.py @@ -1,11 +1,8 @@ import logging from logging.config import dictConfig as configure_logging from os import environ -from pathlib import Path - -from pwo import Maybe from yaml import safe_load -from .server import Server +from pathlib import Path logging_configuration_file = environ.get("LOGGING_CONFIGURATION_FILE", Path(__file__).parent / 'default-conf' / 'logging.yaml') with open(logging_configuration_file, 'r') as input_file: @@ -13,15 +10,49 @@ with open(logging_configuration_file, 'r') as input_file: configure_logging(conf) +from pwo import Maybe +from .server import Server +from asyncio import get_running_loop + log = logging.getLogger('access') log.propagate = False _server = None + +def decode_headers(headers): + result = dict() + for key, value in headers: + if isinstance(key, bytes): + key = key.decode() + if isinstance(value, bytes): + value = value.decode() + l = result.setdefault(key, list()) + l.append(value) + return { + k: tuple(v) for k, v in result.items() + } + async def application(ctx, receive, send): global _server if _server is None: - _server = Server(prefix=None) - log.info(None, extra=ctx) + _server = Server(loop=get_running_loop(), prefix=None) + + def maybe_log(evt): + d = { + 'response_headers': (Maybe.of_nullable(evt.get('headers')) + .map(decode_headers) + 
.or_none()), + 'status': evt['status'] + } + log.info(None, extra=dict(**{k : v for k, v in d.items() if k is not None}, **ctx)) + def wrapped_send(*args, **kwargs): + result = send(*args, **kwargs) + (Maybe.of(args) + .filter(lambda it: len(it) > 0) + .map(lambda it: it[0]) + .filter(lambda it: it.get('type') == 'http.response.start') + .if_present(maybe_log)) + return result await _server.handle_request( ctx['method'], ctx['path'], @@ -31,6 +62,6 @@ async def application(ctx, receive, send): .map(lambda it: it.decode()) .or_else(None), Maybe.of_nullable(ctx.get('query_string', None)).map(lambda it: it.decode()).or_else(None), - send + wrapped_send ) diff --git a/src/bugis/async_watchdog.py b/src/bugis/async_watchdog.py index f5b9b17..ddba655 100644 --- a/src/bugis/async_watchdog.py +++ b/src/bugis/async_watchdog.py @@ -1,15 +1,14 @@ import asyncio -import logging from watchdog.events import FileSystemEventHandler, FileSystemEvent, PatternMatchingEventHandler from watchdog.observers import Observer from watchdog.events import FileMovedEvent, FileClosedEvent, FileCreatedEvent, FileModifiedEvent from pathlib import Path from asyncio import Queue, AbstractEventLoop, Future, CancelledError, Task -from typing import Optional, Callable -from logging import getLogger +from typing import Callable +from logging import getLogger, Logger -log : logging.Logger = getLogger(__name__) +log: Logger = getLogger(__name__) class Subscription: _unsubscribe_callback: Callable[['Subscription'], None] @@ -21,6 +20,7 @@ class Subscription: self._event: Future = loop.create_future() self._loop = loop + def unsubscribe(self) -> None: self._unsubscribe_callback(self) log.debug('Deleted subscription %s', id(self)) @@ -139,7 +139,6 @@ class FileWatcher(PatternMatchingEventHandler): case_sensitive=True) self._observer: Observer = Observer() self._observer.schedule(self, path=path, recursive=True) - self.logger = getLogger(FileWatcher.__name__) self._loop = asyncio.get_running_loop() 
self._subscription_manager = SubscriptionManager(self._loop) self._loop.run_in_executor(None, self._observer.start) @@ -164,14 +163,14 @@ class FileWatcher(PatternMatchingEventHandler): self._subscription_manager.post_event(path) if isinstance(event, FileClosedEvent): - self.logger.debug("Closed %s: %s", what, event.src_path) + log.debug("Closed %s: %s", what, event.src_path) # update_subscriptions() elif isinstance(event, FileMovedEvent): - self.logger.debug("Moved %s: %s to %s", what, event.src_path, event.dest_path) + log.debug("Moved %s: %s to %s", what, event.src_path, event.dest_path) post_event(event.dest_path) elif isinstance(event, FileCreatedEvent): - self.logger.debug("Created %s: %s", what, event.src_path) + log.debug("Created %s: %s", what, event.src_path) post_event(event.src_path) elif isinstance(event, FileModifiedEvent): - self.logger.debug("Modified %s: %s", what, event.src_path) + log.debug("Modified %s: %s", what, event.src_path) post_event(event.src_path) diff --git a/src/bugis/default-conf/logging.yaml b/src/bugis/default-conf/logging.yaml index 4d9ea47..54c12d0 100644 --- a/src/bugis/default-conf/logging.yaml +++ b/src/bugis/default-conf/logging.yaml @@ -4,7 +4,7 @@ handlers: console: class : logging.StreamHandler formatter: default - level : DEBUG + level : INFO stream : ext://sys.stderr access: class : logging.StreamHandler @@ -19,7 +19,7 @@ formatters: style: '{' datefmt: '%Y-%m-%d %H:%M:%S' request: - format: '{asctime} {client[0]}:{client[1]} HTTP/{http_version} {method} {path}' + format: '{asctime} {client[0]}:{client[1]} HTTP/{http_version} {method} {path} - {status}' style: '{' datefmt: '%Y-%m-%d %H:%M:%S' loggers: @@ -28,4 +28,8 @@ loggers: level: DEBUG access: handlers: [access] + level: INFO + watchdog.observers.inotify_buffer: + level: INFO + MARKDOWN: level: INFO \ No newline at end of file diff --git a/src/bugis/file_watch.py b/src/bugis/file_watch.py deleted file mode 100644 index 458cdc3..0000000 --- a/src/bugis/file_watch.py 
+++ /dev/null @@ -1,98 +0,0 @@ -from threading import Lock -from typing import Optional, Callable -from os import getcwd -from watchdog.events import PatternMatchingEventHandler, FileSystemEvent, \ - FileCreatedEvent, FileModifiedEvent, FileClosedEvent, FileMovedEvent -from watchdog.observers import Observer -import logging -# from gevent.event import Event -from asyncio import Future, BaseEventLoop - -class Subscription: - _unsubscribe_callback: Callable[['Subscription'], None] - _event: Future - _loop: BaseEventLoop - - def __init__(self, unsubscribe: Callable[['Subscription'], None], loop: BaseEventLoop): - self._unsubscribe_callback = unsubscribe - self._event: Future = loop.create_future() - self._loop = loop - - def unsubscribe(self) -> None: - self._unsubscribe_callback(self) - - async def wait(self, tout: float) -> bool: - handle = self._loop.call_later(tout, lambda: self._event.cancel()) - await self._event - return self._event.wait(tout) - - def notify(self) -> None: - self._event.set_result(None) - - def reset(self) -> None: - self._event = self._loop.create_future() - - -class FileWatcher(PatternMatchingEventHandler): - def __init__(self, path): - super().__init__(patterns=['*.md'], - ignore_patterns=None, - ignore_directories=False, - case_sensitive=True) - self.subscriptions: dict[str, set[Subscription]] = dict() - self.observer: Observer = Observer() - self.observer.schedule(self, path=path, recursive=True) - self.observer.start() - self.logger = logging.getLogger(FileWatcher.__name__) - self._lock = Lock() - - def subscribe(self, path: str) -> Subscription: - subscriptions = self.subscriptions - subscriptions_per_path = subscriptions.setdefault(path, set()) - - def unsubscribe_callback(subscription): - with self._lock: - subscriptions_per_path.remove(subscription) - - result = Subscription(unsubscribe_callback) - subscriptions_per_path.add(result) - return result - - def stop(self) -> None: - self.observer.stop() - self.observer.join() - - def 
on_any_event(self, event: FileSystemEvent) -> None: - what = "directory" if event.is_directory else "file" - - def notify_subscriptions(path): - with self._lock: - subscriptions = self.subscriptions - subscriptions_per_path = subscriptions.get(path, None) - if subscriptions_per_path: - for s in subscriptions_per_path: - s.notify() - - if isinstance(event, FileClosedEvent): - self.logger.debug("Closed %s: %s", what, event.src_path) - # update_subscriptions() - elif isinstance(event, FileMovedEvent): - self.logger.debug("Moved %s: %s to %s", what, event.src_path, event.dest_path) - notify_subscriptions(event.dest_path) - elif isinstance(event, FileCreatedEvent): - self.logger.debug("Created %s: %s", what, event.src_path) - notify_subscriptions(event.src_path) - elif isinstance(event, FileModifiedEvent): - self.logger.debug("Modified %s: %s", what, event.src_path) - notify_subscriptions(event.src_path) - - -if __name__ == '__main__': - - logging.basicConfig( - level=logging.DEBUG, - format='%(asctime)s [%(threadName)s] (%(name)s) %(levelname)s %(message)s' - ) - watcher = FileWatcher(getcwd()) - watcher.observer.join() - diff --git a/src/bugis/md2html.py b/src/bugis/md2html.py index fac045e..82f464d 100644 --- a/src/bugis/md2html.py +++ b/src/bugis/md2html.py @@ -3,7 +3,9 @@ from os.path import dirname, join, relpath from time import time from typing import Optional, TYPE_CHECKING from aiofiles import open as async_open +from asyncio import AbstractEventLoop import markdown +import logging if TYPE_CHECKING: from _typeshed import StrOrBytesPath @@ -19,6 +21,8 @@ STATIC_CACHE: dict[str, tuple[str, float]] = {} MARDOWN_EXTENSIONS = ['extra', 'smarty', 'tables', 'codehilite'] +logger = logging.getLogger(__name__) + async def load_from_cache(path) -> tuple[str, float]: global STATIC_CACHE @@ -29,12 +33,21 @@ async def load_from_cache(path) -> tuple[str, float]: async def compile_html(url_path, - mdfile: 'StrOrBytesPath', - prefix: Optional['StrOrBytesPath'] = None, - 
extensions: Optional[list[str]] = None, - raw: bool = False) -> str: + mdfile: 'StrOrBytesPath', + loop: AbstractEventLoop, + prefix: Optional['StrOrBytesPath'] = None, + extensions: Optional[list[str]] = None, + raw: bool = False) -> str: async with mdfile and async_open(mdfile, 'r') or sys.stdin as instream: - html = markdown.markdown(await instream.read(), extensions=extensions, output_format='html') + src = await instream.read() + + def render(source) -> str: + logger.debug("Starting markdown rendering for file '%s'", mdfile) + result = markdown.markdown(source, extensions=extensions, output_format='html') + logger.debug("Markdown rendering for file '%s' completed", mdfile) + return result + + html = await loop.run_in_executor(None, render, src) if raw: doc = html else: diff --git a/src/bugis/server.py b/src/bugis/server.py index fa61019..bd57ae6 100644 --- a/src/bugis/server.py +++ b/src/bugis/server.py @@ -10,7 +10,7 @@ from aiofiles.ospath import exists, isdir, isfile, getmtime from aiofiles import open as async_open from aiofiles.base import AiofilesContextManager from aiofiles.threadpool.binary import AsyncBufferedReader -from asyncio import get_running_loop +from asyncio import AbstractEventLoop from .md2html import compile_html, load_from_cache, STATIC_RESOURCES, MARDOWN_EXTENSIONS from shutil import which @@ -45,15 +45,20 @@ def is_markdown(filepath): def is_dotfile(filepath): return has_extension(filepath, ".dot") +logger = logging.getLogger(__name__) class Server: + _loop : AbstractEventLoop - def __init__(self, root_dir: 'StrOrBytesPath' = getcwd(), prefix: Optional['StrOrBytesPath'] = None): + def __init__(self, + root_dir: 'StrOrBytesPath' = getcwd(), + prefix: Optional['StrOrBytesPath'] = None, + loop: AbstractEventLoop = None): self.root_dir = root_dir self.cache = dict['StrOrBytesPath', tuple[str, float]]() self.file_watcher = FileWatcher(cwd) - self.logger = logging.getLogger(Server.__name__) self.prefix = prefix and 
normpath(f'{prefix.decode()}') + self._loop = loop async def handle_request(self, method: str, url_path: str, etag: Optional[str], query_string: Optional[str], send): if method != 'GET': @@ -75,7 +80,7 @@ class Server: etag, digest = await self.compute_etag_and_digest( etag, url_path, - lambda: AiofilesContextManager(completed_future(AsyncBufferedReader(BytesIO(content), loop=get_running_loop(), executor=None))), + lambda: AiofilesContextManager(completed_future(AsyncBufferedReader(BytesIO(content), loop=self._loop, executor=None))), lambda: completed_future(mtime) ) if etag and etag == digest: @@ -87,10 +92,9 @@ class Server: 'type': 'http.response.start', 'status': 200, 'headers': [ - (b'content-type', f'{mime_type}; charset=UTF-8'.encode()), - (b'etag', f'W/"{digest}"'.encode()), - (b'content-type', f'{mime_type}; charset=UTF-8'.encode()), - (b'Cache-Control', b'must-revalidate, max-age=86400'), + ('content-type', f'{mime_type}; charset=UTF-8'), + ('etag', f'W/"{digest}"'.encode()), + ('Cache-Control', 'must-revalidate, max-age=86400'), ] }) await send({ @@ -106,7 +110,7 @@ class Server: lambda: async_open(path, 'rb'), lambda: getmtime(path) ) - self.logger.debug('%s %s', etag, digest) + logger.debug('Etag: %s, digest: %s', etag, digest) if etag and etag == digest: if is_markdown(path) and query_string == 'reload': subscription = self.file_watcher.subscribe(path) @@ -133,15 +137,20 @@ class Server: raw = query_string == 'reload' await self.render_markdown(url_path, path, raw, digest, send) elif is_dotfile(path) and which("dot"): - graph = pgv.AGraph(path) - body = graph.draw(None, format="svg", prog="dot") + def render_graphviz(filepath: StrOrBytesPath) -> bytes: + logger.debug("Starting Graphviz rendering for file '%s'", filepath) + graph = pgv.AGraph(filepath) + result = graph.draw(None, format="svg", prog="dot") + logger.debug("Completed Graphviz rendering for file '%s'", filepath) + return result + body = await self._loop.run_in_executor(None, 
render_graphviz, path) await send({ 'type': 'http.response.start', 'status': 200, 'headers': ( - (b'Content-Type', b'image/svg+xml; charset=UTF-8'), - (b'Etag', f'W/"{digest}"'.encode()), - (b'Cache-Control', b'no-cache'), + ('Content-Type', 'image/svg+xml; charset=UTF-8'), + ('Etag', f'W/"{digest}"'), + ('Cache-Control', 'no-cache'), ) }) await send({ @@ -162,9 +171,9 @@ class Server: 'type': 'http.response.start', 'status': 200, 'headers': ( - (b'Content-Type', guess_type(basename(path))[0].encode() or b'application/octet-stream'), - (b'Etag', f'W/"{digest}"'), - (b'Cache-Control', b'no-cache') + ('Content-Type', guess_type(basename(path))[0] or 'application/octet-stream'), + ('Etag', f'W/"{digest}"'), + ('Cache-Control', 'no-cache') ) }) @@ -186,7 +195,7 @@ class Server: 'type': 'http.response.start', 'status': 200, 'headers': ( - (b'Content-Type', b'text/html; charset=UTF-8'), + ('Content-Type', 'text/html; charset=UTF-8'), ) }) await send({ @@ -266,26 +275,30 @@ class Server: path: str, raw: bool, digest: str, - send) -> list[bytes]: - body = (await compile_html(url_path, - path, - self.prefix, - MARDOWN_EXTENSIONS, - raw=raw)).encode() + send) -> None: + + body = await compile_html( + url_path, + path, + self._loop, + self.prefix, + MARDOWN_EXTENSIONS, + raw=raw + ) await send({ 'type': 'http.response.start', 'status': 200, 'headers': ( - (b'Content-Type', b'text/html; charset=UTF-8'), - (b'Etag', f'W/{digest}'.encode()), - (b'Cache-Control', b'no-cache'), + ('Content-Type', 'text/html; charset=UTF-8'), + ('Etag', f'W/{digest}'), + ('Cache-Control', 'no-cache'), ) }) await send({ 'type': 'http.response.body', - 'body': body + 'body': body.encode() }) - return + @staticmethod async def not_modified(send, digest: str, cache_control=('Cache-Control', 'no-cache')) -> []: await send({