# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import uuid
import queue
import logging
import threading
import multiprocessing
from typing import Any, Dict, Callable, Optional
from multiprocessing import connection
from .names import eventNames
from .queue import EventQueue
logger = logging.getLogger(__name__)


class EventSubscriber:
    """Core event subscriber.

    Usage: Initialize one instance per CPU core for optimum performance.

    EventSubscriber can run within various contexts, e.g. the main thread,
    another thread or a different process.  The EventSubscriber context
    can be different from that of the publishers.  Publishers can even be
    processes outside of the proxy.py core.

    `multiprocessing.Pipe` is used to create the channel over which
    subscribed events are received from the eventing core.  Note that
    the core EventDispatcher might be running in a separate process,
    hence the subscription channel must be multiprocess safe.

    When `setup` is called, a relay thread is started which consumes
    events out of the subscription channel and invokes the callback.
    By default `setup` also calls `subscribe`, and `shutdown` also
    calls `unsubscribe`.

    The class is a context manager: entering calls `setup()` and
    exiting calls `shutdown()`.

    NOTE: Callback is executed in the context of the relay thread.
    """

    def __init__(
        self,
        event_queue: 'EventQueue',
        callback: Callable[[Dict[str, Any]], None],
    ) -> None:
        """Store the event queue and callback; nothing is started here.

        :param event_queue: Queue used to send subscribe / unsubscribe
            requests (its ``subscribe`` and ``unsubscribe`` methods are
            called by this class).
        :param callback: Invoked with every received event that is not
            one of the control events handled by `relay`.
        """
        self.event_queue = event_queue
        self.callback = callback
        # All relay state is populated by _start_relay_thread().
        self.relay_thread: Optional[threading.Thread] = None
        self.relay_shutdown: Optional[threading.Event] = None
        self.relay_recv: Optional[connection.Connection] = None
        self.relay_send: Optional[connection.Connection] = None
        self.relay_sub_id: Optional[str] = None

    def __enter__(self) -> 'EventSubscriber':
        self.setup()
        return self

    def __exit__(self, *args: Any) -> None:
        self.shutdown()

    def setup(self, do_subscribe: bool = True) -> None:
        """Start the relay thread and, by default, subscribe.

        :param do_subscribe: When False only the relay thread is started;
            call `subscribe()` later to actually start the subscription.
        """
        self._start_relay_thread()
        assert self.relay_sub_id and self.relay_recv
        logger.debug(
            'Subscriber#%s relay setup done',
            self.relay_sub_id,
        )
        if do_subscribe:
            self.subscribe()

    def shutdown(self, do_unsubscribe: bool = True) -> None:
        """Stop the relay thread and, by default, unsubscribe.

        The relay thread is stopped (and both pipe ends closed) before
        the unsubscribe request is sent.

        :param do_unsubscribe: When False only the relay thread is torn
            down; call `unsubscribe()` to actually stop the subscription.
        """
        self._stop_relay_thread()
        logger.debug(
            'Subscriber#%s relay shutdown done',
            self.relay_sub_id,
        )
        if do_unsubscribe:
            self.unsubscribe()

    def subscribe(self) -> None:
        """Register the sending end of the relay pipe with the event queue.

        Requires that the relay has been set up (asserted).
        """
        assert self.relay_sub_id and self.relay_send
        self.event_queue.subscribe(self.relay_sub_id, self.relay_send)

    def unsubscribe(self) -> None:
        """Ask the event queue to drop this subscription.

        Logs a warning and returns when no subscription id exists.
        `BrokenPipeError` / `EOFError` raised by the event queue are
        ignored.  `relay_sub_id` is not reset by this method.
        """
        if self.relay_sub_id is None:
            logger.warning(
                'Relay called unsubscribe without an active subscription',
            )
            return
        try:
            self.event_queue.unsubscribe(self.relay_sub_id)
        except (BrokenPipeError, EOFError):
            # Best effort: the other side of the queue is already gone.
            pass

    @staticmethod
    def relay(
        sub_id: str,
        shutdown: threading.Event,
        channel: connection.Connection,
        callback: Callable[[Dict[str, Any]], None],
    ) -> None:
        """Relay loop: forward events received on `channel` to `callback`.

        The loop ends when `shutdown` is set, when the channel reaches
        EOF, on `KeyboardInterrupt`, or when an UNSUBSCRIBED or
        DISPATCHER_SHUTDOWN event is received.  A SUBSCRIBED event is
        only logged.  Every other event is handed to `callback`.

        :param sub_id: Subscription id, used only in log messages.
        :param shutdown: Event that requests the loop to stop.
        :param channel: Receiving end of the subscription pipe.
        :param callback: Called with each non-control event.
        """
        while not shutdown.is_set():
            try:
                # Poll with a timeout so the shutdown flag is re-checked
                # at least once per second.
                if not channel.poll(timeout=1):
                    continue
                ev = channel.recv()
                name = ev['event_name']
                if name == eventNames.SUBSCRIBED:
                    logger.info(
                        'Subscriber#%s subscribe ack received',
                        sub_id,
                    )
                elif name == eventNames.UNSUBSCRIBED:
                    logger.info(
                        'Subscriber#%s unsubscribe ack received',
                        sub_id,
                    )
                    break
                elif name == eventNames.DISPATCHER_SHUTDOWN:
                    logger.info(
                        'Subscriber#%s received dispatcher shutdown event',
                        sub_id,
                    )
                    break
                else:
                    callback(ev)
            except queue.Empty:
                # NOTE(review): Connection.poll()/recv() are not documented
                # to raise queue.Empty; kept so that behaviour is unchanged
                # should a callback raise it.
                pass
            except EOFError:
                break
            except KeyboardInterrupt:
                break
        logger.debug('bbye!!!')

    def _start_relay_thread(self) -> None:
        """Create a fresh subscription id, pipe and daemon relay thread."""
        self.relay_sub_id = uuid.uuid4().hex
        self.relay_shutdown = threading.Event()
        self.relay_recv, self.relay_send = multiprocessing.Pipe()
        self.relay_thread = threading.Thread(
            target=EventSubscriber.relay,
            args=(
                self.relay_sub_id, self.relay_shutdown,
                self.relay_recv, self.callback,
            ),
        )
        self.relay_thread.daemon = True
        self.relay_thread.start()

    def _stop_relay_thread(self) -> None:
        """Signal the relay thread to stop, join it and close the pipe.

        Requires a previously started relay (asserted).  `relay_sub_id`
        is left untouched.
        """
        assert self.relay_thread and self.relay_shutdown and self.relay_recv and self.relay_send
        self.relay_shutdown.set()
        self.relay_thread.join()
        self.relay_recv.close()
        # Currently relay_send instance here in
        # subscriber is not the same as one received
        # by dispatcher.  This may cause file
        # descriptor leakage.  So we make a close
        # here explicit on our side of relay_send too.
        self.relay_send.close()
        self.relay_thread = None
        self.relay_shutdown = None
        self.relay_recv = None
        self.relay_send = None