initial commit

This commit is contained in:
2024-06-26 10:26:21 +08:00
parent b31a634606
commit fbac45f94a
27 changed files with 584 additions and 0 deletions

40
pyproject.toml Normal file
View File

@@ -0,0 +1,40 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "jwt-cli"
version = "0.0.1"
authors = [
{ name="Walter Oggioni", email="oggioni.walter@gmail.com" },
]
description = "JWT command line utilities"
readme = "README.md"
requires-python = ">=3.12"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"jwcrypto >= 1.5.6",
'typing_extensions==4.7.1',
'pwo >= 0.0.2'
]
[project.urls]
"Homepage" = "https://gitea.woggioni.net/woggioni/jwt-cli"
"Bug Tracker" = "https://gitea.woggioni.net/woggioni/jwt-cli/issues"
[project.scripts]
jwt = "jwt_cli:main"
[tool.mypy]
python_version = "3.12"
disallow_untyped_defs = true
show_error_codes = true
no_implicit_optional = true
warn_return_any = true
warn_unused_ignores = true
exclude = ["scripts", "docs", "test"]
strict = true

View File

@@ -0,0 +1,14 @@
Metadata-Version: 2.1
Name: jwt-cli
Version: 0.0.1
Summary: JWT command line utilities
Author-email: Walter Oggioni <oggioni.walter@gmail.com>
Project-URL: Homepage, https://gitea.woggioni.net/woggioni/jwt-cli
Project-URL: Bug Tracker, https://gitea.woggioni.net/woggioni/jwt-cli/issues
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.12
Description-Content-Type: text/markdown
Requires-Dist: jwcrypto>=1.5.6
Requires-Dist: typing_extensions==4.7.1

View File

@@ -0,0 +1,12 @@
pyproject.toml
src/jwt_cli/__init__.py
src/jwt_cli/create_key_command.py
src/jwt_cli/key.py
src/jwt_cli/key_command.py
src/jwt_cli/main.py
src/jwt_cli.egg-info/PKG-INFO
src/jwt_cli.egg-info/SOURCES.txt
src/jwt_cli.egg-info/dependency_links.txt
src/jwt_cli.egg-info/entry_points.txt
src/jwt_cli.egg-info/requires.txt
src/jwt_cli.egg-info/top_level.txt

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,2 @@
[console_scripts]
jwt = jwt_cli:main

View File

@@ -0,0 +1,2 @@
jwcrypto>=1.5.6
typing_extensions==4.7.1

View File

@@ -0,0 +1 @@
jwt_cli

