Source code for proxy.core.work.work

# -*- coding: utf-8 -*-
"""
    proxy.py
    ~~~~~~~~
    ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.

    .. spelling::

       acceptor
"""
import argparse
from abc import ABC, abstractmethod
from uuid import uuid4
from typing import TYPE_CHECKING, Any, Dict, Generic, TypeVar, Optional

from ..event import EventQueue, eventNames
from ...common.types import Readables, Writables, SelectableEvents


if TYPE_CHECKING:   # pragma: no cover
    from ..connection import UpstreamConnectionPool

T = TypeVar('T')


[docs]class Work(ABC, Generic[T]): """Implement Work to hook into the event loop provided by Threadless process.""" def __init__( self, work: T, flags: argparse.Namespace, event_queue: Optional[EventQueue] = None, uid: Optional[str] = None, upstream_conn_pool: Optional['UpstreamConnectionPool'] = None, ) -> None: # Work uuid self.uid: str = uid if uid is not None else uuid4().hex self.flags = flags # Eventing core queue self.event_queue = event_queue # Accept work self.work = work self.upstream_conn_pool = upstream_conn_pool
[docs] @staticmethod @abstractmethod def create(*args: Any) -> T: """Implementations are responsible for creation of work objects from incoming args. This helps keep work core agnostic to creation of externally defined work class objects.""" raise NotImplementedError()
[docs] async def get_events(self) -> SelectableEvents: """Return sockets and events (read or write) that we are interested in.""" return {} # pragma: no cover
[docs] async def handle_events( self, _readables: Readables, _writables: Writables, ) -> bool: """Handle readable and writable sockets. Return True to shutdown work.""" return False # pragma: no cover
[docs] def initialize(self) -> None: """Perform any resource initialization.""" pass # pragma: no cover
[docs] def is_inactive(self) -> bool: """Return True if connection should be considered inactive.""" return False # pragma: no cover
[docs] def shutdown(self) -> None: """Implementation must close any opened resources here and call super().shutdown().""" self.publish_event( event_name=eventNames.WORK_FINISHED, event_payload={}, publisher_id=self.__class__.__qualname__, )
[docs] def run(self) -> None: """run() method is not used by Threadless. It's here for backward compatibility with threaded mode where work class is started as a separate thread. """ pass # pragma: no cover
[docs] def publish_event( self, event_name: int, event_payload: Dict[str, Any], publisher_id: Optional[str] = None, ) -> None: """Convenience method provided to publish events into the global event queue.""" if not self.flags.enable_events: return assert self.event_queue self.event_queue.publish( self.uid, event_name, event_payload, publisher_id, )