proxy.core.work.work module

proxy.py

⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on Network monitoring, controls & Application development, testing, debugging.

copyright
  (c) 2013-present by Abhinav Singh and contributors.

license
  BSD, see LICENSE for more details.

class proxy.core.work.work.Work(work: proxy.core.work.work.T, flags: argparse.Namespace, event_queue: Optional[proxy.core.event.queue.EventQueue] = None, uid: Optional[str] = None, upstream_conn_pool: Optional[UpstreamConnectionPool] = None)

Bases: abc.ABC, Generic[proxy.core.work.work.T]

Implement Work to hook into the event loop provided by the Threadless process.

abstract static create(*args: Any) -> proxy.core.work.work.T

Implementations are responsible for creating work objects from the incoming args. This keeps the work core agnostic to how externally defined work class objects are constructed.
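As a rough illustration of the create() contract, the sketch here uses a stand-in base class that mirrors only the documented signature, so it runs without proxy.py installed. WorkSketch, ClientSketch and EchoWork are hypothetical names, and the assumption that the incoming args are a connection and an address is ours, not something this page states.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Generic, Tuple, TypeVar

T = TypeVar("T")


class WorkSketch(ABC, Generic[T]):
    """Stand-in mirroring the documented interface; not proxy.py's Work class."""

    def __init__(self, work: T) -> None:
        self.work = work

    @staticmethod
    @abstractmethod
    def create(*args: Any) -> T:
        """Build the work object from whatever the acceptor hands over."""


@dataclass
class ClientSketch:
    """Hypothetical work object: a connection handle plus a peer address."""

    conn: Any
    addr: Tuple[str, int]


class EchoWork(WorkSketch[ClientSketch]):
    @staticmethod
    def create(*args: Any) -> ClientSketch:
        # Assumption: args arrive as (connection, address).
        conn, addr = args
        return ClientSketch(conn=conn, addr=addr)


# The core only calls create(); it never needs to know about ClientSketch.
client = EchoWork.create("fake-connection", ("127.0.0.1", 8899))
work = EchoWork(client)
```

Because create() is a static method, the caller can build the work object before any Work instance exists and then pass it to the constructor as the work argument.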

async get_events() -> Dict[int, int]

Return a mapping of socket file descriptors to the events (read or write) that this work is interested in.
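A minimal sketch of what a get_events() implementation might return. It assumes the event values are the standard library's selectors.EVENT_READ and selectors.EVENT_WRITE masks, which this page does not state explicitly. EchoEvents and its pending buffer are hypothetical, and the class deliberately does not inherit from Work so that it runs standalone.

```python
import asyncio
import selectors
import socket
from typing import Dict


class EchoEvents:
    """Hypothetical work-like object; only get_events() is sketched."""

    def __init__(self, conn: socket.socket) -> None:
        self.conn = conn
        self.pending = b""  # bytes waiting to be written back

    async def get_events(self) -> Dict[int, int]:
        # Always interested in reads; ask for writes only when data is queued.
        events = selectors.EVENT_READ
        if self.pending:
            events |= selectors.EVENT_WRITE
        return {self.conn.fileno(): events}


def demo_get_events() -> Dict[str, int]:
    left, right = socket.socketpair()
    try:
        work = EchoEvents(left)
        idle = asyncio.run(work.get_events())[left.fileno()]
        work.pending = b"hello"
        busy = asyncio.run(work.get_events())[left.fileno()]
        return {"idle": idle, "busy": busy}
    finally:
        left.close()
        right.close()
```

Requesting write events only while data is queued avoids waking the loop on every iteration, since an idle socket is almost always writable.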

async handle_events(_readables: List[int], _writables: List[int]) -> bool

Handle readable and writable sockets.

Return True to shut down the work.
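The return-value contract can be sketched with a small echo handler. EchoHandler is a hypothetical standalone class rather than a real Work subclass, and it assumes that the two lists contain file descriptors matching the keys returned by get_events().

```python
import asyncio
import socket
from typing import List, Tuple


class EchoHandler:
    """Hypothetical work-like object; only handle_events() is sketched."""

    def __init__(self, conn: socket.socket) -> None:
        self.conn = conn
        self.pending = b""

    async def handle_events(self, readables: List[int], writables: List[int]) -> bool:
        fd = self.conn.fileno()
        if fd in readables:
            data = self.conn.recv(4096)
            if not data:
                return True  # peer closed the connection: request teardown
            self.pending += data
        if fd in writables and self.pending:
            sent = self.conn.send(self.pending)
            self.pending = self.pending[sent:]
        return False  # keep the work alive


def demo_handle_events() -> Tuple[bool, bytes, bool]:
    left, right = socket.socketpair()
    try:
        work = EchoHandler(left)
        fd = left.fileno()
        right.sendall(b"ping")
        keep_going = asyncio.run(work.handle_events([fd], [fd]))
        echoed = right.recv(4096)
        right.close()  # simulate the peer going away
        teardown = asyncio.run(work.handle_events([fd], []))
        return keep_going, echoed, teardown
    finally:
        left.close()
        right.close()
```

In this sketch an empty read is the only teardown trigger; a real implementation would likely also return True on unrecoverable socket errors.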

initialize() -> None

Perform any resource initialization.

is_inactive() -> bool

Return True if connection should be considered inactive.
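One plausible way to decide inactivity is an idle timeout, sketched here. The timeout value, the touch() helper and the injectable clock are all illustrative assumptions; this page does not say how proxy.py itself determines inactivity.

```python
import time
from typing import Callable


class IdleTracker:
    """Hypothetical helper: inactive once no activity is seen for `timeout` seconds."""

    def __init__(self, timeout: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.timeout = timeout
        self.clock = clock
        self.last_activity = clock()

    def touch(self) -> None:
        # Call whenever data is read from or written to the connection.
        self.last_activity = self.clock()

    def is_inactive(self) -> bool:
        return self.clock() - self.last_activity > self.timeout
```

Using a monotonic clock keeps the check immune to wall-clock adjustments, and injecting the clock makes the behaviour easy to test.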

publish_event(event_name: int, event_payload: Dict[str, Any], publisher_id: Optional[str] = None) -> None

Convenience method provided to publish events into the global event queue.

run() -> None

The run() method is not used by Threadless. It exists for backward compatibility with threaded mode, where the work class is started as a separate thread.

shutdown() -> None

Implementations must close any open resources here and call super().shutdown().
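The shutdown contract can be sketched as follows. BaseSketch is a stand-in for the base class that merely records that its shutdown() ran, and SocketWork is a hypothetical subclass. Neither is part of proxy.py.

```python
import socket


class BaseSketch:
    """Stand-in for the base class; records that its shutdown() was reached."""

    def __init__(self) -> None:
        self.base_shutdown_called = False

    def shutdown(self) -> None:
        self.base_shutdown_called = True


class SocketWork(BaseSketch):
    def __init__(self, conn: socket.socket) -> None:
        super().__init__()
        self.conn = conn

    def shutdown(self) -> None:
        try:
            # Release everything this work opened.
            self.conn.close()
        finally:
            # Always give the base class a chance to run its own teardown.
            super().shutdown()


left, right = socket.socketpair()
work = SocketWork(left)
work.shutdown()
right.close()
```

Wrapping the cleanup in try/finally is a design choice of this sketch: it ensures the base-class teardown still runs if closing a resource raises.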

uid: str