added async token refresh and upload retry logic
Some checks failed
CI / build (push) Failing after 12s

This commit is contained in:
2024-06-23 13:21:28 +08:00
parent 93e7ccd9b7
commit 50ec2cf03c
8 changed files with 140 additions and 121 deletions

View File

@@ -0,0 +1,29 @@
name: CI
on:
push:
branches: [ master ]
jobs:
build:
runs-on: woryzen
steps:
- name: Checkout sources
uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
cache: 'pip'
- name: Create virtualenv
run: |
python -m venv .venv
.venv/bin/pip install -r requirements.txt
# - name: Run unit tests
# run: .venv/bin/python -m unittest discover -s tests
- name: Execute build
run: |
.venv/bin/python -m build
- name: Publish artifacts
env:
TWINE_REPOSITORY_URL: ${{ vars.PYPI_REGISTRY_URL }}
TWINE_USERNAME: ${{ vars.PUBLISHER_USERNAME }}
TWINE_PASSWORD: ${{ secrets.PUBLISHER_TOKEN }}
run: |
.venv/bin/python -m twine upload --repository gitea dist/*{.whl,tar.gz}

4
.gitignore vendored
View File

@@ -1,5 +1,7 @@
dist dist
.idea .idea
__pycache__
.venv
*.pyc *.pyc
*.egg-info *.egg-info
venv

View File

@@ -1,72 +0,0 @@
from __future__ import annotations
from typing import TypeVar, Generic, Optional, Callable, Any
T = TypeVar('T')
U = TypeVar('U')
class Maybe(Generic[T]):
def __init__(self, value: Optional[T] = None):
self._value: Optional[T] = value
@staticmethod
def of(obj: T) -> Maybe[T]:
return Maybe(obj)
@staticmethod
def of_nullable(obj: Optional[T]) -> Maybe[T]:
return Maybe(obj)
@staticmethod
def empty() -> Maybe[U]:
return _empty
@property
def value(self) -> T:
value = self._value
if not value:
raise ValueError('Empty Maybe')
else:
return value
@property
def is_present(self) -> bool:
return self._value is not None
@property
def is_empty(self) -> bool:
return not self.is_present
def map(self, transformer: Callable[[T], U]) -> Maybe[U]:
result: Maybe[U]
if self.is_present:
result = Maybe(transformer(self.value))
else:
result = Maybe.empty()
return result
def filter(self, predicate: Callable[[T], bool]) -> Maybe[T]:
return self if self.is_present and predicate(self.value) else Maybe.empty()
def flat_map(self, transformer: Callable[[T], Maybe[U]]) -> Maybe[U]:
return transformer(self.value) if self.is_present else Maybe.empty()
def or_else(self, alt: T) -> T:
return self.value if self.is_present else alt
def or_else_throw(self, supplier: Callable[[], Exception]) -> T:
if self.is_present:
return self.value
else:
raise supplier()
def or_else_get(self, supplier: Callable[[], T]) -> Maybe[T]:
return self if self.is_present else Maybe.of_nullable(supplier())
def if_present(self, callback: Callable[[T], U]) -> None:
if self.is_present:
callback(self.value)
_empty: Maybe[Any] = Maybe(None)

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "jpacrepo-uploader" name = "jpacrepo-uploader"
version = "0.0.1" version = "0.0.2"
authors = [ authors = [
{ name="Walter Oggioni", email="oggioni.walter@gmail.com" }, { name="Walter Oggioni", email="oggioni.walter@gmail.com" },
] ]
@@ -22,7 +22,8 @@ dependencies = [
'progress==1.6', 'progress==1.6',
'pycurl==7.45.2', 'pycurl==7.45.2',
'types-pycurl==7.45.2.5', 'types-pycurl==7.45.2.5',
'typing_extensions==4.7.1' 'typing_extensions==4.7.1',
'pwo >= 0.0.2'
] ]
[project.urls] [project.urls]

View File

@@ -1,15 +1,17 @@
argcomplete==3.1.1 argcomplete==3.4.0
build==0.10.0 build==1.2.1
certifi==2023.7.22 certifi==2024.6.2
click==8.1.7 click==8.1.7
mypy==1.5.1 mypy==1.10.0
mypy-extensions==1.0.0 mypy-extensions==1.0.0
oidc-client==0.2.6 oidc-client==0.2.6
packaging==23.1 packaging==24.1
pipx==1.2.0 pipx==1.6.0
platformdirs==4.2.2
progress==1.6 progress==1.6
pycurl==7.45.2 pwo==0.0.2
pyproject_hooks==1.0.0 pycurl==7.45.3
types-pycurl==7.45.2.5 pyproject_hooks==1.1.0
types-pycurl==7.45.3.20240421
typing_extensions==4.7.1 typing_extensions==4.7.1
userpath==1.9.0 userpath==1.9.2

View File

View File

@@ -1,6 +1,6 @@
from configparser import RawConfigParser from configparser import RawConfigParser
from os import environ from os import environ
from .maybe import Maybe from pwo.maybe import Maybe
from pathlib import Path from pathlib import Path
from itertools import chain from itertools import chain
from dataclasses import dataclass from dataclasses import dataclass
@@ -55,11 +55,12 @@ def load_configuration() -> Config:
client_secret = config.get(main_section, 'ClientSecret', fallback=client_secret) client_secret = config.get(main_section, 'ClientSecret', fallback=client_secret)
config_file_maybe.if_present(process_configuration) config_file_maybe.if_present(process_configuration)
return Config(server_url=server_url or 'https://woggioni.net/jpacrepo/', return Config(
auth_server_url=auth_server_url or 'https://woggioni.net/auth/realms/woggioni.net', server_url=server_url or 'https://woggioni.net/jpacrepo/',
repo_folders=tuple(repo_folders) or (Path('/var/cache/pacman/pkg'),), auth_server_url=auth_server_url or 'https://woggioni.net/auth/realms/woggioni.net',
client_id=client_id or 'jpacrepo-client', repo_folders=tuple(repo_folders) or (Path('/var/cache/pacman/pkg'),),
client_secret=Maybe.of_nullable(client_secret) client_id=client_id or 'jpacrepo-client',
.map(lambda v: v.format(**environ)) client_secret=Maybe.of_nullable(client_secret)
.or_else(None) .map(lambda v: v.format(**environ))
) .or_none()
)

View File

@@ -9,6 +9,7 @@ from time import time, monotonic
from typing import Optional, Any from typing import Optional, Any
from urllib.parse import urlparse, urlunparse, quote, urlencode from urllib.parse import urlparse, urlunparse, quote, urlencode
from urllib.request import Request from urllib.request import Request
from threading import Thread, Condition
import certifi import certifi
import math import math
@@ -19,24 +20,28 @@ from oidc_client.discovery import fetch_provider_config
from oidc_client.oauth import TokenResponse from oidc_client.oauth import TokenResponse
from progress import Progress from progress import Progress
from progress.bar import Bar from progress.bar import Bar
from typing_extensions import Self
from pwo import format_filesize, retry, ExceptionHandlerOutcome
from .config import load_configuration, Config from .config import load_configuration, Config
logger = logging.getLogger('jpacrepo.uploader') logger = logging.getLogger('jpacrepo.uploader')
package_file_pattern = re.compile('.*\.pkg\.tar\.(xz|zst|gz)$') package_file_pattern = re.compile('.*\\.pkg\\.tar\\.(xz|zst|gz)$')
_size_uoms = ('B', 'KiB', 'MiB', 'GiB', 'KiB')
_supported_compression_formats = ('xz', 'zst', 'gz') _supported_compression_formats = ('xz', 'zst', 'gz')
def format_filesize(size: int) -> str:
counter = 0 class HttpException(Exception):
tmp_size = size http_status_code : int
while tmp_size > 0: message: Optional[str]
tmp_size //= 1024
counter += 1 def __init__(self, http_status_code: int, msg: Optional[str] = None):
counter -= 1 self.message = msg
return '%.2f ' % (size / math.pow(1024, counter)) + _size_uoms[counter] self.http_status_code = http_status_code
def __repr__(self) -> str:
return f'HTTP status {self.http_status_code}' + f': {self.message}' if self.message else ''
@dataclass @dataclass
@@ -116,14 +121,42 @@ class XferProgress:
class JpacrepoClient: class JpacrepoClient:
def __init__(self, config: Config, **kwargs): config: Config
token: Optional[TokenResponse]
provider_config: ProviderConfig
token_expiry: Optional[int]
cond: Condition
thread: Optional[Thread]
verbose: bool
http2: bool
http3: bool
def __init__(self, config: Config,
verbose: bool = False,
http2: bool = False,
http3: bool = False
):
self.config: Config = config self.config: Config = config
self.token: Optional[TokenResponse] = None self.token: Optional[TokenResponse] = None
self.provider_config: ProviderConfig = fetch_provider_config(self.config.auth_server_url) self.provider_config: ProviderConfig = fetch_provider_config(self.config.auth_server_url)
self.token_expiry: Optional[int] = None self.token_expiry: Optional[int] = None
self.verbose: bool = kwargs.get('verbose', False) self.cond = Condition()
self.http2: bool = kwargs.get('http2', False) self.thread: Optional[Thread] = None
self.http3: bool = kwargs.get('http3', False) self.verbose: bool = verbose
self.http2: bool = http2
self.http3: bool = http3
def __enter__(self) -> Self:
return self
def __exit__(self, exc_type: None, exc_val: None, exc_tb: None) -> None:
cond = self.cond
with cond:
cond.notify()
thread = self.thread
self.thread = None
if thread:
thread.join()
def authenticate(self) -> None: def authenticate(self) -> None:
token = oidc_client.login( token = oidc_client.login(
@@ -135,6 +168,20 @@ class JpacrepoClient:
self.token = token self.token = token
self.token_expiry = (token.created_at or int(time())) + (token.expires_in or 10) self.token_expiry = (token.created_at or int(time())) + (token.expires_in or 10)
def thread_callback() -> None:
cond = self.cond
with cond:
while self.thread:
expires_in = (self.token_expiry or 0) - int(time())
if expires_in < 60:
self.refresh_token()
else:
cond.wait(expires_in - 60)
thread = Thread(target=thread_callback)
self.thread = thread
thread.start()
def refresh_token(self) -> None: def refresh_token(self) -> None:
token = self.token token = self.token
if not token: if not token:
@@ -169,7 +216,8 @@ class JpacrepoClient:
raise RuntimeError(f'Received HTTP error code: {response.code}') raise RuntimeError(f'Received HTTP error code: {response.code}')
def packages_to_upload(self) -> tuple[Path, ...]: def packages_to_upload(self) -> tuple[Path, ...]:
package_files: dict[str, Path] = {file.name: file for ext in _supported_compression_formats for package_cache in self.config.repo_folders package_files: dict[str, Path] = {file.name: file for ext in _supported_compression_formats for package_cache in
self.config.repo_folders
for file in package_cache.glob(f'**/*.pkg.tar.{ext}') for file in package_cache.glob(f'**/*.pkg.tar.{ext}')
if file.is_file() and package_file_pattern.match(file.name)} if file.is_file() and package_file_pattern.match(file.name)}
headers = { headers = {
@@ -204,9 +252,6 @@ class JpacrepoClient:
progress = XferProgress(packages_to_upload=len(files), packages_total_size=total_size) progress = XferProgress(packages_to_upload=len(files), packages_total_size=total_size)
start_ts = monotonic() start_ts = monotonic()
for i, file in enumerate(files): for i, file in enumerate(files):
expires_in = (self.token_expiry or 0) - int(time())
if expires_in < 30:
self.refresh_token()
upload_size = file.stat().st_size upload_size = file.stat().st_size
kwargs = dict( kwargs = dict(
width=64, width=64,
@@ -223,6 +268,17 @@ class JpacrepoClient:
progress.packages_uploaded += 1 progress.packages_uploaded += 1
curl.close() curl.close()
_RETRIABLE_HTTP_STATUS_CODES = {401, 403, 409, 429, 504}
@staticmethod
def error_handler(ex: Exception) -> ExceptionHandlerOutcome:
if (isinstance(ex, HttpException)
and ex.http_status_code in JpacrepoClient._RETRIABLE_HTTP_STATUS_CODES):
return ExceptionHandlerOutcome.CONTINUE
else:
return ExceptionHandlerOutcome.THROW
@retry(max_attempts=3, initial_delay=0, exception_handler=error_handler)
def _upload_file(self, curl: pycurl.Curl, file_path: Path, progress: XferProgress) -> None: def _upload_file(self, curl: pycurl.Curl, file_path: Path, progress: XferProgress) -> None:
parse_result = urlparse(self.config.server_url) parse_result = urlparse(self.config.server_url)
new_path = Path(parse_result.path) / 'api/pkg/upload' new_path = Path(parse_result.path) / 'api/pkg/upload'
@@ -265,7 +321,7 @@ class JpacrepoClient:
curl.perform() curl.perform()
http_status_code = curl.getinfo(pycurl.RESPONSE_CODE) http_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
if http_status_code != 201: if http_status_code != 201:
raise RuntimeError(f'Server returned {http_status_code}') raise HttpException(http_status_code)
def main() -> None: def main() -> None:
@@ -287,14 +343,14 @@ def main() -> None:
help="Enable HTTP/3 protocol") help="Enable HTTP/3 protocol")
args = parser.parse_args() args = parser.parse_args()
logging.basicConfig(encoding='utf-8', level=logging.INFO) logging.basicConfig(encoding='utf-8', level=logging.INFO)
client = JpacrepoClient(load_configuration(), **args.__dict__) with JpacrepoClient(load_configuration(), **vars(args)) as client:
client.authenticate() client.authenticate()
files = client.packages_to_upload() files = client.packages_to_upload()
if len(files): if len(files):
logger.debug(f'Files to be uploaded: {files}') logger.debug(f'Files to be uploaded: {files}')
client.upload(files) client.upload(files)
else: else:
logger.info('No packages will be uploaded') logger.info('No packages will be uploaded')
if __name__ == '__main__': if __name__ == '__main__':