# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
from typing import Any, List, Tuple, Generator, cast
from urllib.parse import parse_qs, urlparse
from multiprocessing.synchronize import Lock
from .plugin import HttpWebServerBasePlugin
from ..parser import HttpParser
from .protocols import httpProtocolTypes
from ...common.flag import flags
from ...common.utils import text_, build_http_response
from ...common.constants import (
DEFAULT_ENABLE_METRICS, DEFAULT_METRICS_URL_PATH,
)
from ...core.event.metrics import MetricsStorage
# Command-line switch for the metrics feature. ``store_true`` means the flag
# takes no value; when it is omitted the option falls back to
# DEFAULT_ENABLE_METRICS.
flags.add_argument(
'--enable-metrics',
action='store_true',
default=DEFAULT_ENABLE_METRICS,
help='Default: False. Enables metrics.',
)
# Web server path under which metrics are served (per the help text). The
# default constant is passed through text_() both for the default value and
# for the help message -- presumably a bytes-to-str conversion; confirm
# against common.utils.
flags.add_argument(
'--metrics-path',
type=str,
default=text_(DEFAULT_METRICS_URL_PATH),
help='Default: %s. Web server path to serve proxy.py metrics.'
% text_(DEFAULT_METRICS_URL_PATH),
)
def get_collector(metrics_lock: Lock) -> Any:
    """Create a Prometheus collector that reads from ``MetricsStorage``.

    ``prometheus_client`` is imported lazily inside this function, so the
    collector class itself is defined here rather than at module level.

    :param metrics_lock: lock handed to ``MetricsStorage``.
    :returns: a ``prometheus_client`` ``Collector`` instance (typed ``Any``
        because the class only exists inside this function).
    """
    # pylint: disable=import-outside-toplevel
    from prometheus_client.core import Metric
    from prometheus_client.registry import Collector

    class MetricsCollector(Collector):
        """Collector exposing proxy.py work counters held in MetricsStorage."""

        def __init__(self, metrics_lock: Lock) -> None:
            self.storage = MetricsStorage(metrics_lock)

        def collect(self) -> Generator[Metric, None, None]:
            """Yield metric families built from the stored counters.

            The counters are aggregates managed by MetricsEventSubscriber.
            Three counter families are yielded, followed by one gauge whose
            value is ``work_started - work_finished``.
            """
            # pylint: disable=import-outside-toplevel
            from prometheus_client.core import (
                GaugeMetricFamily, CounterMetricFamily,
            )

            def _single_sample_counter(
                    name: str,
                    documentation: str,
                    label: str,
                    value: Any,
            ) -> Metric:
                """Build a counter family carrying exactly one sample."""
                # NOTE(review): the family is declared without label names,
                # so it is unclear whether ``label`` ends up attached to the
                # sample -- confirm against prometheus_client.
                family = CounterMetricFamily(name, documentation)
                family.add_metric([label], value)
                return family

            read_counter = self.storage.get_counter
            # Both totals are read up front because the gauge at the end is
            # derived from the same two values.
            total_started = read_counter('work_started')
            total_finished = read_counter('work_finished')

            yield _single_sample_counter(
                'proxypy_work_started',
                'Total work accepted and started by proxy.py core',
                'proxypy_work_started',
                total_started,
            )
            yield _single_sample_counter(
                'proxypy_work_request_received',
                'Total work finished sending initial request',
                'proxypy_work_request_received',
                read_counter('request_complete'),
            )
            yield _single_sample_counter(
                'proxypy_work_finished',
                'Total work finished by proxy.py core',
                'work_finished',
                total_finished,
            )
            yield GaugeMetricFamily(
                'proxypy_work_active',
                'Total work under active execution',
                value=total_started - total_finished,
            )

    return MetricsCollector(metrics_lock)
class MetricsWebServerPlugin(HttpWebServerBasePlugin):
    """Web server plugin that serves proxy.py metrics via prometheus_client.

    Each plugin instance owns its own ``CollectorRegistry`` with the
    collector returned by ``get_collector`` registered on it. Requests that
    match the configured ``--metrics-path`` are answered with the output
    produced by ``prometheus_client``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the base plugin and set up the metrics registry."""
        super().__init__(*args, **kwargs)
        # pylint: disable=import-outside-toplevel
        from prometheus_client.core import CollectorRegistry
        from prometheus_client.registry import Collector

        self.registry = CollectorRegistry()
        # get_collector() is typed ``Any`` because its class is defined
        # lazily; cast it back for the type checker.
        self.registry.register(
            cast(Collector, get_collector(self.flags.metrics_lock)),
        )

    def routes(self) -> List[Tuple[int, str]]:
        """Return the HTTP and HTTPS routes for the metrics path.

        :returns: one ``(protocol, regex)`` pair per protocol, or an empty
            list when no metrics path is configured.
        """
        if self.flags.metrics_path:
            # The same pattern serves both protocols; build it once.
            pattern = r'{0}$'.format(text_(self.flags.metrics_path))
            return [
                (httpProtocolTypes.HTTP, pattern),
                (httpProtocolTypes.HTTPS, pattern),
            ]
        return []   # pragma: no cover

    def handle_request(self, request: HttpParser) -> None:
        """Render the registry and queue the HTTP response to the client.

        :param request: parsed incoming request; its ``Accept`` and
            ``Accept-Encoding`` headers and its query string are forwarded
            to ``prometheus_client``.
        """
        # pylint: disable=import-outside-toplevel
        from prometheus_client.exposition import _bake_output

        accept = (
            request.header(b'Accept').decode()
            if request.has_header(b'Accept')
            else '*/*'
        )
        accept_encoding = (
            request.header(b'Accept-Encoding').decode()
            if request.has_header(b'Accept-Encoding')
            else None
        )
        # The parser appears to hold bytes (headers are decoded above).
        # urllib.parse returns bytes keys for bytes input, while
        # prometheus_client looks up the str key 'name[]'; normalise the
        # path to text so the query parameters can actually match.
        params = parse_qs(urlparse(text_(request.path)).query)

        # flake8: noqa
        status, headers, output = _bake_output(  # type: ignore[no-untyped-call]
            self.registry,
            accept,
            accept_encoding,
            params,
            False,
        )
        # ``status`` looks like '200 OK'; partition() tolerates a status
        # line without a reason phrase instead of raising IndexError.
        code, _, reason = status.partition(' ')
        response = build_http_response(
            int(code),
            reason=reason.encode(),
            headers={key.encode(): value.encode() for key, value in headers},
            body=output,
        )
        self.client.queue(memoryview(response))