import urllib.parse import re import math import json import logging import pycurl import certifi from time import time, monotonic from threading import Thread, Condition from dataclasses import dataclass, fields import oidc_client from oidc_client.discovery import fetch_provider_config from oidc_client.config import ProviderConfig, DEFAULT_REDIRECT_URI from oidc_client.oauth import TokenResponse from urllib.request import Request from urllib.parse import urlparse, urlunparse, quote, urlencode, ParseResult from pathlib import Path from progress import Progress from progress.bar import Bar from typing import Optional, Any from typing_extensions import Self from argparse import ArgumentParser, Action from .config import load_configuration, Config logger = logging.getLogger('jpacrepo.uploader') package_file_pattern = re.compile('.*\.pkg\.tar\.(xz|zst|gz)$') _size_uoms = ('B', 'KiB', 'MiB', 'GiB', 'KiB') _supported_compression_formats = ('xz', 'zst', 'gz') def format_filesize(size: int) -> str: counter = 0 tmp_size = size while tmp_size > 0: tmp_size //= 1024 counter += 1 counter -= 1 return '%.2f ' % (size / math.pow(1024, counter)) + _size_uoms[counter] @dataclass class UploadPt: uploaded: int time: float class PackageUploadProgressBar(Bar): def __init__(self, uploaded_size: int, packages_total_size: int, start_ts: float = monotonic(), *args: Any, **kwargs: Any): kwargs.setdefault('suffix', 'speed: %(total_speed)s, completed: %(percent).2f%% - ETA: %(eta_td)s') kwargs.setdefault('width', 48) super().__init__(*args, **kwargs) self.uploaded_size = uploaded_size self.packages_total_size = packages_total_size self.start_ts = start_ts @Progress.percent.getter def percent(self) -> float: return (self.uploaded_size + self.index) * 100 / self.packages_total_size @property def total_avg(self) -> int: return int((self.uploaded_size + self.index) / (monotonic() - self.start_ts)) @property def total_speed(self) -> str: return format_filesize(self.total_avg) + '/s' @Progress.eta.getter def eta(self) -> int: total_avg = self.total_avg if total_avg > 0: return int( math.ceil( (self.packages_total_size - self.uploaded_size - self.index) / self.total_avg ) ) else: return 0 @property def upload_progress(self) -> str: return f'{format_filesize(self.index)} / {format_filesize(self.max)}' class XferProgress: def __init__(self, packages_to_upload: int, packages_total_size: int, uploaded_size: int = 0, packages_uploaded: int = 0): self.packages_uploaded = packages_uploaded self.packages_total_size = packages_total_size self.uploaded_size = uploaded_size self.packages_to_upload = packages_to_upload self.bar: Optional[PackageUploadProgressBar] = None def update(self, uploaded: int) -> None: if self.bar: self.bar.goto(uploaded) self.uploaded_size += self.bar.max else: raise RuntimeError('Progress bar is None') def complete(self) -> None: self.packages_uploaded += 1 class JpacrepoClient: def __init__(self, config: Config, **kwargs): self.config: Config = config self.token: Optional[TokenResponse] = None self.provider_config: ProviderConfig = fetch_provider_config(self.config.auth_server_url) self.token_expiry: Optional[int] = None self.cond = Condition() self.thread: Optional[Thread] = None self.verbose: bool = kwargs.get('verbose', False) self.http2: bool = kwargs.get('http2', False) self.http3: bool = kwargs.get('http3', False) def __enter__(self) -> Self: return self def __exit__(self, exc_type: None, exc_val: None, exc_tb: None) -> None: cond = self.cond with cond: cond.notify() thread = self.thread self.thread = None if thread: thread.join() def authenticate(self) -> None: token = oidc_client.login( provider_config=self.provider_config, client_id=self.config.client_id, interactive=not bool(self.config.client_secret), client_secret=self.config.client_secret, redirect_uri=DEFAULT_REDIRECT_URI) self.token = token self.token_expiry = (token.created_at or int(time())) + (token.expires_in or 10) def thread_callback() -> None: cond = self.cond with cond: while self.thread: expires_in = (self.token_expiry or 0) - int(time()) if expires_in < 30: self.refresh_token() else: cond.wait(expires_in - 30) thread = Thread(target=thread_callback) self.thread = thread thread.start() def refresh_token(self) -> None: token = self.token if not token: raise ValueError('token is None') request = urllib.request.Request( self.provider_config.token_endpoint, method='POST', data=urlencode( dict( grant_type='refresh_token', client_id='jpacrepo-client', refresh_token=token.refresh_token, audience=self, scope=token.scope ) ).encode() ) with urllib.request.urlopen(request) as response: if response.code == 200: token = TokenResponse( **{ key: value for key, value in json.load(response).items() # Ignore extra keys that are not token response fields if key in (field.name for field in fields(TokenResponse)) } ) self.token = token self.token_expiry = (token.created_at or int(time())) + (token.expires_in or 0) logger.debug(f'refreshed OIDC token') else: raise RuntimeError(f'Received HTTP error code: {response.code}') def packages_to_upload(self) -> tuple[Path, ...]: package_files: dict[str, Path] = {file.name: file for ext in _supported_compression_formats for package_cache in self.config.repo_folders for file in package_cache.glob(f'**/*.pkg.tar.{ext}') if file.is_file() and package_file_pattern.match(file.name)} headers = { 'Accept': 'application/json', 'Content-Type': 'application/json', } token = self.token if isinstance(token, TokenResponse): headers['Authorization'] = f'Bearer {token.access_token}' url = urlparse(self.config.server_url) new_path = Path(url.path) / 'api/pkg/doYouWantAny' url = url._replace(path=str(new_path)) request = Request( urlunparse(url), headers=headers, data=json.dumps([filename for filename in package_files.keys()]).encode(), method='POST' ) with urllib.request.urlopen(request) as response: if response.code == 200: return tuple((package_files[filename] for filename in json.load(response))) else: raise RuntimeError(f'Received HTTP error code: {response.code}') def upload(self, files: tuple[Path, ...]) -> None: total_size: int = 0 for package_file in files: if package_file.exists(): total_size += package_file.stat().st_size logger.info(f'A total of {format_filesize(total_size)} are going to be uploaded') curl: pycurl.Curl = pycurl.Curl() progress = XferProgress(packages_to_upload=len(files), packages_total_size=total_size) start_ts = monotonic() for i, file in enumerate(files): upload_size = file.stat().st_size kwargs = dict( width=64, max=upload_size, message=f'({i + 1}/{len(files)}) {file.name}', start_ts=start_ts ) with PackageUploadProgressBar(progress.uploaded_size, total_size, **kwargs) as bar: bar.start_ts = start_ts progress.bar = bar self._upload_file(curl, file, progress) progress.uploaded_size += upload_size progress.packages_uploaded += 1 curl.close() def _upload_file(self, curl: pycurl.Curl, file_path: Path, progress: XferProgress) -> None: parse_result = urlparse(self.config.server_url) new_path = Path(parse_result.path) / 'api/pkg/upload' url: str = (urlunparse(parse_result._replace(path=str(new_path))) + ';filename=' + quote(file_path.name)) curl.setopt(pycurl.POST, 1) curl.setopt(pycurl.URL, url) headers = [ 'Content-Type: application/octet-stream', 'User-Agent: jpacrepo-client' ] token = self.token if isinstance(token, TokenResponse): headers.append(f'Authorization: Bearer {token.access_token}') curl.setopt(pycurl.HTTPHEADER, headers) def progress_callback(dltotal: int, dlnow: int, ultotal: int, ulnow: int) -> int: bar = progress.bar if bar: bar.goto(ulnow) else: raise RuntimeError('bar is None') return 0 curl.setopt(pycurl.XFERINFOFUNCTION, progress_callback) curl.setopt(pycurl.NOPROGRESS, False) curl.setopt(pycurl.VERBOSE, self.verbose) curl.setopt(pycurl.CAINFO, certifi.where()) if self.http2: curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2) if self.http3: curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_3) with open(str(file_path), 'rb') as file: curl.setopt(pycurl.READDATA, file) curl.perform() http_status_code = curl.getinfo(pycurl.RESPONSE_CODE) if http_status_code != 201: raise RuntimeError(f'Server returned {http_status_code}') def main() -> None: parser = ArgumentParser( prog='jpacrepo-uploader', description='CLI utility', epilog='Text at the bottom of help') parser.add_argument('-v', '--verbose', default=False, action='store_true', help="Enable verbose output") parser.add_argument('-2', '--http2', default=True, action='store_true', help="Enable HTTP/2 protocol") parser.add_argument('-3', '--http3', default=False, action='store_true', help="Enable HTTP/3 protocol") args = parser.parse_args() logging.basicConfig(encoding='utf-8', level=logging.INFO) with JpacrepoClient(load_configuration(), **args.__dict__) as client: client.authenticate() files = client.packages_to_upload() if len(files): logger.debug(f'Files to be uploaded: {files}') client.upload(files) else: logger.info('No packages will be uploaded') if __name__ == '__main__': main()