Source code for proxy.proxy

# -*- coding: utf-8 -*-
"""
    proxy.py
    ~~~~~~~~
    ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.
"""
import os
import sys
import time
import pprint
import signal
import logging
import argparse
import threading
from typing import TYPE_CHECKING, Any, List, Type, Optional, cast

from .core.work import ThreadlessPool
from .core.event import EventManager
from .common.flag import FlagParser, flags
from .common.utils import bytes_
from .core.work.fd import RemoteFdExecutor
from .core.acceptor import AcceptorPool
from .core.listener import ListenerPool
from .core.ssh.base import BaseSshTunnelListener
from .common.constants import (
    IS_WINDOWS, DEFAULT_PLUGINS, DEFAULT_VERSION, DEFAULT_LOG_FILE,
    DEFAULT_PID_FILE, DEFAULT_LOG_LEVEL, DEFAULT_BASIC_AUTH,
    DEFAULT_LOG_FORMAT, DEFAULT_WORK_KLASS, DEFAULT_OPEN_FILE_LIMIT,
    DEFAULT_ENABLE_DASHBOARD, DEFAULT_ENABLE_SSH_TUNNEL,
    DEFAULT_SSH_LISTENER_KLASS,
)


if TYPE_CHECKING:   # pragma: no cover
    from .core.listener import TcpSocketListener
    from .core.ssh.base import BaseSshTunnelHandler


logger = logging.getLogger(__name__)


flags.add_argument(
    '--version',
    '-v',
    action='store_true',
    default=DEFAULT_VERSION,
    help='Prints proxy.py version.',
)

# TODO: Add --verbose option which also
# starts to log traffic flowing between
# clients and upstream servers.
flags.add_argument(
    '--log-level',
    type=str,
    default=DEFAULT_LOG_LEVEL,
    help='Valid options: DEBUG, INFO (default), WARNING, ERROR, CRITICAL. '
    'Both upper and lowercase values are allowed. '
    'You may also simply use the leading character e.g. --log-level d',
)

flags.add_argument(
    '--log-file',
    type=str,
    default=DEFAULT_LOG_FILE,
    help='Default: sys.stdout. Log file destination.',
)

flags.add_argument(
    '--log-format',
    type=str,
    default=DEFAULT_LOG_FORMAT,
    help='Log format for Python logger.',
)

flags.add_argument(
    '--open-file-limit',
    type=int,
    default=DEFAULT_OPEN_FILE_LIMIT,
    help='Default: 1024. Maximum number of files (TCP connections) '
    'that proxy.py can open concurrently.',
)

flags.add_argument(
    '--plugins',
    action='append',
    nargs='+',
    default=DEFAULT_PLUGINS,
    help='Comma separated plugins.  ' +
    'You may use --plugins flag multiple times.',
)

# TODO: Ideally all `--enable-*` flags must be defined at the top-level.
# --enable-dashboard specifically needs to live here because the
# ProxyDashboard class is not imported anywhere by default.
#
# If this flag definition moved into the dashboard plugin, users would
# have to explicitly enable the dashboard plugin just to use the flags
# it provides.
flags.add_argument(
    '--enable-dashboard',
    action='store_true',
    default=DEFAULT_ENABLE_DASHBOARD,
    help='Default: False.  Enables proxy.py dashboard.',
)

# NOTE: Same reason as mentioned above.
# Ideally this flag belongs to the proxy auth plugin.
flags.add_argument(
    '--basic-auth',
    type=str,
    default=DEFAULT_BASIC_AUTH,
    help='Default: No authentication. Specify colon separated user:password '
    'to enable basic authentication.',
)

flags.add_argument(
    '--enable-ssh-tunnel',
    action='store_true',
    default=DEFAULT_ENABLE_SSH_TUNNEL,
    help='Default: False.  Enable SSH tunnel.',
)

flags.add_argument(
    '--work-klass',
    type=str,
    default=DEFAULT_WORK_KLASS,
    help='Default: ' + DEFAULT_WORK_KLASS +
    '.  Work klass to use for work execution.',
)

flags.add_argument(
    '--pid-file',
    type=str,
    default=DEFAULT_PID_FILE,
    help='Default: None. Save "parent" process ID to a file.',
)

flags.add_argument(
    '--openssl',
    type=str,
    default='openssl',
    help='Default: openssl. Path to openssl binary. ' +
    'By default, assumption is that openssl is in your PATH.',
)

flags.add_argument(
    '--data-dir',
    type=str,
    default=None,
    help='Default: ~/.proxypy. Path to proxypy data directory.',
)

flags.add_argument(
    '--ssh-listener-klass',
    type=str,
    default=DEFAULT_SSH_LISTENER_KLASS,
    help='Default: '
    + DEFAULT_SSH_LISTENER_KLASS
    + '.  An implementation of BaseSshTunnelListener',
)
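
# Illustrative only, not part of the original module: the flags registered
# above map directly onto the ``proxy`` command line.  The plugin name and
# credentials below are assumptions for demonstration.
#
#   proxy --log-level d \
#       --plugins proxy.plugin.CacheResponsesPlugin \
#       --basic-auth user:password \
#       --enable-dashboard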


class Proxy:
    """Proxy is a context manager to control proxy.py library core.

    By default, :class:`~proxy.core.pool.AcceptorPool` is started with
    :class:`~proxy.http.handler.HttpProtocolHandler` work class.
    By definition, it expects HTTP traffic to flow between clients and server.

    In ``--threadless`` mode and without ``--local-executor``,
    a :class:`~proxy.core.executors.ThreadlessPool` is also started.
    Executor pool receives newly accepted work by :class:`~proxy.core.acceptor.Acceptor`
    and creates an instance of work class for processing the received work.

    In ``--threadless`` mode and with ``--local-executor 0``,
    acceptors will start a companion thread to handle accepted client connections.

    Optionally, Proxy class also initializes the EventManager,
    a multi-process safe pubsub system which can be used to build
    various patterns for message sharing and/or signaling.
    """

    def __init__(
            self,
            input_args: Optional[List[str]] = None,
            **opts: Any,
    ) -> None:
        self.opts = opts
        self.flags = FlagParser.initialize(input_args, **opts)
        self.listeners: Optional[ListenerPool] = None
        self.executors: Optional[ThreadlessPool] = None
        self.acceptors: Optional[AcceptorPool] = None
        self.event_manager: Optional[EventManager] = None
        self.ssh_tunnel_listener: Optional[BaseSshTunnelListener] = None

    def __enter__(self) -> 'Proxy':
        self.setup()
        return self

    def __exit__(self, *args: Any) -> None:
        self.shutdown()
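
    # Illustrative sketch, not part of the original module: embedding proxy.py
    # via the context manager protocol implemented above, assuming ``Proxy`` and
    # ``sleep_loop`` are re-exported by the top-level ``proxy`` package.  Flag
    # values shown are examples only.
    #
    #   import proxy
    #
    #   if __name__ == '__main__':
    #       with proxy.Proxy(['--hostname', '127.0.0.1', '--port', '8899']) as p:
    #           proxy.sleep_loop(p)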

    def setup(self) -> None:
        # TODO: Introduce cron feature
        # https://github.com/abhinavsingh/proxy.py/discussions/808
        #
        # TODO: Introduce ability to change flags dynamically
        # https://github.com/abhinavsingh/proxy.py/discussions/1020
        #
        # TODO: Python shell within running proxy.py environment
        # https://github.com/abhinavsingh/proxy.py/discussions/1021
        #
        # TODO: Near realtime resource / stats monitoring
        # https://github.com/abhinavsingh/proxy.py/discussions/1023
        #
        self._write_pid_file()
        # We setup listeners first because of flags.port override
        # in case of ephemeral port being used
        self.listeners = ListenerPool(flags=self.flags)
        self.listeners.setup()
        # Override flags.port to match the actual port
        # we are listening upon.  This is necessary to preserve
        # the server port when `--port=0` is used.
        if not self.flags.unix_socket_path:
            self.flags.port = cast(
                'TcpSocketListener',
                self.listeners.pool[0],
            )._port
        # --ports flag can also use 0 as value for ephemeral port selection.
        # Here, we override flags.ports to reflect actual listening ports.
        ports = set()
        offset = 1 if self.flags.unix_socket_path else 0
        for index in range(offset, offset + len(self.flags.ports)):
            ports.add(
                cast(
                    'TcpSocketListener',
                    self.listeners.pool[index],
                )._port,
            )
        if self.flags.port in ports:
            ports.remove(self.flags.port)
        self.flags.ports = list(ports)
        # Write ports to port file
        self._write_port_file()
        # Setup EventManager
        if self.flags.enable_events:
            logger.info('Core Event enabled')
            self.event_manager = EventManager()
            self.event_manager.setup()
        event_queue = self.event_manager.queue \
            if self.event_manager is not None \
            else None
        # Setup remote executors only if
        # --local-executor mode isn't enabled.
        if self.remote_executors_enabled:
            self.executors = ThreadlessPool(
                flags=self.flags,
                event_queue=event_queue,
                executor_klass=RemoteFdExecutor,
            )
            self.executors.setup()
        # Setup acceptors
        self.acceptors = AcceptorPool(
            flags=self.flags,
            listeners=self.listeners,
            executor_queues=self.executors.work_queues if self.executors else [],
            executor_pids=self.executors.work_pids if self.executors else [],
            executor_locks=self.executors.work_locks if self.executors else [],
            event_queue=event_queue,
        )
        self.acceptors.setup()
        # Start SSH tunnel acceptor if enabled
        if self.flags.enable_ssh_tunnel:
            self.ssh_tunnel_listener = self._setup_tunnel(
                flags=self.flags,
                **self.opts,
            )
        # TODO: May be close listener fd as we don't need it now
        if threading.current_thread() == threading.main_thread():
            self._register_signals()
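
    # Illustrative sketch, not part of the original module: because ``setup()``
    # above rewrites ``flags.port``, an ephemeral port requested with
    # ``--port 0`` can be read back once setup has completed.
    #
    #   with Proxy(['--port', '0']) as p:
    #       print(p.flags.port)   # actual port assigned by the OS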

    @staticmethod
    def _setup_tunnel(
        flags: argparse.Namespace,
        ssh_handler_klass: Type['BaseSshTunnelHandler'],
        ssh_listener_klass: Any,
        **kwargs: Any,
    ) -> BaseSshTunnelListener:
        tunnel = cast(Type[BaseSshTunnelListener], ssh_listener_klass)(
            flags=flags,
            handler=ssh_handler_klass(flags=flags),
            **kwargs,
        )
        tunnel.setup()
        return tunnel

    def shutdown(self) -> None:
        if self.flags.enable_ssh_tunnel:
            assert self.ssh_tunnel_listener is not None
            self.ssh_tunnel_listener.shutdown()
        assert self.acceptors
        self.acceptors.shutdown()
        if self.remote_executors_enabled:
            assert self.executors
            self.executors.shutdown()
        if self.flags.enable_events:
            assert self.event_manager is not None
            self.event_manager.shutdown()
        if self.listeners:
            self.listeners.shutdown()
        self._delete_port_file()
        self._delete_pid_file()

    @property
    def remote_executors_enabled(self) -> bool:
        return self.flags.threadless and \
            not self.flags.local_executor

    def _write_pid_file(self) -> None:
        if self.flags.pid_file:
            with open(self.flags.pid_file, 'wb') as pid_file:
                pid_file.write(bytes_(os.getpid()))

    def _delete_pid_file(self) -> None:
        if self.flags.pid_file \
                and os.path.exists(self.flags.pid_file):
            os.remove(self.flags.pid_file)

    def _write_port_file(self) -> None:
        if self.flags.port_file:
            with open(self.flags.port_file, 'wb') as port_file:
                if not self.flags.unix_socket_path:
                    port_file.write(bytes_(self.flags.port))
                    port_file.write(b'\n')
                for port in self.flags.ports:
                    port_file.write(bytes_(port))
                    port_file.write(b'\n')

    def _delete_port_file(self) -> None:
        if self.flags.port_file \
                and os.path.exists(self.flags.port_file):
            os.remove(self.flags.port_file)

    def _register_signals(self) -> None:
        # TODO: Define SIGUSR1, SIGUSR2
        signal.signal(signal.SIGINT, self._handle_exit_signal)
        signal.signal(signal.SIGTERM, self._handle_exit_signal)
        if not IS_WINDOWS:
            if hasattr(signal, 'SIGINFO'):
                signal.signal(  # pragma: no cover
                    signal.SIGINFO,  # pylint: disable=E1101
                    self._handle_siginfo,
                )
            signal.signal(signal.SIGHUP, self._handle_exit_signal)
            # TODO: SIGQUIT is ideally meant to terminate with core dumps
            signal.signal(signal.SIGQUIT, self._handle_exit_signal)
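
    # Illustrative sketch, not part of the original module: SIGINT/SIGTERM are
    # routed to ``_handle_exit_signal`` above, so an external process can stop
    # a proxy started with ``--pid-file`` by signalling the recorded PID.  The
    # pid file path below is a hypothetical example.
    #
    #   import os, signal
    #
    #   with open('/tmp/proxy.pid') as f:
    #       os.kill(int(f.read().strip()), signal.SIGTERM)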

    @staticmethod
    def _handle_exit_signal(signum: int, _frame: Any) -> None:
        logger.debug('Received signal %d' % signum)
        sys.exit(0)

    def _handle_siginfo(self, _signum: int, _frame: Any) -> None:
        pprint.pprint(self.flags.__dict__)  # pragma: no cover


def sleep_loop(p: Optional[Proxy] = None) -> None:
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            break


def main(**opts: Any) -> None:
    with Proxy(sys.argv[1:], **opts) as p:
        sleep_loop(p)


def entry_point() -> None:
    main()
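

# Illustrative sketch, not part of the original module: ``entry_point`` above is
# what a ``proxy`` console script would typically invoke, while ``main`` accepts
# the same flags as keyword arguments and blocks until interrupted.  The values
# shown are examples only.
#
#   import proxy
#
#   proxy.main(port=8899)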