import json import logging import re import urllib.parse from argparse import ArgumentParser from dataclasses import dataclass, fields from pathlib import Path from time import time, monotonic from typing import Optional, Any from urllib.parse import urlparse, urlunparse, quote, urlencode from urllib.request import Request import certifi import math import oidc_client import pycurl from oidc_client.config import ProviderConfig, DEFAULT_REDIRECT_URI from oidc_client.discovery import fetch_provider_config from oidc_client.oauth import TokenResponse from progress import Progress from progress.bar import Bar from .config import load_configuration, Config logger = logging.getLogger('jpacrepo.uploader') package_file_pattern = re.compile('.*\.pkg\.tar\.(xz|zst|gz)$') _size_uoms = ('B', 'KiB', 'MiB', 'GiB', 'KiB') _supported_compression_formats = ('xz', 'zst', 'gz') def format_filesize(size: int) -> str: counter = 0 tmp_size = size while tmp_size > 0: tmp_size //= 1024 counter += 1 counter -= 1 return '%.2f ' % (size / math.pow(1024, counter)) + _size_uoms[counter] @dataclass class UploadPt: uploaded: int time: float class PackageUploadProgressBar(Bar): def __init__(self, uploaded_size: int, packages_total_size: int, start_ts: float = monotonic(), *args: Any, **kwargs: Any): kwargs.setdefault('suffix', 'speed: %(total_speed)s, completed: %(percent).2f%% - ETA: %(eta_td)s') kwargs.setdefault('width', 48) super().__init__(*args, **kwargs) self.uploaded_size = uploaded_size self.packages_total_size = packages_total_size self.start_ts = start_ts @Progress.percent.getter def percent(self) -> float: return (self.uploaded_size + self.index) * 100 / self.packages_total_size @property def total_avg(self) -> int: return int((self.uploaded_size + self.index) / (monotonic() - self.start_ts)) @property def total_speed(self) -> str: return format_filesize(self.total_avg) + '/s' @Progress.eta.getter def eta(self) -> int: total_avg = self.total_avg if total_avg > 0: return int( math.ceil( (self.packages_total_size - self.uploaded_size - self.index) / self.total_avg ) ) else: return 0 @property def upload_progress(self) -> str: return f'{format_filesize(self.index)} / {format_filesize(self.max)}' class XferProgress: def __init__(self, packages_to_upload: int, packages_total_size: int, uploaded_size: int = 0, packages_uploaded: int = 0): self.packages_uploaded = packages_uploaded self.packages_total_size = packages_total_size self.uploaded_size = uploaded_size self.packages_to_upload = packages_to_upload self.bar: Optional[PackageUploadProgressBar] = None def update(self, uploaded: int) -> None: if self.bar: self.bar.goto(uploaded) self.uploaded_size += self.bar.max else: raise RuntimeError('Progress bar is None') def complete(self) -> None: self.packages_uploaded += 1 class JpacrepoClient: def __init__(self, config: Config, **kwargs): self.config: Config = config self.token: Optional[TokenResponse] = None self.provider_config: ProviderConfig = fetch_provider_config(self.config.auth_server_url) self.token_expiry: Optional[int] = None self.verbose: bool = kwargs.get('verbose', False) self.http2: bool = kwargs.get('http2', False) self.http3: bool = kwargs.get('http3', False) def authenticate(self) -> None: token = oidc_client.login( provider_config=self.provider_config, client_id=self.config.client_id, interactive=not bool(self.config.client_secret), client_secret=self.config.client_secret, redirect_uri=DEFAULT_REDIRECT_URI) self.token = token self.token_expiry = (token.created_at or int(time())) + (token.expires_in or 10) def refresh_token(self) -> None: token = self.token if not token: raise ValueError('token is None') request = urllib.request.Request( self.provider_config.token_endpoint, method='POST', data=urlencode( dict( grant_type='refresh_token', client_id='jpacrepo-client', refresh_token=token.refresh_token, audience=self, scope=token.scope ) ).encode() ) with urllib.request.urlopen(request) as response: if response.code == 200: token = TokenResponse( **{ key: value for key, value in json.load(response).items() # Ignore extra keys that are not token response fields if key in (field.name for field in fields(TokenResponse)) } ) self.token = token self.token_expiry = (token.created_at or int(time())) + (token.expires_in or 0) logger.debug(f'refreshed OIDC token') else: raise RuntimeError(f'Received HTTP error code: {response.code}') def packages_to_upload(self) -> tuple[Path, ...]: package_files: dict[str, Path] = {file.name: file for ext in _supported_compression_formats for package_cache in self.config.repo_folders for file in package_cache.glob(f'**/*.pkg.tar.{ext}') if file.is_file() and package_file_pattern.match(file.name)} headers = { 'Accept': 'application/json', 'Content-Type': 'application/json', } token = self.token if isinstance(token, TokenResponse): headers['Authorization'] = f'Bearer {token.access_token}' url = urlparse(self.config.server_url) new_path = Path(url.path) / 'api/pkg/doYouWantAny' url = url._replace(path=str(new_path)) request = Request( urlunparse(url), headers=headers, data=json.dumps([filename for filename in package_files.keys()]).encode(), method='POST' ) with urllib.request.urlopen(request) as response: if response.code == 200: return tuple((package_files[filename] for filename in json.load(response))) else: raise RuntimeError(f'Received HTTP error code: {response.code}') def upload(self, files: tuple[Path, ...]) -> None: total_size: int = 0 for package_file in files: if package_file.exists(): total_size += package_file.stat().st_size logger.info(f'A total of {format_filesize(total_size)} are going to be uploaded') curl: pycurl.Curl = pycurl.Curl() progress = XferProgress(packages_to_upload=len(files), packages_total_size=total_size) start_ts = monotonic() for i, file in enumerate(files): expires_in = (self.token_expiry or 0) - int(time()) if expires_in < 30: self.refresh_token() upload_size = file.stat().st_size kwargs = dict( width=64, max=upload_size, message=f'({i + 1}/{len(files)}) {file.name}', start_ts=start_ts ) with PackageUploadProgressBar(progress.uploaded_size, total_size, **kwargs) as bar: bar.start_ts = start_ts progress.bar = bar self._upload_file(curl, file, progress) progress.uploaded_size += upload_size progress.packages_uploaded += 1 curl.close() def _upload_file(self, curl: pycurl.Curl, file_path: Path, progress: XferProgress) -> None: parse_result = urlparse(self.config.server_url) new_path = Path(parse_result.path) / 'api/pkg/upload' url: str = (urlunparse(parse_result._replace(path=str(new_path))) + ';filename=' + quote(file_path.name)) curl.setopt(pycurl.POST, 1) curl.setopt(pycurl.URL, url) headers = [ 'Content-Type: application/octet-stream', 'User-Agent: jpacrepo-client' ] token = self.token if isinstance(token, TokenResponse): headers.append(f'Authorization: Bearer {token.access_token}') curl.setopt(pycurl.HTTPHEADER, headers) def progress_callback(dltotal: int, dlnow: int, ultotal: int, ulnow: int) -> int: bar = progress.bar if bar: bar.goto(ulnow) else: raise RuntimeError('bar is None') return 0 curl.setopt(pycurl.XFERINFOFUNCTION, progress_callback) curl.setopt(pycurl.NOPROGRESS, False) curl.setopt(pycurl.VERBOSE, self.verbose) curl.setopt(pycurl.CAINFO, certifi.where()) if self.http2: curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2) if self.http3: curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_3) with open(str(file_path), 'rb') as file: curl.setopt(pycurl.READDATA, file) curl.perform() http_status_code = curl.getinfo(pycurl.RESPONSE_CODE) if http_status_code != 201: raise RuntimeError(f'Server returned {http_status_code}') def main() -> None: parser = ArgumentParser( prog='jpacrepo-uploader', description='CLI utility', epilog='Text at the bottom of help') parser.add_argument('-v', '--verbose', default=False, action='store_true', help="Enable verbose output") parser.add_argument('-2', '--http2', default=True, action='store_true', help="Enable HTTP/2 protocol") parser.add_argument('-3', '--http3', default=False, action='store_true', help="Enable HTTP/3 protocol") args = parser.parse_args() logging.basicConfig(encoding='utf-8', level=logging.INFO) client = JpacrepoClient(load_configuration(), **args.__dict__) client.authenticate() files = client.packages_to_upload() if len(files): logger.debug(f'Files to be uploaded: {files}') client.upload(files) else: logger.info('No packages will be uploaded') if __name__ == '__main__': main()