Files
session-security-poc/sspoc/server.py
2024-02-14 21:21:07 +08:00

199 lines
6.0 KiB
Python

import base64
import hmac
import json
import random
import ssl
import sys
from argparse import ArgumentParser
from binascii import Error
from dataclasses import dataclass
from functools import wraps
from hashlib import sha256
from time import time
from typing import Dict, Optional

import flask
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from flask import Flask, send_from_directory
from flask import request


@dataclass
class SessionData:
    """Server-side record describing one authenticated session."""

    # Random secret created at login; per-request tokens are hashed from it.
    nonce: bytes
    # Username the session was issued to.
    user: str


# Flask application object; every route in this module is registered on it.
app = Flask(__name__)

# Demo credential store mapping username -> plaintext password.
# All demo accounts share the same password.
# NOTE(review): the last key is "user4 " with a trailing space -- possibly a
# typo, kept as-is because it is runtime data; confirm before changing.
users = dict.fromkeys(("user1", "user2", "user3", "user4 "), "password")

# Active sessions, keyed by the raw (decoded) bytes of the session cookie.
sessions: Dict[bytes, SessionData] = {}


# Authentication decorator
def _unauthorized(message):
    """Build a 401 response whose body is *message*."""
    response = flask.Response()
    response.status = 401
    response.data = message
    return response


def _decode_base64(value):
    """Return the bytes encoded by base64 string *value*, or None if malformed."""
    try:
        return base64.b64decode(value)
    except ValueError:
        # binascii.Error is a ValueError subclass; non-ASCII input raises a
        # plain ValueError, which must not escape as a 500.
        return None


def _token_matches(nonce, token, tick_seconds=3):
    """Check *token* against the tokens valid for the current time window.

    A token is SHA-256(nonce || tick) where tick is the Unix time divided by
    *tick_seconds*, encoded as an 8-byte big-endian integer. The previous,
    current and next tick are all accepted.
    """
    current_tick = int(time()) // tick_seconds
    matched = False
    # Evaluate every candidate (no short-circuit) and compare in constant
    # time so the check does not reveal how much of a token was correct.
    for tick in (current_tick + 1, current_tick, current_tick - 1):
        # Explicit byteorder: the default only exists on Python 3.11+.
        expected = sha256(nonce + tick.to_bytes(8, 'big')).digest()
        matched |= hmac.compare_digest(token, expected)
    return matched


def token_required(f):
    """Decorate a view so it only runs for an authenticated session.

    The request must carry a base64 ``Bearer`` cookie identifying an entry in
    ``sessions`` and a base64 ``x-token`` header holding a time-based token
    derived from that session's nonce. On success the wrapped view is called
    with the session's username as its first positional argument; otherwise a
    401 response describing the failure is returned.
    """
    @wraps(f)
    def decorator(*args, **kwargs):
        cookie = request.cookies.get('Bearer')
        if not cookie:
            return _unauthorized("A valid cookie is missing!")

        binary_cookie = _decode_base64(cookie)
        if binary_cookie is None:
            return _unauthorized("Cookie is invalid")

        session: Optional[SessionData] = sessions.get(binary_cookie)
        if session is None:
            return _unauthorized("Cookie is invalid")

        header_token = request.headers.get('x-token')
        if not header_token:
            return _unauthorized("Token is missing")

        binary_token = _decode_base64(header_token)
        if binary_token is None:
            return _unauthorized("Token is invalid")

        if not _token_matches(session.nonce, binary_token):
            return _unauthorized("Token is invalid")

        return f(session.user, *args, **kwargs)
    return decorator


def _login_error(status, message):
    """Build an error response with HTTP *status* and body *message*."""
    response = flask.Response()
    response.status = status
    response.data = message
    return response