1
src/jwt_cli/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .main import main

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,148 @@
from argparse import ArgumentParser, Action
from enum import Enum
from jwcrypto.jwk import JWK
from typing import Optional, Any
from .key import KeyFileType, write_key
class ECCurve(Enum):
p256 = 'P-256'
p384 = 'P-384'
p521 = 'P-521'
bp256 = 'BP-256'
bp384 = 'BP-384'
bp521 = 'BP-521'
secp256k1 = 'secp256k1'
def __str__(self) -> str:
return self.value
class OKPCurve(Enum):
Ed25519 = 'Ed25519'
Ed448 = 'Ed448'
X25519 = 'X25519'
X448 = 'X448'
def __str__(self) -> str:
return self.value
def _create_ec_key(
output_type: Optional[KeyFileType] = None,
curve: Optional[ECCurve] = None,
output_file: Optional[str] = None,
out_password: Optional[str] = None,
**kwargs: Any
) -> None:
jwk = JWK.generate(kty='EC', crv=curve.value)
write_key(jwk, output_file, output_type, out_password=out_password)
def _create_okp_key(
output_type: Optional[KeyFileType] = None,
curve: Optional[OKPCurve] = None,
output_file: Optional[str] = None,
out_password: Optional[str] = None,
**kwargs: Any
) -> None:
jwk = JWK.generate(kty='OKP', crv=curve.value)
write_key(jwk, output_file, output_type, out_password=out_password)
def _create_oct_key(
output_type: Optional[KeyFileType] = None,
size: Optional[int] = None,
output_file: Optional[str] = None,
out_password: Optional[str] = None,
**kwargs: Any
) -> None:
jwk = JWK.generate(kty='oct', size=size)
write_key(jwk, output_file, output_type, out_password=out_password)
def _create_rsa_key(
output_type: Optional[KeyFileType] = None,
exponent: Optional[int] = None,
size: Optional[int] = None,
output_file: Optional[str] = None,
out_password: Optional[str] = None,
**kwargs: Any
) -> None:
jwk = JWK.generate(kty='RSA', public_exponent=exponent, size=size)
write_key(jwk, output_file, output_type, out_password=out_password)
def _add_common_params(cmd: ArgumentParser) -> None:
cmd.add_argument('-P', '--out-password', help='Password for the input key')
cmd.add_argument(
'-O',
'--output-type',
help='Type of output key material',
type=KeyFileType,
choices=list(KeyFileType),
required=True
)
cmd.add_argument(
'-o',
'--output-file',
help='Output file (defaults to stdout)'
)
def create_key_command(subparsers: Action) -> ArgumentParser:
cmd: ArgumentParser = subparsers.add_parser("create", help="Create a new JWK key")
cmd_subparsers = cmd.add_subparsers()
rsa_cmd = cmd_subparsers.add_parser("rsa", help="Create a new JWK RSA key")
_add_common_params(rsa_cmd)
rsa_cmd.add_argument(
'-e',
'--exponent',
type=int,
help='Public exponent',
default=65537
)
rsa_cmd.add_argument(
'-s',
'--size',
type=int,
help='Size',
default=1024
)
rsa_cmd.set_defaults(func=_create_rsa_key)
ec_cmd = cmd_subparsers.add_parser("ec", help="Create a new JWK EC key")
_add_common_params(ec_cmd)
ec_cmd.add_argument(
'-c',
'--curve',
help='Elliptic curve name',
type=ECCurve,
choices=list(ECCurve),
default=ECCurve.p256
)
ec_cmd.set_defaults(func=_create_ec_key)
okp_cmd = cmd_subparsers.add_parser("okp", help="Create a new JWK OKP key")
_add_common_params(okp_cmd)
okp_cmd.add_argument(
'-c',
'--curve',
help='Elliptic curve name',
type=OKPCurve,
choices=list(OKPCurve),
default=OKPCurve.Ed25519
)
okp_cmd.set_defaults(func=_create_okp_key)
oct_cmd = cmd_subparsers.add_parser("oct", help="Create a new JWK oct key")
_add_common_params(oct_cmd)
oct_cmd.add_argument(
'-s',
'--size',
help='Key size',
type=int,
default=256
)
oct_cmd.set_defaults(func=_create_oct_key)
return cmd

85
src/jwt_cli/key.py Normal file
View File

@@ -0,0 +1,85 @@
import enum
import sys
from typing import Optional, BinaryIO, cast
from jwcrypto.jwk import JWK
from jwcrypto.common import json_decode
from cryptography.hazmat.primitives import serialization
from pwo import Maybe
@enum.unique
class KeyFileType(enum.Enum):
PEM = 'pem'
JSON = 'json'
DER = 'der'
def __str__(self) -> str:
return self.value
def read_key(
input_file: Optional[str] = None,
input_type: Optional[KeyFileType] = None,
in_password: Optional[str] = None,
) -> JWK:
jwk = JWK()
password = (
Maybe.of_nullable(in_password)
.map(lambda it: it.encode('utf-8'))
.or_else(None)
)
def open_input_file() -> BinaryIO:
return open(input_file, 'rb') if input_file else sys.stdin.buffer
if input_type == KeyFileType.PEM:
with open_input_file() as infile:
jwk.import_from_pem(infile.read().decode('utf-8'), password)
elif input_type == KeyFileType.JSON:
with open_input_file() as infile:
jwk.import_key(**json_decode(infile.read().decode('utf-8')))
elif input_type == KeyFileType.DER:
with open_input_file() as infile:
der_obj = serialization.load_der_private_key(infile.read(), password)
jwk.import_from_pyca(der_obj)
return jwk
def write_key(
jwk: JWK,
output_file: Optional[str] = None,
output_type: Optional[KeyFileType] = None,
out_password: Optional[str] = None,
public: bool = False
) -> None:
password = (
Maybe.of_nullable(out_password)
.map(lambda it: it.encode('utf-8'))
.or_none()
)
def open_output_file() -> BinaryIO:
return cast(BinaryIO, open(output_file, 'wb')) if output_file else sys.stdout.buffer
private_key = not public
if output_type == KeyFileType.PEM:
with open_output_file() as outfile:
outfile.write(jwk.export_to_pem(private_key=private_key, password=password))
elif output_type == KeyFileType.JSON:
with open_output_file() as outfile:
outfile.write(jwk.export(private_key=private_key).encode('utf-8'))
elif output_type == KeyFileType.DER:
with open_output_file() as outfile:
pem_obj = serialization.load_pem_private_key(
jwk.export_to_pem(private_key=private_key, password=None),
password=None
)
der_data = pem_obj.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(password)
if password
else serialization.NoEncryption()
)
outfile.write(der_data)

