# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
.. spelling::
acceptor
"""
import argparse
from abc import ABC, abstractmethod
from uuid import uuid4
from typing import TYPE_CHECKING, Any, Dict, Generic, TypeVar, Optional
from ..event import EventQueue, eventNames
from ...common.types import Readables, Writables, SelectableEvents
if TYPE_CHECKING: # pragma: no cover
from ..connection import UpstreamConnectionPool
T = TypeVar('T')
[docs]class Work(ABC, Generic[T]):
"""Implement Work to hook into the event loop provided by Threadless process."""
def __init__(
self,
work: T,
flags: argparse.Namespace,
event_queue: Optional[EventQueue] = None,
uid: Optional[str] = None,
upstream_conn_pool: Optional['UpstreamConnectionPool'] = None,
) -> None:
# Work uuid
self.uid: str = uid if uid is not None else uuid4().hex
self.flags = flags
# Eventing core queue
self.event_queue = event_queue
# Accept work
self.work = work
self.upstream_conn_pool = upstream_conn_pool
[docs] @staticmethod
@abstractmethod
def create(*args: Any) -> T:
"""Implementations are responsible for creation of work objects
from incoming args. This helps keep work core agnostic to
creation of externally defined work class objects."""
raise NotImplementedError()
[docs] async def get_events(self) -> SelectableEvents:
"""Return sockets and events (read or write) that we are interested in."""
return {} # pragma: no cover
[docs] async def handle_events(
self,
_readables: Readables,
_writables: Writables,
) -> bool:
"""Handle readable and writable sockets.
Return True to shutdown work."""
return False # pragma: no cover
[docs] def initialize(self) -> None:
"""Perform any resource initialization."""
pass # pragma: no cover
[docs] def is_inactive(self) -> bool:
"""Return True if connection should be considered inactive."""
return False # pragma: no cover
[docs] def shutdown(self) -> None:
"""Implementation must close any opened resources here
and call super().shutdown()."""
self.publish_event(
event_name=eventNames.WORK_FINISHED,
event_payload={},
publisher_id=self.__class__.__qualname__,
)
[docs] def run(self) -> None:
"""run() method is not used by Threadless. It's here for backward
compatibility with threaded mode where work class is started as
a separate thread.
"""
pass # pragma: no cover
[docs] def publish_event(
self,
event_name: int,
event_payload: Dict[str, Any],
publisher_id: Optional[str] = None,
) -> None:
"""Convenience method provided to publish events into the global event queue."""
if not self.flags.enable_events:
return
assert self.event_queue
self.event_queue.publish(
self.uid,
event_name,
event_payload,
publisher_id,
)