# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
.. spelling::
reusability
"""
import socket
import logging
import selectors
from typing import TYPE_CHECKING, Any, Set, Dict, Tuple
from ..work import Work
from .server import TcpServerConnection
from ...common.flag import flags
from ...common.types import HostPort, Readables, Writables, SelectableEvents
logger = logging.getLogger(__name__)

# Opt-in CLI switch for `UpstreamConnectionPool` below; the help text marks
# the feature as work-in-progress.
flags.add_argument(
    '--enable-conn-pool',
    default=False,
    action='store_true',
    help='Default: False. (WIP) Enable upstream connection pooling.',
)
class UpstreamConnectionPool(Work[TcpServerConnection]):
    """Manages connection pool to upstream servers.

    `UpstreamConnectionPool` avoids the need to reconnect to upstream
    servers repeatedly when a reusable connection is available
    in the pool.

    A separate pool is maintained for each upstream server.
    So internally, it's a pool of pools.

    Internal data structure maintains references to connection objects
    that the pool owns or has lent out. Borrowed connections are marked as
    NOT reusable.

    For reusable connections only, the pool listens for read events
    to detect broken connections. This can happen if the pool has opened
    a connection which was never used and eventually reaches the
    upstream server's timeout limit.

    When a borrowed connection is returned to the pool (`retain`),
    the connection is marked as reusable again. However, if the
    returned connection has already been closed, it is removed
    from the internal data structure.

    TODO: Ideally, `UpstreamConnectionPool` must be shared across
    all cores so that the SSL session cache also works
    without additional out-of-band synchronization.

    TODO: `UpstreamConnectionPool` currently WON'T work for
    HTTPS connections. This is because of missing support for
    session cache, session tickets, abbreviated TLS handshakes
    and other features necessary to make it work.

    NOTE: However, currently for all HTTP-only upstream connections,
    `UpstreamConnectionPool` can be used to remove slow starts.
    """

    def __init__(self) -> None:
        # Descriptor -> connection, for every connection the pool owns or
        # has lent out.
        self.connections: Dict[int, TcpServerConnection] = {}
        # Upstream address -> set of connections opened to that upstream.
        # NOTE(review): keys are never pruned when a set becomes empty.
        self.pools: Dict[HostPort, Set[TcpServerConnection]] = {}

    @staticmethod
    def create(*args: Any) -> TcpServerConnection:  # pragma: no cover
        return TcpServerConnection(*args)

    def acquire(self, addr: HostPort) -> Tuple[bool, TcpServerConnection]:
        """Returns a reusable connection from the pool.

        If none exists, will create and return a new connection.

        Returns a ``(created, conn)`` tuple; ``created`` is True when a new
        upstream connection had to be opened. Either way the returned
        connection is marked in-use (not reusable) until it is passed back
        via `retain` or `release`.
        """
        created, conn = False, None
        for old_conn in self.pools.get(addr, ()):
            if old_conn.is_reusable():
                conn = old_conn
                logger.debug(
                    'Reusing connection#{2} for upstream {0}:{1}'.format(
                        addr[0], addr[1], id(old_conn),
                    ),
                )
                break
        if conn is None:
            created, conn = True, self.add(addr)
        conn.mark_inuse()
        return created, conn

    def release(self, conn: TcpServerConnection) -> None:
        """Release a previously acquired connection.

        Releasing a connection will shutdown and close the socket
        including internal pool cleanup. The connection may already have
        been closed by its borrower; it is still removed from the pool.
        """
        assert not conn.is_reusable()
        logger.debug(
            'Removing connection#{2} from pool from upstream {0}:{1}'.format(
                conn.addr[0], conn.addr[1], id(conn),
            ),
        )
        # Remove by identity instead of keying on ``fileno()``: once the
        # socket has been closed ``fileno()`` reports -1, and a lookup keyed
        # on it would raise KeyError.
        self._drop(conn)

    def retain(self, conn: TcpServerConnection) -> None:
        """Retain a previously acquired connection in the pool for reusability.

        A connection that has already been closed cannot be reused, so it is
        removed from the pool instead of being reset.
        """
        if conn.closed:
            self._drop(conn)
            return
        logger.debug(
            'Retaining connection#{2} to upstream {0}:{1}'.format(
                conn.addr[0], conn.addr[1], id(conn),
            ),
        )
        # Reset for reusability
        conn.reset()

    async def get_events(self) -> SelectableEvents:
        """Returns read event flag for all reusable connections in the pool."""
        events = {}
        for connections in self.pools.values():
            for conn in connections:
                # Borrowed connections are watched by their borrower, not
                # by the pool.
                if conn.is_reusable():
                    events[conn.connection.fileno()] = selectors.EVENT_READ
        return events

    async def handle_events(self, readables: Readables, _writables: Writables) -> bool:
        """Removes reusable connection from the pool.

        When pool is the owner of connection, we don't expect a read event from upstream
        server. A read event means either upstream closed the connection or connection
        has somehow reached an illegal state e.g. upstream sending data for previous
        connection acquisition lifecycle.

        Descriptors the pool no longer tracks, and connections that are
        currently borrowed, are ignored.
        """
        for fileno in readables:
            if TYPE_CHECKING:  # pragma: no cover
                assert isinstance(fileno, int)
            logger.debug('Upstream fd#{0} is read ready'.format(fileno))
            conn = self.connections.get(fileno)
            # The fd may already have been removed earlier in this event-loop
            # iteration, or the connection may have been handed out by
            # `acquire` after `get_events` ran; closing it then would pull
            # it out from under its borrower.
            if conn is None or not conn.is_reusable():
                continue
            self._remove(fileno)
        return False

    def add(self, addr: HostPort) -> TcpServerConnection:
        """Creates, connects and adds a new connection to the pool.

        Returns newly created connection.

        NOTE: You must not use the returned connection, instead use `acquire`.
        """
        new_conn = self.create(addr[0], addr[1])
        new_conn.connect()
        self._add(new_conn)
        logger.debug(
            'Created new connection#{2} for upstream {0}:{1}'.format(
                addr[0], addr[1], id(new_conn),
            ),
        )
        return new_conn

    def _add(self, conn: TcpServerConnection) -> None:
        """Adds a new connection to internal data structure."""
        if conn.addr not in self.pools:
            self.pools[conn.addr] = set()
        # Freshly added connections are owned by the pool, hence reusable.
        conn._reusable = True
        self.pools[conn.addr].add(conn)
        self.connections[conn.connection.fileno()] = conn

    def _remove(self, fileno: int) -> None:
        """Remove a connection by descriptor from the internal data structure.

        Raises KeyError if ``fileno`` is not tracked by the pool.
        """
        self._drop(self.connections[fileno])

    def _drop(self, conn: TcpServerConnection) -> None:
        """Shut down and close ``conn``, then forget every reference to it."""
        logger.debug('Removing conn#{0} from pool'.format(id(conn)))
        # Read the descriptor before closing; a closed socket reports -1.
        fileno = conn.connection.fileno()
        try:
            conn.connection.shutdown(socket.SHUT_WR)
        except OSError:
            # Best effort: the peer may be gone or the socket already closed.
            pass
        conn.close()
        pool = self.pools.get(conn.addr)
        if pool is not None:
            pool.discard(conn)
        if self.connections.get(fileno) is conn:
            del self.connections[fileno]
        else:
            # Stale descriptor (socket closed by its borrower, or the number
            # reused by a newer socket): locate the entry by identity.
            stale = [
                fd for fd, known in self.connections.items() if known is conn
            ]
            for fd in stale:
                del self.connections[fd]