# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
.. spelling::
pre
"""
import socket
import logging
import argparse
import selectors
import threading
import multiprocessing
import multiprocessing.synchronize
from typing import Dict, List, Tuple, Optional
from multiprocessing import connection
from multiprocessing.reduction import recv_handle
from ..work import start_threaded_work, delegate_work_to_pool
from ..event import EventQueue
from ..work.fd import LocalFdExecutor
from ...common.flag import flags
from ...common.types import HostPort
from ...common.logger import Logger
from ...common.backports import NonBlockingQueue
from ...common.constants import DEFAULT_LOCAL_EXECUTOR

logger = logging.getLogger(__name__)

flags.add_argument(
'--local-executor',
type=int,
default=int(DEFAULT_LOCAL_EXECUTOR),
    help='Default: ' + ('1' if DEFAULT_LOCAL_EXECUTOR else '0') + '. ' +
    'Enabled by default. Use 0 to disable. When enabled, acceptors ' +
    'will use a local (same process) executor instead of distributing load ' +
    'across remote (other process) executors. Enable this option to achieve ' +
    'CPU affinity between acceptors and executors, instead of relying on the ' +
    'OS kernel scheduling algorithm.',
)
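# For example (a hypothetical invocation), ``proxy --local-executor 0``
# makes acceptors distribute accepted work to remote executor processes
# instead of handling it on an in-process executor thread.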


class Acceptor(multiprocessing.Process):
"""Work acceptor process.
On start-up, `Acceptor` accepts a file descriptor which will be used to
accept new work. File descriptor is accepted over a `fd_queue`.
`Acceptor` goes on to listen for new work over the received server socket.
By default, `Acceptor` will spawn a new thread to handle each work.
However, when ``--threadless`` option is enabled without ``--local-executor``,
`Acceptor` process will also pre-spawns a
:class:`~proxy.core.acceptor.threadless.Threadless` process during start-up.
Accepted work is delegated to these :class:`~proxy.core.acceptor.threadless.Threadless`
processes. `Acceptor` process shares accepted work with a
:class:`~proxy.core.acceptor.threadless.Threadless` process over it's dedicated pipe.
"""
def __init__(
self,
idd: int,
fd_queue: connection.Connection,
flags: argparse.Namespace,
lock: 'multiprocessing.synchronize.Lock',
# semaphore: multiprocessing.synchronize.Semaphore,
executor_queues: List[connection.Connection],
executor_pids: List[int],
executor_locks: List['multiprocessing.synchronize.Lock'],
event_queue: Optional[EventQueue] = None,
) -> None:
super().__init__()
self.flags = flags
# Eventing core queue
self.event_queue = event_queue
# Index assigned by `AcceptorPool`
self.idd = idd
# Mutex used for synchronization with acceptors
self.lock = lock
# self.semaphore = semaphore
# Queue over which server socket fd is received on start-up
self.fd_queue: connection.Connection = fd_queue
# Available executors
self.executor_queues = executor_queues
self.executor_pids = executor_pids
self.executor_locks = executor_locks
# Selector
self.running = multiprocessing.Event()
self.selector: Optional[selectors.DefaultSelector] = None
# File descriptors used to accept new work
self.socks: Dict[int, socket.socket] = {}
# Internals
self._total: Optional[int] = None
self._local_work_queue: Optional['NonBlockingQueue'] = None
self._local: Optional[LocalFdExecutor] = None
self._lthread: Optional[threading.Thread] = None

    def accept(
self,
events: List[Tuple[selectors.SelectorKey, int]],
) -> List[Tuple[socket.socket, Optional[HostPort]]]:
works = []
for key, mask in events:
if mask & selectors.EVENT_READ:
try:
conn, addr = self.socks[key.data].accept()
                    logger.debug(
                        'Accepting new work#{0}'.format(conn.fileno()),
                    )
works.append((conn, addr or None))
except BlockingIOError:
# logger.info('blocking io error')
pass
return works

    def run_once(self) -> None:
if self.selector is not None:
events = self.selector.select(timeout=1)
if len(events) == 0:
return
# locked = False
# try:
# if self.lock.acquire(block=False):
# locked = True
# self.semaphore.release()
# finally:
# if locked:
# self.lock.release()
locked, works = False, []
try:
# if not self.semaphore.acquire(False, None):
# return
if self.lock.acquire(block=False):
locked = True
works = self.accept(events)
finally:
if locked:
self.lock.release()
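        # Dispatch accepted work: to the in-process executor thread when both
        # --threadless and --local-executor are enabled, otherwise via _work().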
for work in works:
if self.flags.threadless and \
self.flags.local_executor:
assert self._local_work_queue
self._local_work_queue.put(work)
else:
self._work(*work)

    def run(self) -> None:
Logger.setup(
self.flags.log_file, self.flags.log_level,
self.flags.log_format,
)
self.selector = selectors.DefaultSelector()
try:
self._recv_and_setup_socks()
if self.flags.threadless and self.flags.local_executor:
self._start_local()
for fileno in self.socks:
self.selector.register(
fileno, selectors.EVENT_READ, fileno,
)
while not self.running.is_set():
self.run_once()
except KeyboardInterrupt:
pass
finally:
for fileno in self.socks:
self.selector.unregister(fileno)
if self.flags.threadless and self.flags.local_executor:
self._stop_local()
for fileno in self.socks:
self.socks[fileno].close()
self.socks.clear()
self.selector.close()
logger.debug('Acceptor#%d shutdown', self.idd)

    def _recv_and_setup_socks(self) -> None:
# TODO: Use selector on fd_queue so that we can
# dynamically accept from new fds.
for _ in range(self.fd_queue.recv()):
fileno = recv_handle(self.fd_queue)
sock = socket.socket(fileno=socket.dup(fileno)) # type: ignore[attr-defined]
self.socks[fileno] = sock
self.fd_queue.close()
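
    # Sender-side sketch of the fd hand-off above (hypothetical wiring; in
    # practice `AcceptorPool` sends these over the other end of `fd_queue`):
    #
    #   from multiprocessing.reduction import send_handle
    #   parent_conn.send(len(socks))              # count of fds to expect
    #   for sock in socks:
    #       send_handle(parent_conn, sock.fileno(), acceptor.pid)
    #   parent_conn.close()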

    def _start_local(self) -> None:
assert self.socks
self._local_work_queue = NonBlockingQueue()
self._local = LocalFdExecutor(
iid=self.idd,
work_queue=self._local_work_queue,
flags=self.flags,
event_queue=self.event_queue,
)
self._lthread = threading.Thread(target=self._local.run)
self._lthread.daemon = True
self._lthread.start()

    def _stop_local(self) -> None:
if self._lthread is not None and \
self._local_work_queue is not None:
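            # ``False`` acts as a sentinel: the local executor thread exits
            # its work loop upon receiving it, allowing join() to return.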
self._local_work_queue.put(False)
self._lthread.join()

    def _work(self, conn: socket.socket, addr: Optional[HostPort]) -> None:
self._total = self._total or 0
if self.flags.threadless:
# Index of worker to which this work should be dispatched
# Use round-robin strategy by default.
#
# By default all acceptors will start sending work to
# 1st workers. To randomize, we offset index by idd.
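            # e.g. with num_workers=4 and idd=1, successive work items are
            # dispatched to workers 1, 2, 3, 0, 1, ...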
index = (self._total + self.idd) % self.flags.num_workers
thread = threading.Thread(
target=delegate_work_to_pool,
args=(
self.executor_pids[index],
self.executor_queues[index],
self.executor_locks[index],
conn,
addr,
self.flags.unix_socket_path,
),
)
thread.start()
# TODO: Move me into target method
logger.debug( # pragma: no cover
'Dispatched work#{0}.{1}.{2} to worker#{3}'.format(
conn.fileno(), self.idd, self._total, index,
),
)
else:
_, thread = start_threaded_work(
self.flags,
conn,
addr,
event_queue=self.event_queue,
publisher_id=self.__class__.__qualname__,
)
# TODO: Move me into target method
logger.debug( # pragma: no cover
'Started work#{0}.{1}.{2} in thread#{3}'.format(
conn.fileno(), self.idd, self._total, thread.ident,
),
)
self._total += 1