mardwown rendering offloaded to the default threadpool
All checks were successful
CI / Build Pip package (push) Successful in 17s
CI / Build Docker image (push) Successful in 1m4s

This commit is contained in:
2024-10-22 23:11:18 +08:00
parent 011042bc15
commit 016b104d5b
6 changed files with 112 additions and 150 deletions

View File

@@ -1,11 +1,8 @@
import logging import logging
from logging.config import dictConfig as configure_logging from logging.config import dictConfig as configure_logging
from os import environ from os import environ
from pathlib import Path
from pwo import Maybe
from yaml import safe_load from yaml import safe_load
from .server import Server from pathlib import Path
logging_configuration_file = environ.get("LOGGING_CONFIGURATION_FILE", Path(__file__).parent / 'default-conf' / 'logging.yaml') logging_configuration_file = environ.get("LOGGING_CONFIGURATION_FILE", Path(__file__).parent / 'default-conf' / 'logging.yaml')
with open(logging_configuration_file, 'r') as input_file: with open(logging_configuration_file, 'r') as input_file:
@@ -13,15 +10,49 @@ with open(logging_configuration_file, 'r') as input_file:
configure_logging(conf) configure_logging(conf)
from pwo import Maybe
from .server import Server
from asyncio import get_running_loop
log = logging.getLogger('access') log = logging.getLogger('access')
log.propagate = False log.propagate = False
_server = None _server = None
def decode_headers(headers):
result = dict()
for key, value in headers:
if isinstance(key, bytes):
key = key.decode()
if isinstance(value, bytes):
value = value.decode()
l = result.setdefault(key, list())
l.append(value)
return {
k: tuple(v) for k, v in result.items()
}
async def application(ctx, receive, send): async def application(ctx, receive, send):
global _server global _server
if _server is None: if _server is None:
_server = Server(prefix=None) _server = Server(loop=get_running_loop(), prefix=None)
log.info(None, extra=ctx)
def maybe_log(evt):
d = {
'response_headers': (Maybe.of_nullable(evt.get('headers'))
.map(decode_headers)
.or_none()),
'status': evt['status']
}
log.info(None, extra=dict(**{k : v for k, v in d.items() if k is not None}, **ctx))
def wrapped_send(*args, **kwargs):
result = send(*args, **kwargs)
(Maybe.of(args)
.filter(lambda it: len(it) > 0)
.map(lambda it: it[0])
.filter(lambda it: it.get('type') == 'http.response.start')
.if_present(maybe_log))
return result
await _server.handle_request( await _server.handle_request(
ctx['method'], ctx['method'],
ctx['path'], ctx['path'],
@@ -31,6 +62,6 @@ async def application(ctx, receive, send):
.map(lambda it: it.decode()) .map(lambda it: it.decode())
.or_else(None), .or_else(None),
Maybe.of_nullable(ctx.get('query_string', None)).map(lambda it: it.decode()).or_else(None), Maybe.of_nullable(ctx.get('query_string', None)).map(lambda it: it.decode()).or_else(None),
send wrapped_send
) )

View File

