Files
jpacrepo-uploader/src/jpacrepo_uploader/uploader.py
T
woggioni a3f8059e6b
CI / build (push) Successful in 44s
improved progress bar
2026-04-22 22:08:45 +08:00

409 lines
14 KiB
Python

import json
import logging
import re
import shutil
import urllib.parse
import sys
from argparse import ArgumentParser
from dataclasses import dataclass, fields
from pathlib import Path
from time import time, monotonic
from typing import Optional, Any
from urllib.parse import urlparse, urlunparse, quote, urlencode
from urllib.request import Request
from threading import Thread, Condition
import enum
import certifi
import math
import oidc_client
import pycurl
from oidc_client.config import ProviderConfig, DEFAULT_REDIRECT_URI
from oidc_client.discovery import fetch_provider_config
from oidc_client.oauth import TokenResponse
from progress import Progress
from progress.bar import Bar
from typing_extensions import Self
from pwo import format_filesize, retry, ExceptionHandlerOutcome
from .config import load_configuration, Config
logger = logging.getLogger('jpacrepo-uploader.uploader')
package_file_pattern = re.compile('.*\\.pkg\\.tar\\.(xz|zst|gz)$')
_supported_compression_formats = ('xz', 'zst', 'gz')
terminal_size = shutil.get_terminal_size(fallback=(80, 24))
class HttpException(Exception):
http_status_code : int
message: Optional[str]
def __init__(self, http_status_code: int, msg: Optional[str] = None):
self.message = msg
self.http_status_code = http_status_code
def __repr__(self) -> str:
return f'HTTP status {self.http_status_code}' + f': {self.message}' if self.message else ''
@dataclass
class UploadPt:
uploaded: int
time: float
class PackageUploadProgressBar(Bar):
def __init__(self,
headline: str,
uploaded_size: int,
packages_total_size: int,
start_ts: float = monotonic(),
*args: Any,
**kwargs: Any):
kwargs.setdefault('suffix',
'speed: %(total_speed)s, completed: %(percent).2f%% - ETA: %(eta_td)s')
kwargs.setdefault('width', max(0, terminal_size[0] - 72))
super().__init__(*args, **kwargs)
self._headline = headline
self.uploaded_size = uploaded_size
self.packages_total_size = packages_total_size
self.start_ts = start_ts
@Progress.percent.getter
def percent(self) -> float:
return (self.uploaded_size + self.index) * 100 / self.packages_total_size
@property
def total_avg(self) -> int:
return int((self.uploaded_size + self.index) / (monotonic() - self.start_ts))
@property
def total_speed(self) -> str:
return format_filesize(self.total_avg, 5, 2) + '/s'
@Progress.eta.getter
def eta(self) -> int:
total_avg = self.total_avg
if total_avg > 0:
return int(
math.ceil(
(self.packages_total_size - self.uploaded_size - self.index) / self.total_avg
)
)
else:
return 0
@property
def upload_progress(self) -> str:
return f'{format_filesize(self.index, 5, 2)} / {format_filesize(self.max, 5, 2)}'
@staticmethod
def _clear_previous_line():
sys.stdout.write("\033[1A\033[2K")
sys.stdout.flush()
def __enter__(self):
print(self._headline)
return super().__enter__()
def __exit__(self, exc_type, exc_val, exc_tb):
result = super().__exit__(exc_type, exc_val, exc_tb)
PackageUploadProgressBar._clear_previous_line()
return result
class XferProgress:
def __init__(self,
packages_to_upload: int,
packages_total_size: int,
uploaded_size: int = 0,
packages_uploaded: int = 0):
self.packages_uploaded = packages_uploaded
self.packages_total_size = packages_total_size
self.uploaded_size = uploaded_size
self.packages_to_upload = packages_to_upload
self.bar: Optional[PackageUploadProgressBar] = None
def update(self, uploaded: int) -> None:
if self.bar:
self.bar.goto(uploaded)
self.uploaded_size += self.bar.max
else:
raise RuntimeError('Progress bar is None')
def complete(self) -> None:
self.packages_uploaded += 1
class JpacrepoClient:
config: Config
token: Optional[TokenResponse]
provider_config: ProviderConfig
token_expiry: Optional[int]
cond: Condition
thread: Optional[Thread]
verbose: bool
http_version : int
def __init__(self, config: Config,
verbose: bool = False,
**kwargs,
):
self.config: Config = config
self.token: Optional[TokenResponse] = None
self.provider_config: ProviderConfig = fetch_provider_config(self.config.auth_server_url)
self.token_expiry: Optional[int] = None
self.cond = Condition()
self.thread: Optional[Thread] = None
self.verbose: bool = verbose
if kwargs.get('http1.0'):
self.http_version = pycurl.CURL_HTTP_VERSION_1_0
elif kwargs.get('http1.1'):
self.http_version = pycurl.CURL_HTTP_VERSION_1_1
elif kwargs.get('http2'):
self.http_version = pycurl.CURL_HTTP_VERSION_2
elif kwargs.get('http3'):
self.http_version = pycurl.CURL_HTTP_VERSION_3
else:
self.http_version = pycurl.CURL_HTTP_VERSION_NONE
def __enter__(self) -> Self:
return self
def __exit__(self, exc_type: None, exc_val: None, exc_tb: None) -> None:
cond = self.cond
with cond:
cond.notify()
thread = self.thread
self.thread = None
if thread:
thread.join()
def authenticate(self) -> None:
token = oidc_client.login(
provider_config=self.provider_config,
client_id=self.config.client_id,
interactive=not bool(self.config.client_secret),
client_secret=self.config.client_secret,
redirect_uri=DEFAULT_REDIRECT_URI)
self.token = token
self.token_expiry = (token.created_at or int(time())) + (token.expires_in or 10)
def thread_callback() -> None:
cond = self.cond
with cond:
while self.thread:
expires_in = (self.token_expiry or 0) - int(time())
if expires_in < 60:
self.refresh_token()
else:
cond.wait(expires_in - 60)
thread = Thread(target=thread_callback)
self.thread = thread
thread.start()
def refresh_token(self) -> None:
token = self.token
if not token:
raise ValueError('token is None')
request = urllib.request.Request(
self.provider_config.token_endpoint,
method='POST',
data=urlencode(
dict(
grant_type='refresh_token',
client_id='jpacrepo-client',
refresh_token=token.refresh_token,
audience=self,
scope=token.scope
)
).encode()
)
with urllib.request.urlopen(request) as response:
if response.code == 200:
token = TokenResponse(
**{
key: value
for key, value in json.load(response).items()
# Ignore extra keys that are not token response fields
if key in (field.name for field in fields(TokenResponse))
}
)
self.token = token
self.token_expiry = (token.created_at or int(time())) + (token.expires_in or 0)
logger.debug(f'refreshed OIDC token')
else:
raise RuntimeError(f'Received HTTP error code: {response.code}')
def packages_to_upload(self) -> tuple[Path, ...]:
logger.info('Scanned folders: [%s]' % ', '.join((f"\"{str(path)}\"" for path in self.config.repo_folders)))
package_files: dict[str, Path] = {file.name: file for ext in _supported_compression_formats for package_cache in
self.config.repo_folders
for file in package_cache.glob(f'**/*.pkg.tar.{ext}')
if file.is_file() and package_file_pattern.match(file.name)}
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json',
}
token = self.token
if isinstance(token, TokenResponse):
headers['Authorization'] = f'Bearer {token.access_token}'
url = urlparse(self.config.server_url)
new_path = Path(url.path) / 'api/pkg/doYouWantAny'
url = url._replace(path=str(new_path))
request = Request(
urlunparse(url),
headers=headers,
data=json.dumps([filename for filename in package_files.keys()]).encode(),
method='POST'
)
with urllib.request.urlopen(request) as response:
if response.code == 200:
return tuple((package_files[filename] for filename in json.load(response)))
else:
raise RuntimeError(f'Received HTTP error code: {response.code}')
def upload(self, files: tuple[Path, ...]) -> None:
total_size: int = 0
for package_file in files:
if package_file.exists():
total_size += package_file.stat().st_size
logger.info(f'A total of {format_filesize(total_size)} are going to be uploaded')
curl: pycurl.Curl = pycurl.Curl()
progress = XferProgress(packages_to_upload=len(files), packages_total_size=total_size)
start_ts = monotonic()
for i, file in enumerate(files):
upload_size = file.stat().st_size
if len(file.name) <= terminal_size[0]:
filename = file.name
else:
filename = file.name[:terminal_size[0] - 3] + '...'
kwargs = dict(
message='Uploading ',
max=upload_size,
start_ts=start_ts
)
with PackageUploadProgressBar(f'({i + 1}/{len(files)}) {filename}', progress.uploaded_size, total_size, **kwargs) as bar:
bar.start_ts = start_ts
progress.bar = bar
self._upload_file(curl, file, progress)
progress.uploaded_size += upload_size
progress.packages_uploaded += 1
curl.close()
_RETRIABLE_HTTP_STATUS_CODES = {401, 403, 409, 429, 504}
@staticmethod
def error_handler(ex: Exception) -> ExceptionHandlerOutcome:
if (isinstance(ex, HttpException)
and ex.http_status_code in JpacrepoClient._RETRIABLE_HTTP_STATUS_CODES):
return ExceptionHandlerOutcome.CONTINUE
else:
return ExceptionHandlerOutcome.THROW
@retry(max_attempts=3, initial_delay=0, exception_handler=error_handler)
def _upload_file(self, curl: pycurl.Curl, file_path: Path, progress: XferProgress) -> None:
parse_result = urlparse(self.config.server_url)
new_path = Path(parse_result.path) / 'api/pkg/upload'
url: str = (urlunparse(parse_result._replace(path=str(new_path)))
+ ';filename=' + quote(file_path.name))
curl.setopt(pycurl.POST, 1)
curl.setopt(pycurl.URL, url)
headers = [
'Content-Type: application/octet-stream',
'User-Agent: jpacrepo-client'
]
token = self.token
if isinstance(token, TokenResponse):
headers.append(f'Authorization: Bearer {token.access_token}')
curl.setopt(pycurl.HTTPHEADER, headers)
def progress_callback(dltotal: int,
dlnow: int,
ultotal: int,
ulnow: int) -> int:
bar = progress.bar
if bar:
bar.goto(ulnow)
else:
raise RuntimeError('bar is None')
return 0
curl.setopt(pycurl.XFERINFOFUNCTION, progress_callback)
curl.setopt(pycurl.NOPROGRESS, False)
curl.setopt(pycurl.VERBOSE, self.verbose)
curl.setopt(pycurl.CAINFO, certifi.where())
curl.setopt(pycurl.HTTP_VERSION, self.http_version)
with open(str(file_path), 'rb') as file:
curl.setopt(pycurl.READDATA, file)
curl.perform()
http_status_code = curl.getinfo(pycurl.RESPONSE_CODE)
if http_status_code != 201:
raise HttpException(http_status_code)
class LogLevel(str, enum.Enum):
CREATE = 'create'
DELETE = 'delete'
MODIFY = 'modify'
CRITICAL = 'CRITICAL'
FATAL = CRITICAL
ERROR = 'ERROR'
WARNING = 'WARNING'
WARN = WARNING
INFO = 'INFO'
DEBUG = 'DEBUG'
def __str__(self):
return self.value
def main() -> None:
parser = ArgumentParser(
prog='jpacrepo-uploader',
description='CLI utility',
epilog='Text at the bottom of help')
parser.add_argument('-v', '--verbose',
default=False,
action='store_true',
help="Enable verbose output")
parser.add_argument('-0', '--http1.0',
default=False,
action='store_true',
help="Force HTTP/1.1 protocol")
parser.add_argument('-1', '--http1.1',
default=False,
action='store_true',
help="Force HTTP/1.1 protocol")
parser.add_argument('-2', '--http2',
default=False,
action='store_true',
help="Enable HTTP/2 protocol")
parser.add_argument('-3', '--http3',
default=False,
action='store_true',
help="Enable HTTP/3 protocol")
parser.add_argument('-l', '--log-level',
default=LogLevel.INFO,
type=LogLevel,
help="Set logging level")
args = parser.parse_args()
logging.basicConfig(encoding='utf-8', level=logging.getLevelNamesMapping()[args.log_level])
with JpacrepoClient(load_configuration(), **vars(args)) as client:
client.authenticate()
files = client.packages_to_upload()
if len(files):
logger.debug(f'Files to be uploaded: {files}')
client.upload(files)
else:
logger.info('No packages will be uploaded')
if __name__ == '__main__':
main()