Source code for proxy.core.base.tcp_upstream

# -*- coding: utf-8 -*-
    ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.
import ssl
import logging
from abc import ABC, abstractmethod
from typing import Any, Optional

from ...common.types import Readables, Writables, Descriptors
from ...core.connection import TcpServerConnection

logger = logging.getLogger(__name__)

[docs]class TcpUpstreamConnectionHandler(ABC): """:class:`~proxy.core.base.TcpUpstreamConnectionHandler` can be used to insert an upstream server connection lifecycle. Call `initialize_upstream` to initialize the upstream connection object. Then, directly use ``self.upstream`` object within your class. See :class:`~proxy.plugin.proxy_pool.ProxyPoolPlugin` for example usage. """ def __init__(self, *args: Any, **kwargs: Any) -> None: # This is currently a hack, see comments below for rationale, # will be fixed later. super().__init__(*args, **kwargs) # type: ignore self.upstream: Optional[TcpServerConnection] = None # TODO: Currently, :class:`~proxy.core.base.TcpUpstreamConnectionHandler` # is used within :class:`~proxy.http.server.ReverseProxy` and # :class:`~proxy.plugin.ProxyPoolPlugin`. # # For both of which we expect a 4-tuple as arguments # containing (uuid, flags, client, event_queue). # We really don't need the rest of the args here. # May be uuid? May be event_queue in the future. # But certainly we don't not client here. # A separate tunnel class must be created which handles # client connection too. # # Both :class:`~proxy.http.server.ReverseProxy` and # :class:`~proxy.plugin.ProxyPoolPlugin` are currently # calling client queue within `handle_upstream_data` callback. # # This can be abstracted out too. self.server_recvbuf_size = args[1].server_recvbuf_size self.total_size = 0
[docs] @abstractmethod def handle_upstream_data(self, raw: memoryview) -> None: raise NotImplementedError() # pragma: no cover
[docs] def initialize_upstream(self, addr: str, port: int) -> None: self.upstream = TcpServerConnection(addr, port)
[docs] async def get_descriptors(self) -> Descriptors: if not self.upstream: return [], [] return [self.upstream.connection.fileno()], \ [self.upstream.connection.fileno()] \ if self.upstream.has_buffer() \ else []
[docs] async def read_from_descriptors(self, r: Readables) -> bool: if self.upstream and \ self.upstream.connection.fileno() in r: try: raw = self.upstream.recv(self.server_recvbuf_size) if raw is None: # pragma: no cover # Tear down because upstream proxy closed the connection return True self.total_size += len(raw) self.handle_upstream_data(raw) except TimeoutError: # pragma: no cover'Upstream recv timeout error') return True except ssl.SSLWantReadError: # pragma: no cover'Upstream SSLWantReadError, will retry') return False except ConnectionResetError: # pragma: no cover logger.debug('Connection reset by upstream') return True return False
[docs] async def write_to_descriptors(self, w: Writables) -> bool: if self.upstream and \ self.upstream.connection.fileno() in w and \ self.upstream.has_buffer(): try: # TODO: max sendbuf size flag currently not used here self.upstream.flush() except ssl.SSLWantWriteError: # pragma: no cover'Upstream SSLWantWriteError, will retry') return False except BrokenPipeError: # pragma: no cover logger.debug('BrokenPipeError when flushing to upstream') return True return False