proxy.core.event package#

Submodules#

Module contents#

proxy.py#

⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on Network monitoring, controls & Application development, testing, debugging.

copyright
  (c) 2013-present by Abhinav Singh and contributors.

license
  BSD, see LICENSE for more details.

class proxy.core.event.EventDispatcher(shutdown: threading.Event, event_queue: proxy.core.event.queue.EventQueue)[source]#

Bases: object

Core EventDispatcher.

Consuming directly from the global events queue outside of the dispatcher module is not recommended. Python's native multiprocessing queue doesn't provide fan-out functionality; the core dispatcher module implements it so that several plugins can consume the same published event concurrently (when necessary).

When --enable-events is used, a multiprocessing.Queue is created and attached to global flags. Event dict objects can then be dispatched into this queue.

When --enable-events is used, the dispatcher module is started automatically. Most importantly, the dispatcher ensures that the queue is not flooded and doesn't consume excessive memory when there are no subscribers for published messages.

EventDispatcher ensures that subscribers will receive the messages in the order they are published.
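A minimal sketch of wiring a dispatcher by hand is shown below; the manager-backed queue and driving run() from a background thread are assumptions about typical usage, not prescribed by this API:

import multiprocessing
import threading

from proxy.core.event import EventDispatcher, EventQueue

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    event_queue = EventQueue(manager.Queue())
    shutdown = threading.Event()
    dispatcher = EventDispatcher(shutdown=shutdown, event_queue=event_queue)

    # Drive the dispatch loop in a background thread; run() returns
    # once the shutdown event is set.
    thread = threading.Thread(target=dispatcher.run)
    thread.start()
    # ... publish events / register subscribers here ...
    shutdown.set()
    thread.join()
    manager.shutdown()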

_broadcast(ev: Dict[str, Any]) None[source]#
_close(sub_id: str) None[source]#
_close_and_delete(sub_id: str) None[source]#
_send(sub_id: str, payload: Any) bool[source]#
handle_event(ev: Dict[str, Any]) None[source]#
run() None[source]#
run_once() None[source]#
class proxy.core.event.EventManager[source]#

Bases: object

Event manager is a context manager which encapsulates the various setup and shutdown steps required to start the eventing core.

setup() None[source]#
shutdown() None[source]#
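A short usage sketch, assuming EventManager can be entered directly as a context manager; accessing the global queue through an attribute such as event_manager.queue is an assumption, not documented here:

from proxy.core.event import EventManager

with EventManager() as event_manager:
    # setup() has run: the dispatcher and global event queue are live.
    # Reaching the queue via `event_manager.queue` is an assumption.
    pass
# shutdown() runs automatically on exit from the with-block.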
class proxy.core.event.EventNames(SUBSCRIBE, SUBSCRIBED, UNSUBSCRIBE, UNSUBSCRIBED, DISPATCHER_SHUTDOWN, WORK_STARTED, WORK_FINISHED, REQUEST_COMPLETE, RESPONSE_HEADERS_COMPLETE, RESPONSE_CHUNK_RECEIVED, RESPONSE_COMPLETE)#

Bases: tuple

DISPATCHER_SHUTDOWN: int#

Alias for field number 4

REQUEST_COMPLETE: int#

Alias for field number 7

RESPONSE_CHUNK_RECEIVED: int#

Alias for field number 9

RESPONSE_COMPLETE: int#

Alias for field number 10

RESPONSE_HEADERS_COMPLETE: int#

Alias for field number 8

SUBSCRIBE: int#

Alias for field number 0

SUBSCRIBED: int#

Alias for field number 1

UNSUBSCRIBE: int#

Alias for field number 2

UNSUBSCRIBED: int#

Alias for field number 3

WORK_FINISHED: int#

Alias for field number 6

WORK_STARTED: int#

Alias for field number 5

_asdict()#

Return a new dict which maps field names to their values.

_field_defaults = {}#
_fields = ('SUBSCRIBE', 'SUBSCRIBED', 'UNSUBSCRIBE', 'UNSUBSCRIBED', 'DISPATCHER_SHUTDOWN', 'WORK_STARTED', 'WORK_FINISHED', 'REQUEST_COMPLETE', 'RESPONSE_HEADERS_COMPLETE', 'RESPONSE_CHUNK_RECEIVED', 'RESPONSE_COMPLETE')#
classmethod _make(iterable)#

Make a new EventNames object from a sequence or iterable

_replace(**kwds)#

Return a new EventNames object replacing specified fields with new values
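Since EventNames is a named tuple of integer constants, a subscriber callback can compare the event_name field of a published event against these fields. The module-level instance name eventNames used below is an assumption about how the constants are exposed:

from typing import Any, Dict

from proxy.core.event import eventNames   # assumed module-level instance


def on_event(ev: Dict[str, Any]) -> None:
    # Dispatch on the integer event name carried by every published event.
    if ev['event_name'] == eventNames.REQUEST_COMPLETE:
        print('request finished:', ev.get('event_payload'))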

class proxy.core.event.EventQueue(queue: queue.Queue)[source]#

Bases: object

Global event queue. Must be a multiprocess-safe queue capable of transporting other queues. This is necessary because, currently, subscribers use a separate subscription queue to consume events. The subscription queue is exchanged over the global event queue.

Each published event follows this schema:

{
    'request_id': 'Globally unique request ID',
    'process_id': 'Process ID of event publisher. This '
                  'will be the process ID of acceptor workers.',
    'thread_id': 'Thread ID of event publisher. '
                 'When --threadless is enabled, this value '
                 'will be same for all the requests.',
    'event_timestamp': 'Time when this event occurred',
    'event_name': 'one of the pre-defined or custom event names',
    'event_payload': 'Optional data associated with the event',
    'publisher_id': 'Optional publisher entity unique name',
}
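A hedged sketch of publishing a custom event with this schema via publish(); the request ID, event name constant (see the eventNames assumption above) and payload values are illustrative only:

import uuid

from proxy.core.event import EventQueue, eventNames


def publish_work_started(event_queue: EventQueue) -> None:
    # Publish a WORK_STARTED event onto the global queue.
    event_queue.publish(
        request_id=uuid.uuid4().hex,
        event_name=eventNames.WORK_STARTED,
        event_payload={'host': 'example.com', 'port': 443},
        publisher_id='my-plugin',
    )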
publish(request_id: str, event_name: int, event_payload: Dict[str, Any], publisher_id: Optional[str] = None) None[source]#
subscribe(sub_id: str, channel: multiprocessing.connection.Connection) None[source]#

Subscribe to global events.

sub_id is a subscription identifier which must be globally unique. channel MUST be a multiprocessing connection.

unsubscribe(sub_id: str) None[source]#

Unsubscribe by subscriber id.

class proxy.core.event.EventSubscriber(event_queue: proxy.core.event.queue.EventQueue, callback: Callable[[Dict[str, Any]], None])[source]#

Bases: object

Core event subscriber.

Usage: Initialize one instance per CPU core for optimum performance.

EventSubscriber can run within various contexts, e.g. the main thread, another thread, or a different process. The EventSubscriber context can be different from the publishers'. Publishers can even be processes outside of the proxy.py core.

multiprocessing.Pipe is used to initialize a new queue for receiving subscribed events from the eventing core. Note that the core EventDispatcher might be running in a separate process, hence the subscription queue must be multiprocess-safe.

When the subscribe method is called, EventManager starts a relay thread which consumes events out of the subscription queue and invokes the callback.

NOTE: The callback is executed in the context of the relay thread.
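A minimal lifecycle sketch, assuming an EventQueue obtained from an already-running eventing core is passed in; the callback below runs in the relay thread:

from typing import Any, Dict

from proxy.core.event import EventQueue, EventSubscriber


def on_event(ev: Dict[str, Any]) -> None:
    # Runs in the relay thread's context.
    print(ev['event_name'], ev.get('event_payload'))


def subscribe_to_core(event_queue: EventQueue) -> EventSubscriber:
    subscriber = EventSubscriber(event_queue, callback=on_event)
    subscriber.setup()   # starts the relay thread and subscribes
    return subscriber


# Later, to stop receiving events:
#     subscriber.shutdown()  # unsubscribes and stops the relay thread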

_start_relay_thread() None[source]#
_stop_relay_thread() None[source]#
static relay(sub_id: str, shutdown: threading.Event, channel: multiprocessing.connection.Connection, callback: Callable[[Dict[str, Any]], None]) None[source]#
setup(do_subscribe: bool = True) None[source]#

Setup subscription thread.

Call subscribe() to actually start subscription.

shutdown(do_unsubscribe: bool = True) None[source]#

Tear down subscription thread.

Call unsubscribe() to actually stop subscription.

subscribe() None[source]#
unsubscribe() None[source]#