Source code for

# -*- coding: utf-8 -*-
    ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.
import socket
import argparse
import threading
from typing import TYPE_CHECKING, Tuple, TypeVar, Optional

from ..event import EventQueue, eventNames

if TYPE_CHECKING:   # pragma: no cover
    from .work import Work
    from ...common.types import HostPort

T = TypeVar('T')

# TODO: Add generic T
[docs]def start_threaded_work( flags: argparse.Namespace, conn: socket.socket, addr: Optional['HostPort'], event_queue: Optional[EventQueue] = None, publisher_id: Optional[str] = None, ) -> Tuple['Work[T]', threading.Thread]: """Utility method to start a work in a new thread.""" work = flags.work_klass( flags.work_klass.create(conn, addr), flags=flags, event_queue=event_queue, upstream_conn_pool=None, ) # TODO: Keep reference to threads and join during shutdown. # This will ensure connections are not abruptly closed on shutdown # for threaded execution mode. thread = threading.Thread( thread.daemon = True thread.start() work.publish_event( event_name=eventNames.WORK_STARTED, event_payload={'fileno': conn.fileno(), 'addr': addr}, publisher_id=publisher_id or 'thread#{0}'.format( thread.ident, ), ) return (work, thread)