@app.route('/api/login', methods=['POST'])
def login():
    """Authenticate a user and open a new session.

    Expects a JSON object with ``username`` and ``password`` and a
    ``public-key`` header containing the base64 body of a PEM RSA public key.
    On success a fresh nonce is generated, encrypted with that key using
    RSA-OAEP (SHA-256) and returned base64-encoded in the ``nonce`` response
    header, and a random session id is set as the ``Bearer`` cookie.

    Returns 415 for a non-JSON content type, 400 for a malformed body or an
    unusable public key, and 401 for missing or wrong credentials.
    """
    # Compare the media type only, so "application/json; charset=utf-8" works.
    if request.mimetype != 'application/json':
        return _login_error(415, "Wrong request content type")

    try:
        payload = json.loads(request.data)
    except ValueError:
        # Covers JSONDecodeError and UnicodeDecodeError.
        return _login_error(400, "Malformed JSON body")
    if not isinstance(payload, dict):
        return _login_error(400, "Malformed JSON body")

    user = payload.get('username')
    if not user:
        return _login_error(401, "Missing username from request")
    # A non-string username (e.g. a JSON list) is unhashable and can never
    # match an account.
    password = users.get(user) if isinstance(user, str) else None
    if not password:
        # NOTE(review): distinct "wrong username"/"wrong password" messages
        # let callers enumerate accounts; kept for client compatibility.
        return _login_error(401, "Wrong username")

    supplied_password = payload.get('password')
    if not supplied_password:
        return _login_error(401, "Missing password from request")
    # Constant-time comparison on bytes; 'surrogatepass' keeps lone surrogates
    # from JSON escapes from raising during encoding.
    if not isinstance(supplied_password, str) or not hmac.compare_digest(
        supplied_password.encode('utf-8', 'surrogatepass'),
        password.encode('utf-8'),
    ):
        return _login_error(401, "Wrong password")

    public_key_header = request.headers.get('public-key', None)
    if not public_key_header:
        return _login_error(400, "Missing public key header")

    pem_key = f'-----BEGIN PUBLIC KEY-----\n{public_key_header}\n-----END PUBLIC KEY-----\n'
    try:
        public_key = serialization.load_pem_public_key(
            pem_key.encode(),
            backend=default_backend()
        )
    except ValueError:
        return _login_error(400, "Invalid public key")
    # Only RSA keys support the OAEP encryption used below.
    if not isinstance(public_key, rsa.RSAPublicKey):
        return _login_error(400, "Invalid public key")

    sr = random.SystemRandom()
    nonce = sr.randbytes(16)
    try:
        ciphertext = public_key.encrypt(
            nonce,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
    except ValueError:
        # Raised when the key is too small to hold the OAEP-padded nonce.
        return _login_error(400, "Invalid public key")

    # NOTE(review): sessions are never expired or removed in this module.
    cookie_bytes = sr.randbytes(16)
    sessions[cookie_bytes] = SessionData(user=user, nonce=nonce)

    response = flask.Response()
    response.status = 200
    response.headers['nonce'] = base64.b64encode(ciphertext).decode()
    cookie: str = base64.b64encode(cookie_bytes).decode()
    response.set_cookie('Bearer', cookie, secure=True, httponly=True, samesite='Lax')
    return response


@app.route('/')
def index():
    """Serve the front-end entry page from the ``static`` directory."""
    landing_page = 'index.html'
    return send_from_directory('static', landing_page)


@app.route('/js/<path:path>')
def send_javascript(path):
    """Serve the JavaScript asset at *path* from ``static/js``.

    Behaves like ``send_css``: the request path is handed straight to
    ``send_from_directory`` without being echoed to stdout.
    """
    return send_from_directory('static/js', path)


@app.route('/css/<path:path>')
def send_css(path):
    """Serve the stylesheet at *path* from ``static/css``."""
    stylesheet_dir = 'static/css'
    return send_from_directory(stylesheet_dir, path)


@app.route('/api/whoami')
@token_required
def whoami(user):
    """Greet the authenticated caller; *user* is supplied by ``token_required``."""
    return 'hello {}'.format(user)


@app.route('/api/hello')
def hello():
    """Return a fixed greeting; this endpoint needs no authentication."""
    greeting = 'hello anonymous'
    return greeting


def main():
    """Parse command-line options and run the Flask development server.

    TLS is selected as follows: ``--tls-self-signed`` uses Flask's ad-hoc
    certificate; otherwise ``--key-file`` together with ``--cert-file`` loads
    that certificate chain; with neither, the server runs without TLS.
    Supplying only one of ``--key-file``/``--cert-file`` is a usage error.
    """
    parser = ArgumentParser(
        prog='session-security-poc',
        description='Program to demonstrate improved user session security')
    parser.add_argument('--host', default='127.0.0.1')
    # type=int lets argparse reject non-numeric ports with a usage message.
    parser.add_argument('--port', type=int, default=8080)
    parser.add_argument('--key-file')
    parser.add_argument('--cert-file')
    parser.add_argument('--tls-self-signed', action='store_true')
    args = parser.parse_args(sys.argv[1:])

    if args.tls_self_signed:
        ssl_context = 'adhoc'
    elif args.key_file and args.cert_file:
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ssl_context.load_cert_chain(certfile=args.cert_file, keyfile=args.key_file)
    elif args.key_file or args.cert_file:
        # Half a TLS configuration used to fall back silently to plain HTTP,
        # over which browsers never send back the Secure session cookie.
        parser.error('--key-file and --cert-file must be given together')
    else:
        ssl_context = None

    app.run(host=args.host, port=args.port, ssl_context=ssl_context)


if __name__ == '__main__':
    main()