@@ -1,15 +1,14 @@
import asyncio import asyncio
import logging
from watchdog.events import FileSystemEventHandler, FileSystemEvent, PatternMatchingEventHandler from watchdog.events import FileSystemEventHandler, FileSystemEvent, PatternMatchingEventHandler
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileMovedEvent, FileClosedEvent, FileCreatedEvent, FileModifiedEvent from watchdog.events import FileMovedEvent, FileClosedEvent, FileCreatedEvent, FileModifiedEvent
from pathlib import Path from pathlib import Path
from asyncio import Queue, AbstractEventLoop, Future, CancelledError, Task from asyncio import Queue, AbstractEventLoop, Future, CancelledError, Task
from typing import Optional, Callable from typing import Callable
from logging import getLogger from logging import getLogger, Logger
log : logging.Logger = getLogger(__name__) log: Logger = getLogger(__name__)
class Subscription: class Subscription:
_unsubscribe_callback: Callable[['Subscription'], None] _unsubscribe_callback: Callable[['Subscription'], None]
@@ -21,6 +20,7 @@ class Subscription:
self._event: Future = loop.create_future() self._event: Future = loop.create_future()
self._loop = loop self._loop = loop
def unsubscribe(self) -> None: def unsubscribe(self) -> None:
self._unsubscribe_callback(self) self._unsubscribe_callback(self)
log.debug('Deleted subscription %s', id(self)) log.debug('Deleted subscription %s', id(self))
@@ -139,7 +139,6 @@ class FileWatcher(PatternMatchingEventHandler):
case_sensitive=True) case_sensitive=True)
self._observer: Observer = Observer() self._observer: Observer = Observer()
self._observer.schedule(self, path=path, recursive=True) self._observer.schedule(self, path=path, recursive=True)
self.logger = getLogger(FileWatcher.__name__)
self._loop = asyncio.get_running_loop() self._loop = asyncio.get_running_loop()
self._subscription_manager = SubscriptionManager(self._loop) self._subscription_manager = SubscriptionManager(self._loop)
self._loop.run_in_executor(None, self._observer.start) self._loop.run_in_executor(None, self._observer.start)
@@ -164,14 +163,14 @@ class FileWatcher(PatternMatchingEventHandler):
self._subscription_manager.post_event(path) self._subscription_manager.post_event(path)
if isinstance(event, FileClosedEvent): if isinstance(event, FileClosedEvent):
self.logger.debug("Closed %s: %s", what, event.src_path) log.debug("Closed %s: %s", what, event.src_path)
# update_subscriptions() # update_subscriptions()
elif isinstance(event, FileMovedEvent): elif isinstance(event, FileMovedEvent):
self.logger.debug("Moved %s: %s to %s", what, event.src_path, event.dest_path) log.debug("Moved %s: %s to %s", what, event.src_path, event.dest_path)
post_event(event.dest_path) post_event(event.dest_path)
elif isinstance(event, FileCreatedEvent): elif isinstance(event, FileCreatedEvent):
self.logger.debug("Created %s: %s", what, event.src_path) log.debug("Created %s: %s", what, event.src_path)
post_event(event.src_path) post_event(event.src_path)
elif isinstance(event, FileModifiedEvent): elif isinstance(event, FileModifiedEvent):
self.logger.debug("Modified %s: %s", what, event.src_path) log.debug("Modified %s: %s", what, event.src_path)
post_event(event.src_path) post_event(event.src_path)

View File

@@ -4,7 +4,7 @@ handlers:
console: console:
class : logging.StreamHandler class : logging.StreamHandler
formatter: default formatter: default
level : DEBUG level : INFO
stream : ext://sys.stderr stream : ext://sys.stderr
access: access:
class : logging.StreamHandler class : logging.StreamHandler
@@ -19,7 +19,7 @@ formatters:
style: '{' style: '{'
datefmt: '%Y-%m-%d %H:%M:%S' datefmt: '%Y-%m-%d %H:%M:%S'
request: request:
format: '{asctime} {client[0]}:{client[1]} HTTP/{http_version} {method} {path}' format: '{asctime} {client[0]}:{client[1]} HTTP/{http_version} {method} {path} - {status}'
style: '{' style: '{'
datefmt: '%Y-%m-%d %H:%M:%S' datefmt: '%Y-%m-%d %H:%M:%S'
loggers: loggers:
@@ -29,3 +29,7 @@ loggers:
access: access:
handlers: [access] handlers: [access]
level: INFO level: INFO
watchdog.observers.inotify_buffer:
level: INFO
MARKDOWN:
level: INFO

View File

