Source code for proxy.proxy

# -*- coding: utf-8 -*-
"""
    proxy.py
    ~~~~~~~~
    ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.
"""
import os
import sys
import gzip
import json
import time
import pprint
import signal
import socket
import getpass
import logging
import argparse
import threading
from typing import TYPE_CHECKING, Any, Dict, List, Type, Tuple, Optional, cast

from .core.ssh import SshTunnelListener, SshHttpProtocolHandler
from .core.work import ThreadlessPool
from .core.event import EventManager
from .http.codes import httpStatusCodes
from .common.flag import FlagParser, flags
from .http.client import client
from .common.utils import bytes_
from .core.work.fd import RemoteFdExecutor
from .http.methods import httpMethods
from .core.acceptor import AcceptorPool
from .core.listener import ListenerPool
from .core.ssh.base import BaseSshTunnelListener
from .common.plugins import Plugins
from .common.version import __version__
from .common.constants import (
    IS_WINDOWS, HTTPS_PROTO, DEFAULT_PLUGINS, DEFAULT_VERSION,
    DEFAULT_LOG_FILE, DEFAULT_PID_FILE, DEFAULT_LOG_LEVEL, DEFAULT_BASIC_AUTH,
    DEFAULT_LOG_FORMAT, DEFAULT_WORK_KLASS, DEFAULT_OPEN_FILE_LIMIT,
    DEFAULT_ENABLE_DASHBOARD, DEFAULT_ENABLE_SSH_TUNNEL,
    DEFAULT_SSH_LISTENER_KLASS,
)


if TYPE_CHECKING:   # pragma: no cover
    from .core.listener import TcpSocketListener
    from .core.ssh.base import BaseSshTunnelHandler


logger = logging.getLogger(__name__)


flags.add_argument(
    '--version',
    '-v',
    action='store_true',
    default=DEFAULT_VERSION,
    help='Prints proxy.py version.',
)

# TODO: Add --verbose option which also
# starts to log traffic flowing between
# clients and upstream servers.
flags.add_argument(
    '--log-level',
    type=str,
    default=DEFAULT_LOG_LEVEL,
    help='Valid options: DEBUG, INFO (default), WARNING, ERROR, CRITICAL. '
    'Both upper and lowercase values are allowed. '
    'You may also simply use the leading character e.g. --log-level d',
)

flags.add_argument(
    '--log-file',
    type=str,
    default=DEFAULT_LOG_FILE,
    help='Default: sys.stdout. Log file destination.',
)

flags.add_argument(
    '--log-format',
    type=str,
    default=DEFAULT_LOG_FORMAT,
    help='Log format for Python logger.',
)

flags.add_argument(
    '--open-file-limit',
    type=int,
    default=DEFAULT_OPEN_FILE_LIMIT,
    help='Default: 1024. Maximum number of files (TCP connections) '
    'that proxy.py can open concurrently.',
)

flags.add_argument(
    '--plugins',
    action='append',
    nargs='+',
    default=DEFAULT_PLUGINS,
    help='Comma separated plugins.  ' +
    'You may use --plugins flag multiple times.',
)

# TODO: Ideally all `--enable-*` flags must be at the top-level.
# --enable-dashboard is specially needed here because
# ProxyDashboard class is not imported anywhere.
#
# Due to which, if we move this flag definition within dashboard
# plugin, users will have to explicitly enable dashboard plugin
# to also use flags provided by it.
flags.add_argument(
    '--enable-dashboard',
    action='store_true',
    default=DEFAULT_ENABLE_DASHBOARD,
    help='Default: False.  Enables proxy.py dashboard.',
)

# NOTE: Same reason as mention above.
# Ideally this flag belongs to proxy auth plugin.
flags.add_argument(
    '--basic-auth',
    type=str,
    default=DEFAULT_BASIC_AUTH,
    help='Default: No authentication. Specify colon separated user:password '
    'to enable basic authentication.',
)

flags.add_argument(
    '--enable-ssh-tunnel',
    action='store_true',
    default=DEFAULT_ENABLE_SSH_TUNNEL,
    help='Default: False.  Enable SSH tunnel.',
)

flags.add_argument(
    '--work-klass',
    type=str,
    default=DEFAULT_WORK_KLASS,
    help='Default: ' + DEFAULT_WORK_KLASS +
    '.  Work klass to use for work execution.',
)

flags.add_argument(
    '--pid-file',
    type=str,
    default=DEFAULT_PID_FILE,
    help='Default: None. Save "parent" process ID to a file.',
)

flags.add_argument(
    '--openssl',
    type=str,
    default='openssl',
    help='Default: openssl. Path to openssl binary. ' +
    'By default, assumption is that openssl is in your PATH.',
)

flags.add_argument(
    '--data-dir',
    type=str,
    default=None,
    help='Default: ~/.proxypy. Path to proxypy data directory.',
)

