Source code for proxy.http.server.reverse

# -*- coding: utf-8 -*-
"""
    proxy.py
    ~~~~~~~~
    ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.
"""
import re
import random
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Optional

from proxy.http import Url
from proxy.core.base import TcpUpstreamConnectionHandler
from proxy.http.parser import HttpParser
from proxy.http.server import HttpWebServerBasePlugin, httpProtocolTypes
from proxy.common.utils import text_
from proxy.http.exception import HttpProtocolException
from proxy.common.constants import (
    HTTPS_PROTO, DEFAULT_HTTP_PORT, DEFAULT_HTTPS_PORT,
    DEFAULT_REVERSE_PROXY_ACCESS_LOG_FORMAT,
)


if TYPE_CHECKING:   # pragma: no cover
    from .plugin import ReverseProxyBasePlugin


logger = logging.getLogger(__name__)


[docs]class ReverseProxy(TcpUpstreamConnectionHandler, HttpWebServerBasePlugin): """Extend in-built Web Server to add Reverse Proxy capabilities.""" def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) self.choice: Optional[Url] = None self.plugins: List['ReverseProxyBasePlugin'] = [] for klass in self.flags.plugins[b'ReverseProxyBasePlugin']: plugin: 'ReverseProxyBasePlugin' = klass( self.uid, self.flags, self.client, self.event_queue, self.upstream_conn_pool, ) self.plugins.append(plugin)
[docs] def handle_upstream_data(self, raw: memoryview) -> None: # TODO: Parse response and implement plugin hook per parsed response object # This will give plugins a chance to modify the responses before dispatching to client self.client.queue(raw)
[docs] def routes(self) -> List[Tuple[int, str]]: r = [] for plugin in self.plugins: for route in plugin.regexes(): r.append((httpProtocolTypes.HTTP, route)) r.append((httpProtocolTypes.HTTPS, route)) return r
[docs] def handle_request(self, request: HttpParser) -> None: # before_routing for plugin in self.plugins: r = plugin.before_routing(request) if r is None: raise HttpProtocolException('before_routing closed connection') request = r # routes for plugin in self.plugins: for route in plugin.routes(): if isinstance(route, tuple): pattern = re.compile(route[0]) if pattern.match(text_(request.path)): self.choice = Url.from_bytes( random.choice(route[1]), ) break elif isinstance(route, str): pattern = re.compile(route) if pattern.match(text_(request.path)): self.choice = plugin.handle_route(request, pattern) break else: raise ValueError('Invalid route') assert self.choice and self.choice.hostname port = self.choice.port or \ DEFAULT_HTTP_PORT \ if self.choice.scheme == b'http' \ else DEFAULT_HTTPS_PORT self.initialize_upstream(text_(self.choice.hostname), port) assert self.upstream try: self.upstream.connect() if self.choice.scheme == HTTPS_PROTO: self.upstream.wrap( text_( self.choice.hostname, ), as_non_blocking=True, ca_file=self.flags.ca_file, ) request.path = self.choice.remainder self.upstream.queue(memoryview(request.build())) except ConnectionRefusedError: raise HttpProtocolException( # pragma: no cover 'Connection refused by upstream server {0}:{1}'.format( text_(self.choice.hostname), port, ), )
[docs] def on_client_connection_close(self) -> None: if self.upstream and not self.upstream.closed: logger.debug('Closing upstream server connection') self.upstream.close() self.upstream = None
[docs] def on_access_log(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]: context.update({ 'upstream_proxy_pass': str(self.choice) if self.choice else None, }) logger.info(DEFAULT_REVERSE_PROXY_ACCESS_LOG_FORMAT.format_map(context)) return None