View File

@@ -0,0 +1,62 @@
from argparse import ArgumentParser
from typing import Optional
from .create_key_command import create_key_command
from .key import read_key, write_key, KeyFileType
def key_command(
input_file: Optional[str] = None,
output_file: Optional[str] = None,
input_type: Optional[KeyFileType] = None,
output_type: Optional[KeyFileType] = None,
in_password: Optional[str] = None,
out_password: Optional[str] = None,
public: bool = False,
**kwargs
) -> None:
jwk = read_key(input_file, input_type, in_password)
write_key(jwk, output_file, output_type, out_password, public)
def add_key_command(subparsers: ArgumentParser) -> ArgumentParser:
cmd: ArgumentParser = subparsers.add_parser("key", help="Manage JWKs")
key_subparsers = cmd.add_subparsers(description="subcommands")
convert: ArgumentParser = key_subparsers.add_parser("convert", help="Convert JWKs in/from different formats")
convert.add_argument('-p', '--in-password',
help='Password for the input key')
convert.add_argument('-P', '--out-password',
help='Password for the input key')
convert.add_argument(
'-I',
'--input-type',
help='Type of input key material',
type=KeyFileType,
choices=list(KeyFileType),
required=True
)
convert.add_argument(
'-O',
'--output-type',
help='Type of output key material',
type=KeyFileType,
choices=list(KeyFileType),
required=True
)
convert.add_argument(
'-i',
'--input-file',
help='Input file (defaults to stdin)'
)
convert.add_argument(
'-o',
'--output-file',
help='Output file (defaults to stdout)'
)
convert.add_argument(
'--public',
help='Only output public key',
action='store_true'
)
convert.set_defaults(func=key_command)
create_key_command(key_subparsers)
return cmd

15
src/jwt_cli/main.py Normal file
View File

@@ -0,0 +1,15 @@
import argparse
from typing import List, Optional
from .key_command import add_key_command
from .token_command import add_token_command
def main(args: Optional[List[str]] = None):
# Create the parser
parser = argparse.ArgumentParser(description="A simple CLI program to manage JWT")
subparsers = parser.add_subparsers(title="subcommands", description="valid subcommands")
key_command = add_key_command(subparsers)
token_command = add_token_command(subparsers)
args = parser.parse_args(args)
args.func(**vars(args))

View File

@@ -0,0 +1,48 @@
from argparse import ArgumentParser
from typing import Optional
from .key import read_key, KeyFileType
def signature_command(
input_file: Optional[str] = None,
input_type: Optional[KeyFileType] = None,
in_password: Optional[str] = None,
output_file: Optional[str] = None,
key: Optional[str] = None,
keys: Optional[str] = None,
**kwargs
) -> None:
jwk = read_key(input_file, input_type, in_password)
jwt = read_token(input_file, jwk=key, jwks=keys)
write_token(jwt, output_file)
def add_signature_command(subparsers: ArgumentParser) -> ArgumentParser:
cmd: ArgumentParser = subparsers.add_parser("signature", help="Manage JWSs")
signature_subparsers = cmd.add_subparsers(description="subcommands")
convert: ArgumentParser = signature_subparsers.add_parser("sign", help="Sign a payload")
convert.add_argument(
'-i',
'--input-file',
help='Input file (defaults to stdin)'
)
convert.add_argument('-p', '--key-password',
help='Password for the signing key')
convert.add_argument(
'-I',
'--input-type',
help='Type of input key material',
type=KeyFileType,
choices=list(KeyFileType),
required=True
)
convert.add_argument(
'-o',
'--output-file',
help='Output file (defaults to stdout)'
)
convert.add_argument(
'--key',
help='Sign using the provided JWK',
)
convert.set_defaults(func=signature_command)
return cmd