@@ -1,98 +0,0 @@
from threading import Lock
from typing import Optional, Callable
from os import getcwd
from watchdog.events import PatternMatchingEventHandler, FileSystemEvent, \
FileCreatedEvent, FileModifiedEvent, FileClosedEvent, FileMovedEvent
from watchdog.observers import Observer
import logging
# from gevent.event import Event
from asyncio import Future, BaseEventLoop
class Subscription:
_unsubscribe_callback: Callable[['Subscription'], None]
_event: Future
_loop: BaseEventLoop
def __init__(self, unsubscribe: Callable[['Subscription'], None], loop: BaseEventLoop):
self._unsubscribe_callback = unsubscribe
self._event: Future = loop.create_future()
self._loop = loop
def unsubscribe(self) -> None:
self._unsubscribe_callback(self)
async def wait(self, tout: float) -> bool:
handle = self._loop.call_later(tout, lambda: self._event.cancel())
await self._event
return self._event.wait(tout)
def notify(self) -> None:
self._event.set_result(None)
def reset(self) -> None:
self._event = self._loop.create_future()
class FileWatcher(PatternMatchingEventHandler):
def __init__(self, path):
super().__init__(patterns=['*.md'],
ignore_patterns=None,
ignore_directories=False,
case_sensitive=True)
self.subscriptions: dict[str, set[Subscription]] = dict()
self.observer: Observer = Observer()
self.observer.schedule(self, path=path, recursive=True)
self.observer.start()
self.logger = logging.getLogger(FileWatcher.__name__)
self._lock = Lock()
def subscribe(self, path: str) -> Subscription:
subscriptions = self.subscriptions
subscriptions_per_path = subscriptions.setdefault(path, set())
def unsubscribe_callback(subscription):
with self._lock:
subscriptions_per_path.remove(subscription)
result = Subscription(unsubscribe_callback)
subscriptions_per_path.add(result)
return result
def stop(self) -> None:
self.observer.stop()
self.observer.join()
def on_any_event(self, event: FileSystemEvent) -> None:
what = "directory" if event.is_directory else "file"
def notify_subscriptions(path):
with self._lock:
subscriptions = self.subscriptions
subscriptions_per_path = subscriptions.get(path, None)
if subscriptions_per_path:
for s in subscriptions_per_path:
s.notify()
if isinstance(event, FileClosedEvent):
self.logger.debug("Closed %s: %s", what, event.src_path)
# update_subscriptions()
elif isinstance(event, FileMovedEvent):
self.logger.debug("Moved %s: %s to %s", what, event.src_path, event.dest_path)
notify_subscriptions(event.dest_path)
elif isinstance(event, FileCreatedEvent):
self.logger.debug("Created %s: %s", what, event.src_path)
notify_subscriptions(event.src_path)
elif isinstance(event, FileModifiedEvent):
self.logger.debug("Modified %s: %s", what, event.src_path)
notify_subscriptions(event.src_path)
if __name__ == '__main__':
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s [%(threadName)s] (%(name)s) %(levelname)s %(message)s'
)
watcher = FileWatcher(getcwd())
watcher.observer.join()

View File

@@ -3,7 +3,9 @@ from os.path import dirname, join, relpath
from time import time from time import time
from typing import Optional, TYPE_CHECKING from typing import Optional, TYPE_CHECKING
from aiofiles import open as async_open from aiofiles import open as async_open
from asyncio import AbstractEventLoop
import markdown import markdown
import logging
if TYPE_CHECKING: if TYPE_CHECKING:
from _typeshed import StrOrBytesPath from _typeshed import StrOrBytesPath
@@ -19,6 +21,8 @@ STATIC_CACHE: dict[str, tuple[str, float]] = {}
MARDOWN_EXTENSIONS = ['extra', 'smarty', 'tables', 'codehilite'] MARDOWN_EXTENSIONS = ['extra', 'smarty', 'tables', 'codehilite']
logger = logging.getLogger(__name__)
async def load_from_cache(path) -> tuple[str, float]: async def load_from_cache(path) -> tuple[str, float]:
global STATIC_CACHE global STATIC_CACHE
@@ -30,11 +34,20 @@ async def load_from_cache(path) -> tuple[str, float]:
async def compile_html(url_path, async def compile_html(url_path,
mdfile: 'StrOrBytesPath', mdfile: 'StrOrBytesPath',
loop: AbstractEventLoop,
prefix: Optional['StrOrBytesPath'] = None, prefix: Optional['StrOrBytesPath'] = None,
extensions: Optional[list[str]] = None, extensions: Optional[list[str]] = None,
raw: bool = False) -> str: raw: bool = False) -> str:
async with mdfile and async_open(mdfile, 'r') or sys.stdin as instream: async with mdfile and async_open(mdfile, 'r') or sys.stdin as instream:
html = markdown.markdown(await instream.read(), extensions=extensions, output_format='html') src = await instream.read()
def render(source) -> str:
logger.debug("Starting markdown rendering for file '%s'", mdfile)
result = markdown.markdown(source, extensions=extensions, output_format='html')
logger.debug("Markdown rendering for file '%s' completed", mdfile)
return result
html = await loop.run_in_executor(None, render, src)
if raw: if raw:
doc = html doc = html
else: else:

View File

@@ -10,7 +10,7 @@ from aiofiles.ospath import exists, isdir, isfile, getmtime
from aiofiles import open as async_open from aiofiles import open as async_open
from aiofiles.base import AiofilesContextManager from aiofiles.base import AiofilesContextManager
from aiofiles.threadpool.binary import AsyncBufferedReader from aiofiles.threadpool.binary import AsyncBufferedReader
from asyncio import get_running_loop from asyncio import AbstractEventLoop
from .md2html import compile_html, load_from_cache, STATIC_RESOURCES, MARDOWN_EXTENSIONS from .md2html import compile_html, load_from_cache, STATIC_RESOURCES, MARDOWN_EXTENSIONS
from shutil import which from shutil import which
@@ -45,15 +45,20 @@ def is_markdown(filepath):
def is_dotfile(filepath): def is_dotfile(filepath):
return has_extension(filepath, ".dot") return has_extension(filepath, ".dot")
logger = logging.getLogger(__name__)
class Server: class Server:
_loop : AbstractEventLoop
def __init__(self, root_dir: 'StrOrBytesPath' = getcwd(), prefix: Optional['StrOrBytesPath'] = None): def __init__(self,
root_dir: 'StrOrBytesPath' = getcwd(),
prefix: Optional['StrOrBytesPath'] = None,
loop: AbstractEventLoop = None):
self.root_dir = root_dir self.root_dir = root_dir
self.cache = dict['StrOrBytesPath', tuple[str, float]]() self.cache = dict['StrOrBytesPath', tuple[str, float]]()
self.file_watcher = FileWatcher(cwd) self.file_watcher = FileWatcher(cwd)
self.logger = logging.getLogger(Server.__name__)
self.prefix = prefix and normpath(f'{prefix.decode()}') self.prefix = prefix and normpath(f'{prefix.decode()}')
self._loop = loop
async def handle_request(self, method: str, url_path: str, etag: Optional[str], query_string: Optional[str], send): async def handle_request(self, method: str, url_path: str, etag: Optional[str], query_string: Optional[str], send):
if method != 'GET': if method != 'GET':
@@ -75,7 +80,7 @@ class Server:
etag, digest = await self.compute_etag_and_digest( etag, digest = await self.compute_etag_and_digest(
etag, etag,
url_path, url_path,
lambda: AiofilesContextManager(completed_future(AsyncBufferedReader(BytesIO(content), loop=get_running_loop(), executor=None))), lambda: AiofilesContextManager(completed_future(AsyncBufferedReader(BytesIO(content), loop=self._loop, executor=None))),
lambda: completed_future(mtime) lambda: completed_future(mtime)
) )
if etag and etag == digest: if etag and etag == digest:
@@ -87,10 +92,9 @@ class Server:
'type': 'http.response.start', 'type': 'http.response.start',
'status': 200, 'status': 200,
'headers': [ 'headers': [
(b'content-type', f'{mime_type}; charset=UTF-8'.encode()), ('content-type', f'{mime_type}; charset=UTF-8'),
(b'etag', f'W/"{digest}"'.encode()), ('etag', f'W/"{digest}"'.encode()),
(b'content-type', f'{mime_type}; charset=UTF-8'.encode()), ('Cache-Control', 'must-revalidate, max-age=86400'),
(b'Cache-Control', b'must-revalidate, max-age=86400'),
] ]
}) })
await send({ await send({
@@ -106,7 +110,7 @@ class Server:
lambda: async_open(path, 'rb'), lambda: async_open(path, 'rb'),
lambda: getmtime(path) lambda: getmtime(path)
) )
self.logger.debug('%s %s', etag, digest) logger.debug('Etag: %s, digest: %s', etag, digest)
if etag and etag == digest: if etag and etag == digest:
if is_markdown(path) and query_string == 'reload': if is_markdown(path) and query_string == 'reload':
subscription = self.file_watcher.subscribe(path) subscription = self.file_watcher.subscribe(path)
@@ -133,15 +137,20 @@ class Server:
raw = query_string == 'reload' raw = query_string == 'reload'
await self.render_markdown(url_path, path, raw, digest, send) await self.render_markdown(url_path, path, raw, digest, send)
elif is_dotfile(path) and which("dot"): elif is_dotfile(path) and which("dot"):
graph = pgv.AGraph(path) def render_graphviz(filepath: StrOrBytesPath) -> bytes:
body = graph.draw(None, format="svg", prog="dot") logger.debug("Starting Graphviz rendering for file '%s'", filepath)
graph = pgv.AGraph(filepath)
result = graph.draw(None, format="svg", prog="dot")
logger.debug("Completed Graphviz rendering for file '%s'", filepath)
return result
body = await self._loop.run_in_executor(None, render_graphviz, path)
await send({ await send({
'type': 'http.response.start', 'type': 'http.response.start',
'status': 200, 'status': 200,
'headers': ( 'headers': (
(b'Content-Type', b'image/svg+xml; charset=UTF-8'), ('Content-Type', 'image/svg+xml; charset=UTF-8'),
(b'Etag', f'W/"{digest}"'.encode()), ('Etag', f'W/"{digest}"'),
(b'Cache-Control', b'no-cache'), ('Cache-Control', 'no-cache'),
) )
}) })
await send({ await send({
@@ -162,9 +171,9 @@ class Server:
'type': 'http.response.start', 'type': 'http.response.start',
'status': 200, 'status': 200,
'headers': ( 'headers': (
(b'Content-Type', guess_type(basename(path))[0].encode() or b'application/octet-stream'), ('Content-Type', guess_type(basename(path))[0] or 'application/octet-stream'),
(b'Etag', f'W/"{digest}"'), ('Etag', f'W/"{digest}"'),
(b'Cache-Control', b'no-cache') ('Cache-Control', 'no-cache')
) )
}) })
@@ -186,7 +195,7 @@ class Server:
'type': 'http.response.start', 'type': 'http.response.start',
'status': 200, 'status': 200,
'headers': ( 'headers': (
(b'Content-Type', b'text/html; charset=UTF-8'), ('Content-Type', 'text/html; charset=UTF-8'),
) )
}) })
await send({ await send({
@@ -266,26 +275,30 @@ class Server:
path: str, path: str,
raw: bool, raw: bool,
digest: str, digest: str,
send) -> list[bytes]: send) -> None:
body = (await compile_html(url_path,
body = await compile_html(
url_path,
path, path,
self._loop,
self.prefix, self.prefix,
MARDOWN_EXTENSIONS, MARDOWN_EXTENSIONS,
raw=raw)).encode() raw=raw
)
await send({ await send({
'type': 'http.response.start', 'type': 'http.response.start',
'status': 200, 'status': 200,
'headers': ( 'headers': (
(b'Content-Type', b'text/html; charset=UTF-8'), ('Content-Type', 'text/html; charset=UTF-8'),
(b'Etag', f'W/{digest}'.encode()), ('Etag', f'W/{digest}'),
(b'Cache-Control', b'no-cache'), ('Cache-Control', 'no-cache'),
) )
}) })
await send({ await send({
'type': 'http.response.body', 'type': 'http.response.body',
'body': body 'body': body.encode()
}) })
return
@staticmethod @staticmethod
async def not_modified(send, digest: str, cache_control=('Cache-Control', 'no-cache')) -> []: async def not_modified(send, digest: str, cache_control=('Cache-Control', 'no-cache')) -> []:
await send({ await send({