proxy.core.work package
Module contents
proxy.py
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on Network monitoring, controls & Application development, testing, debugging.
- copyright: 2013-present by Abhinav Singh and contributors.
- license: BSD, see LICENSE for more details.
- class proxy.core.work.BaseLocalExecutor(*args: Any, **kwargs: Any)
Bases: proxy.core.work.threadless.Threadless[proxy.common.backports.NonBlockingQueue]
A threadless executor implementation which uses a queue to receive new work.
- _abc_impl = <_abc._abc_data object>
- property loop: Optional[asyncio.events.AbstractEventLoop]
- class proxy.core.work.BaseRemoteExecutor(*args: Any, **kwargs: Any)
Bases: proxy.core.work.threadless.Threadless[multiprocessing.connection.Connection]
A threadless executor implementation which receives work over a connection.
- _abc_impl = <_abc._abc_data object>
- close_work_queue() → None
Only called if work_queue_fileno returns an integer. If the work queue has a selectable fd, make sure to close that fd now.
- property loop: Optional[asyncio.events.AbstractEventLoop]
- class proxy.core.work.Threadless(iid: str, work_queue: proxy.core.work.threadless.T, flags: argparse.Namespace, event_queue: Optional[EventQueue] = None)
Bases: abc.ABC, Generic[proxy.core.work.threadless.T]
Work executor base class.
Threadless provides an event loop, which is shared across multiple Work instances to handle work.
Threadless takes as input a work_klass and an event_queue. work_klass must conform to the Work protocol. Work is received over the event_queue.
When a work is accepted, Threadless creates a new instance of work_klass. Threadless then invokes the necessary lifecycle methods of the Work protocol, allowing the work_klass implementation to handle the assigned work.
For example, BaseTcpServerHandler implements the Work protocol. It expects a client connection as work payload and hooks into the threadless event loop to handle the client connection.
- _abc_impl = <_abc._abc_data object>
- _create_tasks(work_by_ids: Dict[int, Tuple[List[int], List[int]]]) → Set[_asyncio.Task[bool]]
- async _selected_events() → Tuple[Dict[int, Tuple[List[int], List[int]]], bool]
For each work, collects the events it is interested in, then calls select for those events.
Returns a 2-tuple containing a dictionary and a boolean. Dictionary keys are work IDs and values are 2-tuples containing the ready readables and writables.
The returned boolean indicates whether newly accepted work is waiting to be received and queued for processing. This is only applicable when work_queue_fileno returns a valid fd.
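The dictionary shape described above can be illustrated with the standard selectors module. This is a minimal sketch and not the library's implementation: group_events_by_work is a hypothetical helper, and storing the work ID in the selector key's data field is an assumption made for the example.

```python
import selectors
import socket
from typing import Dict, List, Tuple


def group_events_by_work(
    ready: List[Tuple[selectors.SelectorKey, int]],
) -> Dict[int, Tuple[List[int], List[int]]]:
    """Group ready fds by work ID (hypothetical helper; work ID lives in key.data)."""
    work_by_ids: Dict[int, Tuple[List[int], List[int]]] = {}
    for key, mask in ready:
        readables, writables = work_by_ids.setdefault(key.data, ([], []))
        if mask & selectors.EVENT_READ:
            readables.append(key.fd)
        if mask & selectors.EVENT_WRITE:
            writables.append(key.fd)
    return work_by_ids


selector = selectors.DefaultSelector()
left, right = socket.socketpair()
left_fd, right_fd = left.fileno(), right.fileno()

# Work 7 wants to read from `right`; work 8 wants to write to `left`.
selector.register(right, selectors.EVENT_READ, data=7)
selector.register(left, selectors.EVENT_WRITE, data=8)
left.sendall(b"ping")  # makes `right` readable

grouped = group_events_by_work(selector.select(timeout=1))

selector.close()
left.close()
right.close()
```

Each value is a (readables, writables) pair of file descriptors, which matches the Dict[int, Tuple[List[int], List[int]]] annotation in the signature.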
- close_work_queue() → None
Only called if work_queue_fileno returns an integer. If the work queue has a selectable fd, make sure to close that fd now.
- abstract property loop: Optional[asyncio.events.AbstractEventLoop]
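As a rough illustration of how an executor might turn the work_by_ids mapping into tasks on a shared event loop, consider the sketch below. It is not taken from the proxy.py source: CountingWork and dispatch are hypothetical names, and the only behaviour borrowed from this page is that handle_events returns True when the work should be shut down.

```python
import asyncio
from typing import Dict, List, Tuple


class CountingWork:
    """Hypothetical work that asks to be shut down after `limit` dispatches."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.handled = 0

    async def handle_events(self, readables: List[int], writables: List[int]) -> bool:
        self.handled += 1
        return self.handled >= self.limit


async def dispatch(
    works: Dict[int, CountingWork],
    work_by_ids: Dict[int, Tuple[List[int], List[int]]],
) -> None:
    """Run one task per ready work on the shared loop, then drop finished work."""
    tasks = {}
    for work_id, (readables, writables) in work_by_ids.items():
        coro = works[work_id].handle_events(readables, writables)
        tasks[asyncio.ensure_future(coro)] = work_id
    if not tasks:
        return
    done, _ = await asyncio.wait(set(tasks))
    for task in done:
        if task.result():  # True means "shut this work down"
            del works[tasks[task]]


works = {1: CountingWork(limit=1), 2: CountingWork(limit=2)}
asyncio.run(dispatch(works, {1: ([10], []), 2: ([11], [])}))
remaining_after_first = sorted(works)
asyncio.run(dispatch(works, {2: ([11], [])}))
remaining_after_second = sorted(works)
```

The fd numbers passed in are placeholders; in a real executor they would come from a select call over the fds each work registered interest in.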
- class proxy.core.work.ThreadlessPool(flags: argparse.Namespace, executor_klass: Type[T], event_queue: Optional[EventQueue] = None)
Bases: object
Manages the lifecycle of a pool of threadless executors and delegates work to them using a round-robin strategy.
Example usage:
    with ThreadlessPool(flags=...) as pool:
        while True:
            time.sleep(1)
If necessary, start multiple threadless pools with different work classes.
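Round-robin delegation, as mentioned in the class description, can be sketched in a few lines. The RoundRobinPool class below is hypothetical and uses plain in-process queues; it only illustrates the selection strategy, not how ThreadlessPool starts or communicates with its executors.

```python
import itertools
import queue
from typing import Any, List


class RoundRobinPool:
    """Hypothetical pool that hands each unit of work to the next queue in turn."""

    def __init__(self, num_workers: int) -> None:
        self.work_queues: List[queue.Queue] = [queue.Queue() for _ in range(num_workers)]
        self._next_index = itertools.cycle(range(num_workers))

    def delegate(self, work: Any) -> int:
        """Queue `work` for the next worker and return that worker's index."""
        index = next(self._next_index)
        self.work_queues[index].put(work)
        return index


pool = RoundRobinPool(num_workers=3)
assignments = [pool.delegate(f"conn-{i}") for i in range(7)]
queue_sizes = [q.qsize() for q in pool.work_queues]
```

With seven units of work and three workers, the first worker receives one more item than the others, which is the expected unevenness of a plain round-robin scheme.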
- class proxy.core.work.Work(work: proxy.core.work.work.T, flags: argparse.Namespace, event_queue: Optional[proxy.core.event.queue.EventQueue] = None, uid: Optional[str] = None, upstream_conn_pool: Optional[UpstreamConnectionPool] = None)
Bases: abc.ABC, Generic[proxy.core.work.work.T]
Implement Work to hook into the event loop provided by the Threadless process.
- _abc_impl = <_abc._abc_data object>
- abstract static create(*args: Any) → proxy.core.work.work.T
Implementations are responsible for creating work objects from incoming args. This keeps the work core agnostic to how externally defined work class objects are created.
- async get_events() → Dict[int, int]
Return sockets and events (read or write) that we are interested in.
- async handle_events(_readables: List[int], _writables: List[int]) → bool
Handle readable and writable sockets.
Return True to shut down work.
- publish_event(event_name: int, event_payload: Dict[str, Any], publisher_id: Optional[str] = None) → None
Convenience method provided to publish events into the global event queue.
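The get_events and handle_events contract can be illustrated with a small stand-in class. UppercaseWork below is hypothetical and deliberately does not subclass proxy.core.work.Work, so the example runs without proxy.py installed; it assumes event masks are the constants from the standard selectors module, and readiness is fed in by hand rather than by an executor.

```python
import asyncio
import selectors
import socket
from typing import Dict, List


class UppercaseWork:
    """Hypothetical work: reads one payload, replies in upper case, then finishes."""

    def __init__(self, conn: socket.socket) -> None:
        self.conn = conn
        self.pending = b""

    async def get_events(self) -> Dict[int, int]:
        # Always interested in reads; interested in writes only with a reply queued.
        events = selectors.EVENT_READ
        if self.pending:
            events |= selectors.EVENT_WRITE
        return {self.conn.fileno(): events}

    async def handle_events(self, readables: List[int], writables: List[int]) -> bool:
        fd = self.conn.fileno()
        if fd in writables and self.pending:
            self.conn.sendall(self.pending)
            self.pending = b""
            return True  # reply flushed: ask the executor to shut this work down
        if fd in readables:
            data = self.conn.recv(1024)
            if not data:
                return True  # peer closed the connection
            self.pending = data.upper()
        return False


async def demo():
    client, server = socket.socketpair()
    work = UppercaseWork(server)
    fd = server.fileno()
    client.sendall(b"hello")
    first_interest = await work.get_events()
    after_read = await work.handle_events([fd], [])
    second_interest = await work.get_events()
    after_write = await work.handle_events([], [fd])
    reply = client.recv(1024)
    client.close()
    server.close()
    return first_interest, after_read, second_interest, after_write, reply


first_interest, after_read, second_interest, after_write, reply = asyncio.run(demo())
```

The interest set changes between calls: write interest is only requested once there is data to flush, which avoids waking the loop for a socket that is writable but has nothing to send.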
- proxy.core.work.delegate_work_to_pool(worker_pid: int, work_queue: connection.Connection, work_lock: multiprocessing.synchronize.Lock, conn: socket.socket, addr: Optional[HostPort], unix_socket_path: Optional[str] = None) → None
Utility method to delegate work to the threadless executor pool.
- proxy.core.work.start_threaded_work(flags: argparse.Namespace, conn: socket.socket, addr: Optional[HostPort], event_queue: Optional[proxy.core.event.queue.EventQueue] = None, publisher_id: Optional[str] = None) → Tuple[Work[T], threading.Thread]
Utility method to start work in a new thread.
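The return shape of start_threaded_work, a (work, thread) pair, follows a common pattern that can be sketched with the standard threading module. Everything below is an assumption made for illustration: UpperWork, its run method and start_in_thread are hypothetical names, and the real function takes flags, a connection and an address as shown in its signature.

```python
import threading
from typing import Tuple


class UpperWork:
    """Hypothetical work object with a blocking run() method."""

    def __init__(self, payload: str) -> None:
        self.payload = payload
        self.result = ""

    def run(self) -> None:
        self.result = self.payload.upper()


def start_in_thread(work: UpperWork) -> Tuple[UpperWork, threading.Thread]:
    """Start work.run() in a daemon thread and return both handles to the caller."""
    thread = threading.Thread(target=work.run, daemon=True)
    thread.start()
    return work, thread


work, thread = start_in_thread(UpperWork("ping"))
thread.join(timeout=5)
```

Returning the thread alongside the work lets the caller decide whether to join it, as done here, or let it run in the background.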