0
src/jwt_cli/signature.py Normal file
View File

82
src/jwt_cli/token.py Normal file
View File

@@ -0,0 +1,82 @@
from jwcrypto.jwt import JWT
from jwcrypto.jws import JWS
from jwcrypto.jwk import JWKSet, JWK
import json
from pwo import Maybe
from typing import Optional, BinaryIO, TextIO, cast
import sys
from pwo import retry, ExceptionHandlerOutcome
from urllib.request import Request, urlretrieve, urlopen
class HttpException(Exception):
http_status_code: int
message: Optional[str]
def __init__(self, http_status_code: int, msg: Optional[str] = None):
self.message = msg
self.http_status_code = http_status_code
def __repr__(self) -> str:
return f'HTTP status {self.http_status_code}' + f': {self.message}' if self.message else ''
def error_handler(err):
return ExceptionHandlerOutcome.THROW
@retry(max_attempts=5, initial_delay=1.0, exception_handler=error_handler)
def fetch_keyset(url):
with urlopen(url) as response:
return JWKSet.from_json(response.read())
@retry(max_attempts=5, initial_delay=1.0, exception_handler=error_handler)
def fetch_key(url):
with urlopen(url) as response:
return JWK.from_json(response.read())
def read_token(
input_file: Optional[str] = None,
jwks: Optional[str] = None,
jwk: Optional[str] = None
) -> JWT:
jwt = JWT()
if jwks and jwk:
raise ValueError('Only one between key and keyset must be provided')
keys = (Maybe.of_nullable(jwks)
.map(lambda uri: fetch_keyset(uri)))
key = (Maybe.of_nullable(jwk)
.map(lambda uri: fetch_key(uri)))
def open_input_file() -> TextIO:
return open(input_file, 'r') if input_file else sys.stdin
with open_input_file() as infile:
content = infile.read()
jwt.deserialize(content, key=(keys or key).or_none())
return jwt
def write_token(
jwt: JWT,
output_file: Optional[str] = None,
) -> None:
def open_output_file() -> TextIO:
return cast(TextIO, open(output_file, 'wb')) if output_file else sys.stdout.buffer
with open_output_file() as outfile:
# jws = cast(JWS, jwt.token)
token = jwt.token
if isinstance(token, JWS):
jws = cast(JWS, token)
if jws.is_valid:
outfile.write(jws.payload)
else:
outfile.write(jws.objects['payload'])
# header = jws.jose_header
# issuer = jws.objects['payload']

View File

@@ -0,0 +1,42 @@
from argparse import ArgumentParser
from typing import Optional
from .create_key_command import create_key_command
from .token import read_token, write_token
def token_command(
input_file: Optional[str] = None,
output_file: Optional[str] = None,
key: Optional[str] = None,
keys: Optional[str] = None,
**kwargs
) -> None:
jwt = read_token(input_file, jwk=key, jwks=keys)
write_token(jwt, output_file)
def add_token_command(subparsers: ArgumentParser) -> ArgumentParser:
cmd: ArgumentParser = subparsers.add_parser("token", help="Manage JWTs")
token_subparsers = cmd.add_subparsers(description="subcommands")
convert: ArgumentParser = token_subparsers.add_parser("parse", help="Parse a JWT")
convert.add_argument(
'-i',
'--input-file',
help='Input file (defaults to stdin)'
)
convert.add_argument(
'-o',
'--output-file',
help='Output file (defaults to stdout)'
)
convert.add_argument(
'--key',
help='Verify or decrypt using the provided JWK',
)
convert.add_argument(
'--keys',
help='Verify or decrypt using the provided JWKS',
)
convert.set_defaults(func=token_command)
return cmd

Binary file not shown.

1
tests/auth_token.txt Normal file
View File