flags.add_argument(
    '--ssh-listener-klass',
    type=str,
    default=DEFAULT_SSH_LISTENER_KLASS,
    help='Default: '
    + DEFAULT_SSH_LISTENER_KLASS
    + '.  An implementation of BaseSshTunnelListener',
)


[docs]class Proxy: """Proxy is a context manager to control proxy.py library core. By default, :class:`~proxy.core.pool.AcceptorPool` is started with :class:`~proxy.http.handler.HttpProtocolHandler` work class. By definition, it expects HTTP traffic to flow between clients and server. In ``--threadless`` mode and without ``--local-executor``, a :class:`~proxy.core.executors.ThreadlessPool` is also started. Executor pool receives newly accepted work by :class:`~proxy.core.acceptor.Acceptor` and creates an instance of work class for processing the received work. In ``--threadless`` mode and with ``--local-executor 0``, acceptors will start a companion thread to handle accepted client connections. Optionally, Proxy class also initializes the EventManager. A multi-process safe pubsub system which can be used to build various patterns for message sharing and/or signaling. """ def __init__(self, input_args: Optional[List[str]] = None, **opts: Any) -> None: self.opts = opts self.flags = FlagParser.initialize(input_args, **opts) self.listeners: Optional[ListenerPool] = None self.executors: Optional[ThreadlessPool] = None self.acceptors: Optional[AcceptorPool] = None self.event_manager: Optional[EventManager] = None self.ssh_tunnel_listener: Optional[BaseSshTunnelListener] = None def __enter__(self) -> 'Proxy': self.setup() return self def __exit__(self, *args: Any) -> None: self.shutdown()
[docs] def setup(self) -> None: # TODO: Introduce cron feature # https://github.com/abhinavsingh/proxy.py/discussions/808 # # TODO: Introduce ability to change flags dynamically # https://github.com/abhinavsingh/proxy.py/discussions/1020 # # TODO: Python shell within running proxy.py environment # https://github.com/abhinavsingh/proxy.py/discussions/1021 # # TODO: Near realtime resource / stats monitoring # https://github.com/abhinavsingh/proxy.py/discussions/1023 # self._write_pid_file() # We setup listeners first because of flags.port override # in case of ephemeral port being used self.listeners = ListenerPool(flags=self.flags) self.listeners.setup() # Override flags.port to match the actual port # we are listening upon. This is necessary to preserve # the server port when `--port=0` is used. if not self.flags.unix_socket_path: self.flags.port = cast( 'TcpSocketListener', self.listeners.pool[0], )._port # --ports flag can also use 0 as value for ephemeral port selection. # Here, we override flags.ports to reflect actual listening ports. ports = set() offset = 1 if self.flags.unix_socket_path else 0 for index in range(offset, offset + len(self.flags.ports)): ports.add( cast( 'TcpSocketListener', self.listeners.pool[index], )._port, ) if self.flags.port in ports: ports.remove(self.flags.port) self.flags.ports = list(ports) # Write ports to port file self._write_port_file() # Setup EventManager if self.flags.enable_events: logger.info('Core Event enabled') self.event_manager = EventManager() self.event_manager.setup() event_queue = self.event_manager.queue \ if self.event_manager is not None \ else None # Setup remote executors only if # --local-executor mode isn't enabled. if self.remote_executors_enabled: self.executors = ThreadlessPool( flags=self.flags, event_queue=event_queue, executor_klass=RemoteFdExecutor, ) self.executors.setup() # Setup acceptors self.acceptors = AcceptorPool( flags=self.flags, listeners=self.listeners, executor_queues=self.executors.work_queues if self.executors else [], executor_pids=self.executors.work_pids if self.executors else [], executor_locks=self.executors.work_locks if self.executors else [], event_queue=event_queue, ) self.acceptors.setup() # Start SSH tunnel acceptor if enabled if self.flags.enable_ssh_tunnel: self.ssh_tunnel_listener = self._setup_tunnel( flags=self.flags, **self.opts, ) # TODO: May be close listener fd as we don't need it now if threading.current_thread() == threading.main_thread(): self._register_signals()
[docs] @staticmethod def _setup_tunnel( flags: argparse.Namespace, ssh_handler_klass: Optional[Type['BaseSshTunnelHandler']] = None, ssh_listener_klass: Optional[Any] = None, **kwargs: Any, ) -> BaseSshTunnelListener: listener_klass = ssh_listener_klass or SshTunnelListener handler_klass = ssh_handler_klass or SshHttpProtocolHandler tunnel = cast(Type[BaseSshTunnelListener], listener_klass)( flags=flags, handler=handler_klass(flags=flags), **kwargs, ) tunnel.setup() return tunnel
[docs] def shutdown(self) -> None: if self.flags.enable_ssh_tunnel: assert self.ssh_tunnel_listener is not None self.ssh_tunnel_listener.shutdown() assert self.acceptors self.acceptors.shutdown() if self.remote_executors_enabled: assert self.executors self.executors.shutdown() if self.flags.enable_events: assert self.event_manager is not None self.event_manager.shutdown() if self.listeners: self.listeners.shutdown() self._delete_port_file() self._delete_pid_file()
@property def remote_executors_enabled(self) -> bool: return self.flags.threadless and \ not self.flags.local_executor
[docs] def _write_pid_file(self) -> None: if self.flags.pid_file: with open(self.flags.pid_file, 'wb') as pid_file: pid_file.write(bytes_(os.getpid()))
[docs] def _delete_pid_file(self) -> None: if self.flags.pid_file \ and os.path.exists(self.flags.pid_file): os.remove(self.flags.pid_file)
[docs] def _write_port_file(self) -> None: if self.flags.port_file: with open(self.flags.port_file, 'wb') as port_file: if not self.flags.unix_socket_path: port_file.write(bytes_(self.flags.port)) port_file.write(b'\n') for port in self.flags.ports: port_file.write(bytes_(port)) port_file.write(b'\n')
[docs] def _delete_port_file(self) -> None: if self.flags.port_file \ and os.path.exists(self.flags.port_file): os.remove(self.flags.port_file)
[docs] def _register_signals(self) -> None: # TODO: Define SIGUSR1, SIGUSR2 signal.signal(signal.SIGINT, self._handle_exit_signal) signal.signal(signal.SIGTERM, self._handle_exit_signal) if not IS_WINDOWS: if hasattr(signal, 'SIGINFO'): signal.signal( # pragma: no cover signal.SIGINFO, # pylint: disable=E1101 self._handle_siginfo, ) signal.signal(signal.SIGHUP, self._handle_exit_signal) # TODO: SIGQUIT is ideally meant to terminate with core dumps signal.signal(signal.SIGQUIT, self._handle_exit_signal)
[docs] @staticmethod def _handle_exit_signal(signum: int, _frame: Any) -> None: logger.debug('Received signal %d' % signum) sys.exit(0)
[docs] def _handle_siginfo(self, _signum: int, _frame: Any) -> None: pprint.pprint(self.flags.__dict__) # pragma: no cover
[docs]def sleep_loop(p: Optional[Proxy] = None) -> None: while True: try: time.sleep(1) except KeyboardInterrupt: break
[docs]def main(**opts: Any) -> None: with Proxy(sys.argv[1:], **opts) as p: sleep_loop(p)
[docs]def entry_point() -> None: main()
[docs]def grout() -> None: # noqa: C901 default_grout_tld = os.environ.get('JAXL_DEFAULT_GROUT_TLD', 'jaxl.io') def _clear_line() -> None: print('\r' + ' ' * 60, end='', flush=True) def _env(scheme: bytes, host: bytes, port: int) -> Optional[Dict[str, Any]]: response = client( scheme=scheme, host=host, port=port, path=b'/env/', method=httpMethods.BIND, body='v={0}&u={1}&h={2}'.format( __version__, os.environ.get('USER', getpass.getuser()), socket.gethostname(), ).encode(), ) if response: if ( response.code is not None and int(response.code) == httpStatusCodes.OK and response.body is not None ): return cast( Dict[str, Any], json.loads( ( gzip.decompress(response.body).decode() if response.has_header(b'content-encoding') and response.header(b'content-encoding') == b'gzip' else response.body.decode() ), ), ) if response.code is None: _clear_line() print('\r\033[91mUnable to fetch\033[0m', end='', flush=True) else: _clear_line() print( '\r\033[91mError code {0}\033[0m'.format( response.code.decode(), ), end='', flush=True, ) else: _clear_line() print('\r\033[91mUnable to connect\033[0m') return None def _parse() -> Tuple[str, int]: """Here we deduce registry host/port based upon input parameters.""" parser = argparse.ArgumentParser(add_help=False) parser.add_argument('route', nargs='?', default=None) parser.add_argument('name', nargs='?', default=None) args, _remaining_args = parser.parse_known_args() grout_tld = default_grout_tld if args.name is not None and '.' in args.name: grout_tld = args.name.split('.', maxsplit=1)[1] grout_tld_parts = grout_tld.split(':') tld_host = grout_tld_parts[0] tld_port = 443 if len(grout_tld_parts) > 1: tld_port = int(grout_tld_parts[1]) return tld_host, tld_port tld_host, tld_port = _parse() env = None attempts = 0 try: while True: env = _env(scheme=HTTPS_PROTO, host=tld_host.encode(), port=int(tld_port)) attempts += 1 if env is not None: print('\rStarting ...' + ' ' * 30 + '\r', end='', flush=True) break time.sleep(1) _clear_line() print( '\rWaiting for connection {0}'.format('.' * (attempts % 4)), end='', flush=True, ) time.sleep(1) except KeyboardInterrupt: sys.exit(1) assert env is not None print('\r' + ' ' * 70 + '\r', end='', flush=True) Plugins.from_bytes(env['m'].encode(), name='client').grout(env=env['e']) # type: ignore[attr-defined]