# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import logging
import argparse
import multiprocessing
from typing import TYPE_CHECKING, Any, List, Type, TypeVar, Optional
from multiprocessing import connection
from ...common.flag import flags
from ...common.constants import DEFAULT_THREADLESS, DEFAULT_NUM_WORKERS
if TYPE_CHECKING:   # pragma: no cover
    # Imported for annotations only; never executed at runtime.
    from ..event import EventQueue
    from .threadless import Threadless

# Type variable for the worker class handed to ``ThreadlessPool``; bound to
# ``Threadless`` so type checkers reject unrelated classes.
T = TypeVar('T', bound='Threadless[Any]')

logger = logging.getLogger(__name__)


# ``--threadless`` and ``--threaded`` are registered as two independent
# ``store_true`` flags whose defaults are complements of each other.
# NOTE(review): how the two are reconciled when both end up truthy is not
# visible in this module -- confirm against the flag post-processing code.
flags.add_argument(
    '--threadless',
    action='store_true',
    default=DEFAULT_THREADLESS,
    help='Default: ' + ('True' if DEFAULT_THREADLESS else 'False') + '. ' +
    'Enabled by default on Python 3.8+ (mac, linux). ' +
    'When disabled a new thread is spawned '
    'to handle each client connection.',
)

flags.add_argument(
    '--threaded',
    action='store_true',
    default=not DEFAULT_THREADLESS,
    # The default is the inverse of ``--threadless``, so threaded mode is the
    # one that is *enabled* by default wherever threadless is not
    # (Python < 3.8 and windows, per the ``--threadless`` help above).
    help='Default: ' + ('True' if not DEFAULT_THREADLESS else 'False') + '. ' +
    'Enabled by default on Python < 3.8 and windows. ' +
    'When enabled a new thread is spawned '
    'to handle each client connection.',
)

# Number of threadless worker processes started by ``ThreadlessPool.setup``.
flags.add_argument(
    '--num-workers',
    type=int,
    default=DEFAULT_NUM_WORKERS,
    help='Defaults to number of CPU cores.',
)
class ThreadlessPool:
    """Manages the lifecycle of a pool of threadless worker processes.

    When ``flags.threadless`` is truthy, :meth:`setup` starts
    ``flags.num_workers`` processes, each running one instance of
    ``executor_klass``.  For every worker the pool keeps, at the same list
    index, the parent end of its pipe (``work_queues``), its process id
    (``work_pids``) and a lock (``work_locks``) so that callers can hand
    work over to a specific worker.  The hand-over strategy itself (e.g.
    round-robin) is implemented by the caller, not by this class.

    Example usage::

        with ThreadlessPool(flags=..., executor_klass=...) as pool:
            while True:
                time.sleep(1)

    If necessary, start multiple threadless pools with different
    work classes.
    """

    def __init__(
        self,
        flags: argparse.Namespace,
        executor_klass: Type['T'],
        event_queue: Optional['EventQueue'] = None,
    ) -> None:
        """Store configuration; no process is started until :meth:`setup`.

        :param flags: Parsed flags; ``threadless`` and ``num_workers`` are
            read by this class and the namespace is passed on to each worker.
        :param executor_klass: Worker class, instantiated once per process
            with ``iid``, ``work_queue``, ``flags`` and ``event_queue``.
        :param event_queue: Optional event queue passed through to workers.
        """
        self.flags = flags
        self.event_queue = event_queue
        # Threadless worker communication states.  These three lists and the
        # two private lists below are index-aligned: entry ``i`` of each
        # belongs to the same worker.
        self.work_queues: List[connection.Connection] = []
        self.work_pids: List[int] = []
        self.work_locks: List['multiprocessing.synchronize.Lock'] = []
        # List of threadless workers
        self._executor_klass = executor_klass
        # FIXME: Instead of Any type must be the executor klass
        self._workers: List[Any] = []
        self._processes: List[multiprocessing.Process] = []

    def __enter__(self) -> 'ThreadlessPool':
        self.setup()
        return self

    def __exit__(self, *args: Any) -> None:
        self.shutdown()

    def setup(self) -> None:
        """Setup threadless processes (no-op unless ``flags.threadless``)."""
        if self.flags.threadless:
            for index in range(self.flags.num_workers):
                self._start_worker(index)
            logger.info(
                'Started %d threadless workers',
                self.flags.num_workers,
            )

    def shutdown(self) -> None:
        """Shutdown threadless processes (no-op unless ``flags.threadless``).

        Safe to call more than once: only workers that are still tracked by
        the pool are stopped, and the log line reports that actual number.
        """
        if self.flags.threadless:
            stopped = len(self._processes)
            self._shutdown_workers()
            logger.info('Stopped %d threadless workers', stopped)

    def _start_worker(self, index: int) -> None:
        """Starts a threadless worker.

        Creates the lock and pipe for worker ``index``, builds the worker
        object around one end of the pipe and runs ``worker.run`` in a new
        process.  The other pipe end stays with the pool in ``work_queues``.
        """
        self.work_locks.append(multiprocessing.Lock())
        parent_conn, worker_conn = multiprocessing.Pipe()
        self.work_queues.append(parent_conn)
        worker = self._executor_klass(
            iid=str(index),
            work_queue=worker_conn,
            flags=self.flags,
            event_queue=self.event_queue,
        )
        self._workers.append(worker)
        # The daemon flag is deliberately left at its inherited default
        # (``p.daemon = True`` was disabled upstream); workers are stopped
        # explicitly and joined in ``_shutdown_workers``.
        process = multiprocessing.Process(target=worker.run)
        self._processes.append(process)
        process.start()
        # ``pid`` is Optional[int] until the process has started; narrow it.
        assert process.pid
        self.work_pids.append(process.pid)
        logger.debug(
            'Started threadless#%d process#%d', index, process.pid,
        )

    def _shutdown_workers(self) -> None:
        """Signal every tracked worker to stop, then join and clean each up.

        Driven by the workers actually tracked rather than by
        ``flags.num_workers``, so calling it on an empty or partially
        populated pool does not raise ``IndexError``.
        """
        # Signal all workers first so they can wind down concurrently
        # before we block on joining them one by one.
        for worker in self._workers:
            worker.running.set()
        # Tear down in reverse start order, keeping the lists index-aligned.
        while self._processes:
            pid = self.work_pids[-1]
            self._processes.pop().join()
            self._workers.pop()
            self.work_pids.pop()
            self.work_queues.pop().close()
            logger.debug('Stopped threadless process#%d', pid)
        self.work_locks = []