@@ -0,0 +1 @@
eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICI4MW40Z2k4Sm1HVDNnN3p3YnpKcmVsMm1LY0FvdURTVVphR3g4b3ZDZjAwIn0.eyJleHAiOjE3MTkxMTk3NTUsImlhdCI6MTcxOTExOTI3NSwiYXV0aF90aW1lIjoxNzE5MTE4ODgyLCJqdGkiOiI3NmY1NmY1Mi1mODBjLTQxZmUtODhhMS03YzQyNzM0OTUzYzgiLCJpc3MiOiJodHRwczovL3dvZ2dpb25pLm5ldC9hdXRoL3JlYWxtcy93b2dnaW9uaS5uZXQiLCJhdWQiOlsianBhY3JlcG8iLCJhY2NvdW50Il0sInN1YiI6ImIxYjAyZDY0LTZmYzctNDdkMC1iNGU5LTNjMTY3YzQ3Mzc4ZCIsInR5cCI6IkJlYXJlciIsImF6cCI6ImpwYWNyZXBvLWNsaWVudCIsInNlc3Npb25fc3RhdGUiOiJiYWJhMGZhZi01ZWIwLTQwYTYtODhmZi1iMDI2MzI1ZGI4YjkiLCJhY3IiOiIwIiwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbIm5leHRjbG91ZCIsImdpdGVhIiwiZGVmYXVsdC1yb2xlcy13b2dnaW9uaS5uZXQiLCJ0ZXN0LXJvbGUiLCJqY2hhdCIsIm9mZmxpbmVfYWNjZXNzIiwiamVua2lucyIsInVtYV9hdXRob3JpemF0aW9uIiwianBhY3JlcG8iXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6Im9wZW5pZCBwcm9maWxlIGVtYWlsIiwic2lkIjoiYmFiYTBmYWYtNWViMC00MGE2LTg4ZmYtYjAyNjMyNWRiOGI5IiwiZW1haWxfdmVyaWZpZWQiOnRydWUsIm5hbWUiOiJXYWx0ZXIgT2dnaW9uaSIsInByZWZlcnJlZF91c2VybmFtZSI6IndvZ2dpb25pIiwiZ2l2ZW5fbmFtZSI6IldhbHRlciIsImZhbWlseV9uYW1lIjoiT2dnaW9uaSIsImVtYWlsIjoib2dnaW9uaS53YWx0ZXJAZ21haWwuY29tIn0.YOLAaRlcW1tNeVDa4Uq_2PSJCG4huwjVuSDbZb6xapn8oIYw2phZ4R3dCR7gxRR76_xnJeitFlxMj2M_HazzbY761hhv9H3yM0f7SqgQNoGAQr4vDsKMzeLubYVX1wk77D3n8uAA_aMv1tBq8Rmkno9uDvNaofCh2Py1-zuaiSHNygnIhYYIeqU1uwORA05FVU5vcgj4bWLioH_v_5AGyTdQvP4ZWmK0MIRpAOhQd43WgBm3nrPAT0qbrT9X1yIkR-dvrN4YFVvGcscVGsZNkBN4Im4rbrl8SE3Ow5Q1-imuQhg2jtWCATjQK8IqPh8DFMD8lXTVZZnS9GgF_5Jtyw

1
tests/jwks.json Normal file

File diff suppressed because one or more lines are too long

27
tests/test_token.py Normal file
View File

@@ -0,0 +1,27 @@
import unittest
import tempfile
from jwcrypto.jwt import JWT, JWTExpired
from src.jwt_cli.main import main
from asyncio.runners import run
from pwo import async_test, tmpdir
from pathlib import Path
import sys
import json
class TokenTest(unittest.TestCase):
@tmpdir
def test_parse_token(self, temp_dir):
result = temp_dir / 'output.json'
with self.assertRaises(JWTExpired) as _:
main(['token', 'parse', '-i', 'auth_token.txt',
'-o', str(result),
'--keys', 'file:jwks.json'])
main(['token', 'parse', '-i', 'auth_token.txt',
'-o', str(result)])
with open(result, 'r') as infile:
# print(infile.read())
jwt = json.load(infile)
json.dump(jwt, sys.stdout, indent=4)