Source code for proxy.common.pki

# -*- coding: utf-8 -*-
"""
    ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.

    .. spelling::
"""

import os
import sys
import time
import uuid
import logging
import argparse
import tempfile
import contextlib
import subprocess
from typing import List, Tuple, Optional, Generator

from .utils import bytes_
from .version import __version__
from .constants import COMMA

logger = logging.getLogger(__name__)

# Baseline OpenSSL request configuration, kept as bytes because ssl_config()
# writes it verbatim to a temporary file opened in binary mode.  ssl_config()
# appends a ``[PROXY]`` section after this text when SAN or extendedKeyUsage
# values are requested.  Lines starting with ``#`` inside the literal are part
# of the configuration data, not Python comments.
DEFAULT_CONFIG = b'''[ req ]
#default_bits		= 2048
#default_md		    = sha256
#default_keyfile 	= privkey.pem
distinguished_name	= req_distinguished_name
attributes		    = req_attributes

[ req_distinguished_name ]
countryName			    = Country Name (2 letter code)
countryName_min			= 2
countryName_max			= 2
stateOrProvinceName		= State or Province Name (full name)
localityName			= Locality Name (eg, city)
organizationName		= Organization Name (eg, company)
organizationalUnitName	= Organizational Unit Name (eg, section)
commonName			    = Common Name (eg, fully qualified host name)
commonName_max			= 64
emailAddress			= Email Address
emailAddress_max		= 64

[ req_attributes ]
challengePassword		= A challenge password
challengePassword_min	= 4
challengePassword_max	= 20'''

def remove_passphrase(
        key_in_path: str,
        password: str,
        key_out_path: str,
        timeout: int = 10,
) -> bool:
    """Strip the passphrase from a private key via ``openssl rsa``.

    :param key_in_path: Path of the passphrase-protected key to read.
    :param password: Passphrase protecting ``key_in_path``.
    :param key_out_path: Path the key is written to by openssl.
    :param timeout: Seconds handed to :func:`run_openssl_command`.
    :returns: The result of :func:`run_openssl_command` for the command.
    """
    passin = 'pass:%s' % password
    return run_openssl_command(
        [
            'openssl', 'rsa',
            '-passin', passin,
            '-in', key_in_path,
            '-out', key_out_path,
        ],
        timeout,
    )
def gen_private_key(
        key_path: str,
        password: str,
        bits: int = 2048,
        timeout: int = 10,
) -> bool:
    """Create an AES-256 encrypted RSA private key via ``openssl genrsa``.

    :param key_path: Path the generated key is written to.
    :param password: Passphrase used to encrypt the generated key.
    :param bits: Key size passed to ``openssl genrsa``.
    :param timeout: Seconds handed to :func:`run_openssl_command`.
    :returns: The result of :func:`run_openssl_command` for the command.
    """
    passout = 'pass:%s' % password
    key_size = str(bits)
    return run_openssl_command(
        [
            'openssl', 'genrsa', '-aes256',
            '-passout', passout,
            '-out', key_path,
            key_size,
        ],
        timeout,
    )
def gen_public_key(
        public_key_path: str,
        private_key_path: str,
        private_key_password: str,
        subject: str,
        alt_subj_names: Optional[List[str]] = None,
        extended_key_usage: Optional[str] = None,
        validity_in_days: int = 365,
        timeout: int = 10,
) -> bool:
    """Generate the public key for a private key via ``openssl req -new -x509``.

    The output is a self-signed X.509 certificate, as requested by the
    ``-x509`` flag.

    :param public_key_path: Path the certificate is written to.
    :param private_key_path: Path of the private key to use.
    :param private_key_password: Passphrase of the private key.
    :param subject: Value for the ``-subj`` option.
    :param alt_subj_names: Optional names for the subjectAltName extension.
    :param extended_key_usage: Optional extendedKeyUsage value.
    :param validity_in_days: Value for the ``-days`` option.
    :param timeout: Seconds handed to :func:`run_openssl_command`.
    :returns: The result of :func:`run_openssl_command` for the command.
    """
    with ssl_config(alt_subj_names, extended_key_usage) as (config_path, has_extension):
        # The ``PROXY`` section only exists in the generated config when an
        # extension was requested, so only reference it in that case.
        extension_args = ['-extensions', 'PROXY'] if has_extension else []
        openssl_args = [
            'openssl', 'req', '-new', '-x509', '-sha256',
            '-days', str(validity_in_days),
            '-subj', subject,
            '-passin', 'pass:%s' % private_key_password,
            '-config', config_path,
            '-key', private_key_path,
            '-out', public_key_path,
        ] + extension_args
        # Run while still inside the ``with`` so the config file exists.
        return run_openssl_command(openssl_args, timeout)
