# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
.. spelling::
http
reusability
"""
import os
import ssl
import time
import errno
import socket
import logging
import threading
import subprocess
from typing import Any, Dict, List, Union, Optional, cast
from .plugin import HttpProxyBasePlugin
from ..parser import HttpParser, httpParserTypes, httpParserStates
from ..plugin import HttpProtocolHandlerPlugin
from ..headers import httpHeaders
from ..methods import httpMethods
from ..exception import HttpProtocolException, ProxyConnectionFailed
from ..protocols import httpProtocols
from ..responses import PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT
from ...common.pki import gen_csr, sign_csr, gen_public_key
from ...core.event import eventNames
from ...common.flag import flags
from ...common.types import Readables, Writables, Descriptors
from ...common.utils import text_
from ...core.connection import (
TcpServerConnection, TcpConnectionUninitializedException,
)
from ...common.constants import (
COMMA, DEFAULT_CA_FILE, PLUGIN_PROXY_AUTH, DEFAULT_CA_CERT_DIR,
DEFAULT_CA_KEY_FILE, DEFAULT_CA_CERT_FILE, DEFAULT_DISABLE_HEADERS,
PROXY_AGENT_HEADER_VALUE, DEFAULT_DISABLE_HTTP_PROXY,
DEFAULT_CA_SIGNING_KEY_FILE, DEFAULT_HTTP_PROXY_ACCESS_LOG_FORMAT,
DEFAULT_HTTPS_PROXY_ACCESS_LOG_FORMAT,
)
logger = logging.getLogger(__name__)
flags.add_argument(
'--disable-http-proxy',
action='store_true',
default=DEFAULT_DISABLE_HTTP_PROXY,
help='Default: False. Whether to disable proxy.HttpProxyPlugin.',
)
flags.add_argument(
'--disable-headers',
type=str,
default=COMMA.join(DEFAULT_DISABLE_HEADERS),
help='Default: None. Comma separated list of headers to remove before '
'dispatching client request to upstream server.',
)
flags.add_argument(
'--ca-key-file',
type=str,
default=DEFAULT_CA_KEY_FILE,
help='Default: None. CA key to use for signing dynamically generated '
'HTTPS certificates. If used, must also pass --ca-cert-file and --ca-signing-key-file',
)
flags.add_argument(
'--ca-cert-dir',
type=str,
default=DEFAULT_CA_CERT_DIR,
help='Default: ~/.proxy/certificates. Directory to store dynamically generated certificates. '
'Also see --ca-key-file, --ca-cert-file and --ca-signing-key-file',
)
flags.add_argument(
'--ca-cert-file',
type=str,
default=DEFAULT_CA_CERT_FILE,
help='Default: None. Signing certificate to use for signing dynamically generated '
'HTTPS certificates. If used, must also pass --ca-key-file and --ca-signing-key-file',
)
flags.add_argument(
'--ca-file',
type=str,
default=str(DEFAULT_CA_FILE),
help='Default: ' + str(DEFAULT_CA_FILE) +
'. Provide path to custom CA bundle for peer certificate verification',
)
flags.add_argument(
'--ca-signing-key-file',
type=str,
default=DEFAULT_CA_SIGNING_KEY_FILE,
help='Default: None. CA signing key to use for dynamic generation of '
'HTTPS certificates. If used, must also pass --ca-key-file and --ca-cert-file',
)
flags.add_argument(
'--auth-plugin',
type=str,
default=PLUGIN_PROXY_AUTH,
help='Default: ' + PLUGIN_PROXY_AUTH + '. ' +
'Auth plugin to use instead of default basic auth plugin.',
)
[docs]class HttpProxyPlugin(HttpProtocolHandlerPlugin):
"""HttpProtocolHandler plugin which implements HttpProxy specifications."""
# Used to synchronization during certificate generation and
# connection pool operations.
lock = threading.Lock()
def __init__(
self,
*args: Any, **kwargs: Any,
) -> None:
super().__init__(*args, **kwargs)
self.start_time: float = time.time()
self.upstream: Optional[TcpServerConnection] = None
self.response: HttpParser = HttpParser(httpParserTypes.RESPONSE_PARSER)
self.pipeline_request: Optional[HttpParser] = None
self.pipeline_response: Optional[HttpParser] = None
self.plugins: Dict[str, HttpProxyBasePlugin] = {}
if b'HttpProxyBasePlugin' in self.flags.plugins:
for klass in self.flags.plugins[b'HttpProxyBasePlugin']:
instance: HttpProxyBasePlugin = klass(
self.uid,
self.flags,
self.client,
self.event_queue,
self.upstream_conn_pool,
)
self.plugins[instance.name()] = instance
[docs] @staticmethod
def protocols() -> List[int]:
return [httpProtocols.HTTP_PROXY]
[docs] async def get_descriptors(self) -> Descriptors:
r: List[int] = []
w: List[int] = []
if (
self.upstream and
not self.upstream.closed and
self.upstream.connection
):
r.append(self.upstream.connection.fileno())
if (
self.upstream and
not self.upstream.closed and
self.upstream.has_buffer() and
self.upstream.connection
):
w.append(self.upstream.connection.fileno())
# TODO(abhinavsingh): We need to keep a mapping of plugin and
# descriptors registered by them, so that within write/read blocks
# we can invoke the right plugin callbacks.
for plugin in self.plugins.values():
plugin_read_desc, plugin_write_desc = await plugin.get_descriptors()
r.extend(plugin_read_desc)
w.extend(plugin_write_desc)
return r, w
[docs] async def write_to_descriptors(self, w: Writables) -> bool:
if (self.upstream and self.upstream.connection.fileno() not in w) or not self.upstream:
# Currently, we just call write/read block of each plugins. It is
# plugins responsibility to ignore this callback, if passed descriptors
# doesn't contain the descriptor they registered.
for plugin in self.plugins.values():
teardown = await plugin.write_to_descriptors(w)
if teardown:
return True
elif self.upstream and not self.upstream.closed and \
self.upstream.has_buffer() and \
self.upstream.connection.fileno() in w:
logger.debug('Server is write ready, flushing...')
try:
self.upstream.flush(self.flags.max_sendbuf_size)
except ssl.SSLWantWriteError:
logger.warning(
'SSLWantWriteError while trying to flush to server, will retry',
)
return False
except BrokenPipeError:
logger.warning(
'BrokenPipeError when flushing buffer for server',
)
return self._close_and_release()
except OSError as e:
logger.exception(
'OSError when flushing buffer to server', exc_info=e,
)
return self._close_and_release()
return False
[docs] async def read_from_descriptors(self, r: Readables) -> bool:
if (
self.upstream and not
self.upstream.closed and
self.upstream.connection.fileno() not in r
) or not self.upstream:
# Currently, we just call write/read block of each plugins. It is
# plugins responsibility to ignore this callback, if passed descriptors
# doesn't contain the descriptor they registered for.
for plugin in self.plugins.values():
teardown = await plugin.read_from_descriptors(r)
if teardown:
return True
elif self.upstream \
and not self.upstream.closed \
and self.upstream.connection.fileno() in r:
logger.debug('Server is read ready, receiving...')
try:
raw = self.upstream.recv(self.flags.server_recvbuf_size)
except TimeoutError as e:
self._close_and_release()
if e.errno == errno.ETIMEDOUT:
logger.warning(
'%s:%d timed out on recv' %
self.upstream.addr,
)
return True
raise e
except ssl.SSLWantReadError: # Try again later
# logger.warning('SSLWantReadError encountered while reading from server, will retry ...')
return False
except OSError as e:
if e.errno == errno.EHOSTUNREACH:
logger.warning(
'%s:%d unreachable on recv' %
self.upstream.addr,
)
if e.errno == errno.ECONNRESET:
logger.warning(
'Connection reset by upstream: {0}:{1}'.format(
*self.upstream.addr,
),
)
else:
logger.warning(
'Exception while receiving from %s connection#%d with reason %r' %
(self.upstream.tag, self.upstream.connection.fileno(), e),
)
return self._close_and_release()
if raw is None:
logger.debug('Server closed connection, tearing down...')
return self._close_and_release()
for plugin in self.plugins.values():
raw = plugin.handle_upstream_chunk(raw)
if raw is None:
break
# parse incoming response packet
# only for non-https requests and when
# tls interception is enabled
if raw is not None:
if (
not self.request.is_https_tunnel
or self.tls_interception_enabled
):
if self.response.is_complete:
self.handle_pipeline_response(raw)
else:
self.response.parse(raw)
self.emit_response_events(len(raw))
else:
self.response.total_size += len(raw)
# queue raw data for client
self.client.queue(raw)
return False
[docs] def on_client_connection_close(self) -> None:
context = {
'client_ip': None if not self.client.addr else self.client.addr[0],
'client_port': None if not self.client.addr else self.client.addr[1],
'server_host': text_(self.upstream.addr[0] if self.upstream else None),
'server_port': text_(self.upstream.addr[1] if self.upstream else None),
'connection_time_ms': '%.2f' % ((time.time() - self.start_time) * 1000),
# Request
'request_method': text_(self.request.method),
'request_path': text_(self.request.path),
'request_bytes': text_(self.request.total_size),
'request_ua': text_(self.request.header(b'user-agent'))
if self.request.has_header(b'user-agent')
else None,
'request_version': text_(self.request.version),
# Response
'response_bytes': self.response.total_size,
'response_code': text_(self.response.code),
'response_reason': text_(self.response.reason),
}
if self.flags.enable_proxy_protocol:
assert self.request.protocol and self.request.protocol.family
context.update({
'protocol': {
'family': text_(self.request.protocol.family),
},
})
if self.request.protocol.source:
context.update({
'protocol': {
'source_ip': text_(self.request.protocol.source[0]),
'source_port': self.request.protocol.source[1],
},
})
if self.request.protocol.destination:
context.update({
'protocol': {
'destination_ip': text_(self.request.protocol.destination[0]),
'destination_port': self.request.protocol.destination[1],
},
})
log_handled = False
for plugin in self.plugins.values():
ctx = plugin.on_access_log(context)
if ctx is None:
log_handled = True
break
context = ctx
if not log_handled:
self.access_log(context)
# Note that, server instance was initialized
# but not necessarily the connection object exists.
#
# Unfortunately this is still being called when an upstream
# server connection was never established. This is done currently
# to assist proxy pool plugin to close its upstream proxy connections.
#
# In short, treat on_upstream_connection_close as on_client_connection_close
# equivalent within proxy plugins.
#
# Invoke plugin.on_upstream_connection_close
for plugin in self.plugins.values():
plugin.on_upstream_connection_close()
# If server was never initialized or was _close_and_release
if self.upstream is None:
return
if self.flags.enable_conn_pool:
assert self.upstream_conn_pool
# Release the connection for reusability
with self.lock:
self.upstream_conn_pool.release(self.upstream)
return
try:
try:
self.upstream.connection.shutdown(socket.SHUT_WR)
except OSError:
pass
finally:
# TODO: Unwrap if wrapped before close?
self.upstream.close()
except TcpConnectionUninitializedException:
pass
finally:
logger.debug(
'Closed server connection, has buffer %s' %
self.upstream.has_buffer(),
)
[docs] def access_log(self, log_attrs: Dict[str, Any]) -> None:
access_log_format = DEFAULT_HTTPS_PROXY_ACCESS_LOG_FORMAT
if not self.request.is_https_tunnel:
access_log_format = DEFAULT_HTTP_PROXY_ACCESS_LOG_FORMAT
logger.info(access_log_format.format_map(log_attrs))
[docs] def on_response_chunk(self, chunk: List[memoryview]) -> List[memoryview]:
# TODO: Allow to output multiple access_log lines
# for each request over a pipelined HTTP connection (not for HTTPS).
# However, this must also be accompanied by resetting both request
# and response objects.
#
# if not self.request.is_https_tunnel and \
# self.response.is_complete:
# self.access_log()
return chunk
# Can return None to tear down connection
[docs] def on_client_data(self, raw: memoryview) -> None:
# For scenarios when an upstream connection was never established,
# let plugin do whatever they wish to. These are special scenarios
# where plugins are trying to do something magical. Within the core
# we don't know the context. In fact, we are not even sure if data
# exchanged is http spec compliant.
#
# Hence, here we pass raw data to HTTP proxy plugins as is.
#
# We only call handle_client_data once original request has been
# completely received
if not self.upstream:
for plugin in self.plugins.values():
o = plugin.handle_client_data(raw)
if o is None:
return
raw = o
elif self.upstream and not self.upstream.closed:
# For http proxy requests, handle pipeline case.
# We also handle pipeline scenario for https proxy
# requests is TLS interception is enabled.
if self.request.is_complete and (
not self.request.is_https_tunnel or
self.tls_interception_enabled
):
if self.pipeline_request is not None and \
self.pipeline_request.is_connection_upgrade:
# Previous pipelined request was a WebSocket
# upgrade request. Incoming client data now
# must be treated as WebSocket protocol packets.
self.upstream.queue(raw)
return
if self.pipeline_request is None:
# For pipeline requests, we never
# want to use --enable-proxy-protocol flag
# as proxy protocol header will not be present
#
# TODO: HTTP parser must be smart about detecting
# HA proxy protocol or we must always explicitly pass
# the flag when we are expecting HA proxy protocol
# request line before HTTP request lines.
self.pipeline_request = HttpParser(
httpParserTypes.REQUEST_PARSER,
)
self.pipeline_request.parse(raw)
if self.pipeline_request.is_complete:
for plugin in self.plugins.values():
assert self.pipeline_request is not None
r = plugin.handle_client_request(self.pipeline_request)
if r is None:
return
self.pipeline_request = r
assert self.pipeline_request is not None
# TODO(abhinavsingh): Remove memoryview wrapping here after
# parser is fully memoryview compliant
self.upstream.queue(
memoryview(
self.pipeline_request.build(),
),
)
if not self.pipeline_request.is_connection_upgrade:
self.pipeline_request = None
# For scenarios where we cannot peek into the data,
# simply queue for upstream server.
else:
self.upstream.queue(raw)
[docs] def on_request_complete(self) -> Union[socket.socket, bool]:
self.emit_request_complete()
# Invoke plugin.before_upstream_connection
#
# before_upstream_connection can:
# 1) Raise HttpRequestRejected exception to reject the connection
# 2) return None to continue without establishing an upstream server connection
# e.g. for scenarios when plugins want to return response from cache, or,
# via out-of-band over the network request.
do_connect = True
for plugin in self.plugins.values():
r = plugin.before_upstream_connection(self.request)
if r is None:
do_connect = False
break
self.request = r
# Connect to upstream
if do_connect:
self.connect_upstream()
# Invoke plugin.handle_client_request
for plugin in self.plugins.values():
assert self.request is not None
r = plugin.handle_client_request(self.request)
if r is not None:
self.request = r
else:
return False
# For https requests, respond back with tunnel established response.
# Optionally, setup interceptor if TLS interception is enabled.
if self.upstream:
if self.request.is_https_tunnel:
self.client.queue(PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT)
if self.tls_interception_enabled:
# Check if any plugin wants to
# disable interception even
# with flags available
do_intercept = True
for plugin in self.plugins.values():
do_intercept = plugin.do_intercept(self.request)
# A plugin requested to not intercept
# the request
if do_intercept is False:
break
if do_intercept:
return self.intercept()
# If an upstream server connection was established for http request,
# queue the request for upstream server.
else:
# - proxy-connection header is a mistake, it doesn't seem to be
# officially documented in any specification, drop it.
# - proxy-authorization is of no use for upstream, remove it.
self.request.del_headers(
[
httpHeaders.PROXY_AUTHORIZATION,
httpHeaders.PROXY_CONNECTION,
],
)
# - For HTTP/1.0, connection header defaults to close
# - For HTTP/1.1, connection header defaults to keep-alive
# Respect headers sent by client instead of manipulating
# Connection or Keep-Alive header. However, note that per
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection
# connection headers are meant for communication between client and
# first intercepting proxy.
self.request.add_headers(
[(b'Via', b'1.1 %s' % PROXY_AGENT_HEADER_VALUE)],
)
# Disable args.disable_headers before dispatching to upstream
self.upstream.queue(
memoryview(
self.request.build(
disable_headers=self.flags.disable_headers,
),
),
)
return False
[docs] def handle_pipeline_response(self, raw: memoryview) -> None:
if self.pipeline_response is None:
self.pipeline_response = HttpParser(
httpParserTypes.RESPONSE_PARSER,
)
self.pipeline_response.parse(raw)
if self.pipeline_response.is_complete:
self.pipeline_response = None
[docs] def connect_upstream(self) -> None:
host, port = self.request.host, self.request.port
if host and port:
try:
# Invoke plugin.resolve_dns
upstream_ip, source_addr = None, None
for plugin in self.plugins.values():
upstream_ip, source_addr = plugin.resolve_dns(
text_(host), port,
)
if upstream_ip or source_addr:
break
logger.debug(
'Connecting to upstream %s:%d' %
(text_(host), port),
)
if self.flags.enable_conn_pool:
assert self.upstream_conn_pool
with self.lock:
created, self.upstream = self.upstream_conn_pool.acquire(
(text_(host), port),
)
else:
created, self.upstream = True, TcpServerConnection(
text_(host), port,
)
# Connect with overridden upstream IP and source address
# if any of the plugin returned a non-null value.
self.upstream.connect(
addr=None if not upstream_ip else (
upstream_ip, port,
), source_address=source_addr,
)
self.upstream.connection.setblocking(False)
if not created:
# NOTE: Acquired connection might be in an unusable state.
#
# This can only be confirmed by reading from connection.
# For stale connections, we will receive None, indicating
# to drop the connection.
#
# If that happen, we must acquire a fresh connection.
logger.info(
'Reusing connection to upstream %s:%d' %
(text_(host), port),
)
return
logger.debug(
'Connected to upstream %s:%s' %
(text_(host), port),
)
except Exception as e: # TimeoutError, socket.gaierror
logger.warning(
'Unable to connect with upstream %s:%d due to %s' % (
text_(host), port, str(e),
),
)
if self.flags.enable_conn_pool and self.upstream:
assert self.upstream_conn_pool
with self.lock:
self.upstream_conn_pool.release(self.upstream)
raise ProxyConnectionFailed(
text_(host), port, repr(e),
) from e
else:
raise HttpProtocolException('Both host and port must exist')
#
# Interceptor related methods
#
[docs] def gen_ca_signed_certificate(
self, cert_file_path: str, certificate: Dict[str, Any],
) -> None:
'''CA signing key (default) is used for generating a public key
for common_name, if one already doesn't exist. Using generated
public key a CSR request is generated, which is then signed by
CA key and secret. Again this process only happen if signed
certificate doesn't already exist.
returns signed certificate path.'''
assert(
self.request.host and self.flags.ca_cert_dir and self.flags.ca_signing_key_file and
self.flags.ca_key_file and self.flags.ca_cert_file
)
upstream_subject = {s[0][0]: s[0][1] for s in certificate['subject']}
public_key_path = os.path.join(
self.flags.ca_cert_dir,
'{0}.{1}'.format(text_(self.request.host), 'pub'),
)
private_key_path = self.flags.ca_signing_key_file
private_key_password = ''
# Build certificate subject
keys = {
'CN': 'commonName',
'C': 'countryName',
'ST': 'stateOrProvinceName',
'L': 'localityName',
'O': 'organizationName',
'OU': 'organizationalUnitName',
}
subject = ''
for key in keys:
if upstream_subject.get(keys[key], None):
subject += '/{0}={1}'.format(
key,
upstream_subject.get(keys[key]),
)
alt_subj_names = [text_(self.request.host)]
validity_in_days = 365 * 2
timeout = 10
# Generate a public key for the common name
if not os.path.isfile(public_key_path):
logger.debug('Generating public key %s', public_key_path)
resp = gen_public_key(
public_key_path=public_key_path, private_key_path=private_key_path,
private_key_password=private_key_password, subject=subject, alt_subj_names=alt_subj_names,
validity_in_days=validity_in_days, timeout=timeout,
openssl=self.flags.openssl,
)
assert(resp is True)
csr_path = os.path.join(
self.flags.ca_cert_dir,
'{0}.{1}'.format(text_(self.request.host), 'csr'),
)
# Generate a CSR request for this common name
if not os.path.isfile(csr_path):
logger.debug('Generating CSR %s', csr_path)
resp = gen_csr(
csr_path=csr_path, key_path=private_key_path, password=private_key_password,
crt_path=public_key_path, timeout=timeout,
openssl=self.flags.openssl,
)
assert(resp is True)
ca_key_path = self.flags.ca_key_file
ca_key_password = ''
ca_crt_path = self.flags.ca_cert_file
serial = '%d%d' % (time.time(), os.getpid())
# Sign generated CSR
if not os.path.isfile(cert_file_path):
logger.debug('Signing CSR %s', cert_file_path)
resp = sign_csr(
csr_path=csr_path, crt_path=cert_file_path, ca_key_path=ca_key_path,
ca_key_password=ca_key_password, ca_crt_path=ca_crt_path,
serial=str(serial), alt_subj_names=alt_subj_names,
validity_in_days=validity_in_days, timeout=timeout,
openssl=self.flags.openssl,
)
assert(resp is True)
[docs] @staticmethod
def generated_cert_file_path(ca_cert_dir: str, host: str) -> str:
return os.path.join(ca_cert_dir, '%s.pem' % host)
[docs] def generate_upstream_certificate(
self, certificate: Dict[str, Any],
) -> str:
if not (
self.flags.ca_cert_dir and self.flags.ca_signing_key_file and
self.flags.ca_cert_file and self.flags.ca_key_file
):
raise HttpProtocolException(
f'For certificate generation all the following flags are mandatory: '
f'--ca-cert-file:{ self.flags.ca_cert_file }, '
f'--ca-key-file:{ self.flags.ca_key_file }, '
f'--ca-signing-key-file:{ self.flags.ca_signing_key_file }',
)
cert_file_path = HttpProxyPlugin.generated_cert_file_path(
self.flags.ca_cert_dir, text_(self.request.host),
)
with self.lock:
if not os.path.isfile(cert_file_path):
self.gen_ca_signed_certificate(cert_file_path, certificate)
return cert_file_path
[docs] def intercept(self) -> Union[socket.socket, bool]:
# Perform SSL/TLS handshake with upstream
teardown = self.wrap_server()
if teardown:
return teardown
# Generate certificate and perform handshake with client
# wrap_client also flushes client data before wrapping
# sending to client can raise, handle expected exceptions
teardown = self.wrap_client()
if teardown:
return teardown
# Update all plugin connection reference
# TODO(abhinavsingh): Is this required?
for plugin in self.plugins.values():
plugin.client._conn = self.client.connection
return self.client.connection
[docs] def wrap_server(self) -> bool:
assert self.upstream is not None
assert isinstance(self.upstream.connection, socket.socket)
do_close = False
try:
self.upstream.wrap(
text_(self.request.host),
self.flags.ca_file,
as_non_blocking=True,
)
except ssl.SSLCertVerificationError: # Server raised certificate verification error
# When --disable-interception-on-ssl-cert-verification-error flag is on,
# we will cache such upstream hosts and avoid intercepting them for future
# requests.
logger.warning(
'ssl.SSLCertVerificationError: ' +
'Server raised cert verification error for upstream: {0}'.format(
self.upstream.addr[0],
),
)
do_close = True
except ssl.SSLError as e:
if e.reason == 'SSLV3_ALERT_HANDSHAKE_FAILURE':
logger.warning(
'{0}: '.format(e.reason) +
'Server raised handshake alert failure for upstream: {0}'.format(
self.upstream.addr[0],
),
)
else:
logger.exception(
'SSLError when wrapping client for upstream: {0}'.format(
self.upstream.addr[0],
), exc_info=e,
)
do_close = True
if not do_close:
assert isinstance(self.upstream.connection, ssl.SSLSocket)
return do_close
[docs] def wrap_client(self) -> bool:
assert self.upstream is not None and self.flags.ca_signing_key_file is not None
assert isinstance(self.upstream.connection, ssl.SSLSocket)
do_close = False
try:
# TODO: Perform async certificate generation
generated_cert = self.generate_upstream_certificate(
cast(Dict[str, Any], self.upstream.connection.getpeercert()),
)
self.client.wrap(self.flags.ca_signing_key_file, generated_cert)
except subprocess.TimeoutExpired as e: # Popen communicate timeout
logger.exception(
'TimeoutExpired during certificate generation', exc_info=e,
)
do_close = True
except ssl.SSLCertVerificationError: # Client raised certificate verification error
# When --disable-interception-on-ssl-cert-verification-error flag is on,
# we will cache such upstream hosts and avoid intercepting them for future
# requests.
logger.warning(
'ssl.SSLCertVerificationError: ' +
'Client raised cert verification error for upstream: {0}'.format(
self.upstream.addr[0],
),
)
do_close = True
except ssl.SSLEOFError as e:
logger.warning(
'ssl.SSLEOFError {0} when wrapping client for upstream: {1}'.format(
str(e), self.upstream.addr[0],
),
)
do_close = True
except ssl.SSLError as e:
if e.reason in ('TLSV1_ALERT_UNKNOWN_CA', 'UNSUPPORTED_PROTOCOL'):
logger.warning(
'{0}: '.format(e.reason) +
'Client raised cert verification error for upstream: {0}'.format(
self.upstream.addr[0],
),
)
else:
logger.exception(
'OSError when wrapping client for upstream: {0}'.format(
self.upstream.addr[0],
), exc_info=e,
)
do_close = True
except BrokenPipeError:
logger.warning(
'BrokenPipeError when wrapping client for upstream: {0}'.format(
self.upstream.addr[0],
),
)
do_close = True
except OSError as e:
logger.exception(
'OSError when wrapping client for upstream: {0}'.format(
self.upstream.addr[0],
), exc_info=e,
)
do_close = True
if not do_close:
logger.debug('TLS intercepting using %s', generated_cert)
return do_close
#
# Event emitter callbacks
#
[docs] def emit_request_complete(self) -> None:
if not self.flags.enable_events:
return
assert self.request.port and self.event_queue
self.event_queue.publish(
request_id=self.uid,
event_name=eventNames.REQUEST_COMPLETE,
event_payload={
'url': text_(self.request.path)
if self.request.is_https_tunnel
else 'http://%s:%d%s' % (text_(self.request.host), self.request.port, text_(self.request.path)),
'method': text_(self.request.method),
'headers': {}
if not self.request.headers else
{
text_(k): text_(v[1])
for k, v in self.request.headers.items()
},
'body': text_(self.request.body, errors='ignore')
if self.request.method == httpMethods.POST
else None,
},
publisher_id=self.__class__.__qualname__,
)
[docs] def emit_response_events(self, chunk_size: int) -> None:
if not self.flags.enable_events:
return
if self.response.is_complete:
self.emit_response_complete()
elif self.response.state == httpParserStates.RCVING_BODY:
self.emit_response_chunk_received(chunk_size)
elif self.response.state == httpParserStates.HEADERS_COMPLETE:
self.emit_response_headers_complete()
[docs] def emit_response_chunk_received(self, chunk_size: int) -> None:
if not self.flags.enable_events:
return
assert self.event_queue
self.event_queue.publish(
request_id=self.uid,
event_name=eventNames.RESPONSE_CHUNK_RECEIVED,
event_payload={
'chunk_size': chunk_size,
'encoded_chunk_size': chunk_size,
},
publisher_id=self.__class__.__qualname__,
)
[docs] def emit_response_complete(self) -> None:
if not self.flags.enable_events:
return
assert self.event_queue
self.event_queue.publish(
request_id=self.uid,
event_name=eventNames.RESPONSE_COMPLETE,
event_payload={
'encoded_response_size': self.response.total_size,
},
publisher_id=self.__class__.__qualname__,
)
#
# Internal methods
#
[docs] def _close_and_release(self) -> bool:
if self.flags.enable_conn_pool:
assert self.upstream and not self.upstream.closed and self.upstream_conn_pool
self.upstream.closed = True
with self.lock:
self.upstream_conn_pool.release(self.upstream)
self.upstream = None
return True