def gen_csr(
        csr_path: str,
        key_path: str,
        password: str,
        crt_path: str,
        timeout: int = 10,
) -> bool:
    """Derive a CSR from an existing certificate and key via ``openssl x509``.

    :param csr_path: Path the certificate signing request is written to.
    :param key_path: Path of the key passed as ``-signkey``.
    :param password: Passphrase of the key.
    :param crt_path: Path of the existing certificate to convert.
    :param timeout: Seconds handed to :func:`run_openssl_command`.
    :returns: The result of :func:`run_openssl_command` for the command.
    """
    passin = 'pass:%s' % password
    return run_openssl_command(
        [
            'openssl', 'x509', '-x509toreq',
            '-passin', passin,
            '-in', crt_path,
            '-signkey', key_path,
            '-out', csr_path,
        ],
        timeout,
    )
def sign_csr(
        csr_path: str,
        crt_path: str,
        ca_key_path: str,
        ca_key_password: str,
        ca_crt_path: str,
        serial: str,
        alt_subj_names: Optional[List[str]] = None,
        extended_key_usage: Optional[str] = None,
        validity_in_days: int = 365,
        timeout: int = 10,
) -> bool:
    """Sign a CSR with a CA key and certificate via ``openssl x509 -req``.

    :param csr_path: Path of the certificate signing request to sign.
    :param crt_path: Path the signed certificate is written to.
    :param ca_key_path: Path of the CA private key.
    :param ca_key_password: Passphrase of the CA private key.
    :param ca_crt_path: Path of the CA certificate.
    :param serial: Value for the ``-set_serial`` option.
    :param alt_subj_names: Optional names for the subjectAltName extension.
    :param extended_key_usage: Optional extendedKeyUsage value.
    :param validity_in_days: Value for the ``-days`` option.
    :param timeout: Seconds handed to :func:`run_openssl_command`.
    :returns: The result of :func:`run_openssl_command` for the command.
    """
    ca_passin = 'pass:%s' % ca_key_password
    days = str(validity_in_days)
    with ext_file(alt_subj_names, extended_key_usage) as extension_path:
        # Run while still inside the ``with`` so the extension file exists.
        return run_openssl_command(
            [
                'openssl', 'x509', '-req', '-sha256',
                '-CA', ca_crt_path,
                '-CAkey', ca_key_path,
                '-passin', ca_passin,
                '-set_serial', serial,
                '-days', days,
                '-extfile', extension_path,
                '-in', csr_path,
                '-out', crt_path,
            ],
            timeout,
        )
def get_ext_config(
        alt_subj_names: Optional[List[str]] = None,
        extended_key_usage: Optional[str] = None,
) -> bytes:
    """Build the OpenSSL extension lines for the requested options.

    Each emitted line is preceded by a newline so the result can be appended
    directly to an existing configuration.

    :param alt_subj_names: Names emitted as ``DNS:`` entries of a
        ``subjectAltName`` line; ``None`` or an empty list emits nothing.
    :param extended_key_usage: Value of an ``extendedKeyUsage`` line;
        ``None`` emits nothing.
    :returns: The extension lines as bytes, ``b''`` when nothing is requested.
    """
    sections = []
    # SAN extension: every name is written as a DNS entry.
    if alt_subj_names:
        dns_entries = COMMA.join(
            b'DNS:%s' % bytes_(name) for name in alt_subj_names
        )
        sections.append(b'\nsubjectAltName=' + dns_entries)
    # extendedKeyUsage extension
    if extended_key_usage is not None:
        sections.append(b'\nextendedKeyUsage=' + bytes_(extended_key_usage))
    return b''.join(sections)
@contextlib.contextmanager
def ext_file(
        alt_subj_names: Optional[List[str]] = None,
        extended_key_usage: Optional[str] = None,
) -> Generator[str, None, None]:
    """Yield the path of a temporary OpenSSL extension file.

    The file holds the output of :func:`get_ext_config` and lives in the
    system temporary directory under a random name.  It is removed when the
    ``with`` block exits, including when the block raises.

    :param alt_subj_names: Optional names for the subjectAltName extension.
    :param extended_key_usage: Optional extendedKeyUsage value.
    :returns: Context manager yielding the path of the extension file.
    """
    extensions = get_ext_config(alt_subj_names, extended_key_usage)
    config_path = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex)
    try:
        # Write config to temp file
        with open(config_path, 'wb') as cnf:
            cnf.write(extensions)
        yield config_path
    finally:
        # Delete temp file even if the caller's block raised (for example a
        # subprocess timeout); otherwise it would be left behind.  A missing
        # file is ignored so a failed ``open`` is not masked by the cleanup.
        with contextlib.suppress(FileNotFoundError):
            os.remove(config_path)
@contextlib.contextmanager
def ssl_config(
        alt_subj_names: Optional[List[str]] = None,
        extended_key_usage: Optional[str] = None,
) -> Generator[Tuple[str, bool], None, None]:
    """Yield the path of a temporary OpenSSL config file and an extension flag.

    The file starts with ``DEFAULT_CONFIG``.  When SAN names or an
    extendedKeyUsage value are requested, a ``[PROXY]`` section holding the
    output of :func:`get_ext_config` is appended and the yielded flag is
    ``True``.  The file is removed when the ``with`` block exits, including
    when the block raises.

    :param alt_subj_names: Optional names for the subjectAltName extension.
    :param extended_key_usage: Optional extendedKeyUsage value.
    :returns: Context manager yielding ``(config_path, has_extension)``.
    """
    config = DEFAULT_CONFIG
    has_extension = False
    if (alt_subj_names is not None and len(alt_subj_names) > 0) or \
            extended_key_usage is not None:
        has_extension = True
        config += b'\n[PROXY]'
    # Add custom extensions (empty when nothing was requested)
    config += get_ext_config(alt_subj_names, extended_key_usage)
    config_path = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex)
    try:
        # Write config to temp file
        with open(config_path, 'wb') as cnf:
            cnf.write(config)
        yield config_path, has_extension
    finally:
        # Delete temp file even if the caller's block raised (for example a
        # subprocess timeout); otherwise it would be left behind.  A missing
        # file is ignored so a failed ``open`` is not masked by the cleanup.
        with contextlib.suppress(FileNotFoundError):
            os.remove(config_path)
def run_openssl_command(command: List[str], timeout: int) -> bool:
    """Run a command and report whether it exited successfully.

    Standard output and standard error of the child are captured and
    discarded.

    :param command: Program and arguments, executed without a shell.
    :param timeout: Seconds to wait for the command to finish.
    :returns: ``True`` when the command exits with status ``0``.
    :raises subprocess.TimeoutExpired: When the command does not finish
        within ``timeout`` seconds.  The child is killed and reaped before
        the exception propagates.
    """
    cmd = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    try:
        cmd.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # ``communicate`` does not stop the child on timeout; kill it and
        # drain the pipes so no orphan process or open pipe is left behind.
        cmd.kill()
        cmd.communicate()
        raise
    return cmd.returncode == 0
if __name__ == '__main__':
    # Command line front-end for the helpers in this module.  The process
    # exits with status 0 when the selected action succeeds and 1 on a
    # validation error or a failed action.
    available_actions = (
        'remove_passphrase', 'gen_private_key', 'gen_public_key',
        'gen_csr', 'sign_csr',
    )

    parser = argparse.ArgumentParser(
        description=' v%s : PKI Utility' % __version__,
    )
    parser.add_argument(
        'action',
        type=str,
        default=None,
        help='Valid actions: ' + ', '.join(available_actions),
    )
    parser.add_argument(
        '--password',
        type=str,
        default='',
        help='Password to use for encryption. Default:',
    )
    parser.add_argument(
        '--private-key-path',
        type=str,
        default=None,
        help='Private key path',
    )
    parser.add_argument(
        '--public-key-path',
        type=str,
        default=None,
        help='Public key path',
    )
    parser.add_argument(
        '--subject',
        type=str,
        default='/CN=localhost',
        help='Subject to use for public key generation. Default: /CN=localhost',
    )
    parser.add_argument(
        '--csr-path',
        type=str,
        default=None,
        help='CSR file path. Use with gen_csr and sign_csr action.',
    )
    parser.add_argument(
        '--crt-path',
        type=str,
        default=None,
        help='Signed certificate path. Use with sign_csr action.',
    )
    parser.add_argument(
        '--hostname',
        type=str,
        default=None,
        help='Alternative subject names to use during CSR signing.',
    )
    args = parser.parse_args(sys.argv[1:])

    # Validation
    if args.action not in available_actions:
        # ``action`` is a positional argument, so it is named without dashes.
        logger.error(
            'Invalid action. Valid values ' + ', '.join(available_actions),
        )
        sys.exit(1)

    # Options every action needs.  Checking them up front avoids handing
    # ``None`` to the openssl argument list further down.
    required_options = {
        'remove_passphrase': ('private_key_path',),
        'gen_private_key': ('private_key_path',),
        'gen_public_key': ('private_key_path', 'public_key_path'),
        'gen_csr': ('csr_path', 'private_key_path', 'public_key_path'),
        'sign_csr': (
            'csr_path', 'crt_path', 'private_key_path', 'public_key_path',
        ),
    }
    for option in required_options[args.action]:
        if getattr(args, option) is None:
            logger.error(
                '--%s is required for %s',
                option.replace('_', '-'), args.action,
            )
            sys.exit(1)

    # Execute
    if args.action == 'gen_private_key':
        succeeded = gen_private_key(args.private_key_path, args.password)
    elif args.action == 'gen_public_key':
        succeeded = gen_public_key(
            args.public_key_path, args.private_key_path,
            args.password, args.subject,
        )
    elif args.action == 'remove_passphrase':
        # The key is rewritten in place: input and output paths are the same.
        succeeded = remove_passphrase(
            args.private_key_path, args.password,
            args.private_key_path,
        )
    elif args.action == 'gen_csr':
        succeeded = gen_csr(
            args.csr_path,
            args.private_key_path,
            args.password,
            args.public_key_path,
        )
    else:
        # sign_csr: the current Unix time is used as the serial, and no
        # SAN list is passed when --hostname was not given.
        succeeded = sign_csr(
            args.csr_path,
            args.crt_path,
            args.private_key_path,
            args.password,
            args.public_key_path,
            str(int(time.time())),
            alt_subj_names=None if args.hostname is None else [args.hostname],
        )

    if not succeeded:
        logger.error('%s failed', args.action)
    sys.exit(0 if succeeded else 1)