Source code for proxy.plugin.filter_by_url_regex

# -*- coding: utf-8 -*-
"""
    proxy.py
    ~~~~~~~~
    ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
    Network monitoring, controls & Application development, testing, debugging.

    :copyright: (c) 2013-present by Abhinav Singh and contributors.
    :license: BSD, see LICENSE for more details.

    .. spelling::

       url
"""
import re
import json
import logging
from typing import Any, Dict, List, Optional

from ..http import httpStatusCodes
from ..http.proxy import HttpProxyBasePlugin
from ..common.flag import flags
from ..http.parser import HttpParser
from ..common.utils import text_
from ..http.exception import HttpRequestRejected


logger = logging.getLogger(__name__)

# See adblock.json file in repository for sample example config
flags.add_argument(
    '--filtered-url-regex-config',
    type=str,
    default='',
    help='Default: No config.  Comma separated list of IPv4 and IPv6 addresses.',
)


[docs]class FilterByURLRegexPlugin(HttpProxyBasePlugin): """Drops traffic by inspecting request URL and checking against a list of regular expressions. Example, default filter list below can be used as a starting point for filtering ads. """ def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.filters: List[Dict[str, Any]] = [] if self.flags.filtered_url_regex_config != '': with open(self.flags.filtered_url_regex_config, 'rb') as f: self.filters = json.load(f)
[docs] def handle_client_request( self, request: HttpParser, ) -> Optional[HttpParser]: # determine host request_host = None if request.host: request_host = request.host elif request.headers and b'host' in request.headers: request_host = request.header(b'host') if not request_host: logger.error("Cannot determine host") return request # build URL url = b'%s%s' % ( request_host, request.path, ) # check URL against list for rule_number, blocked_entry in enumerate(self.filters, start=1): # if regex matches on URL if re.search(text_(blocked_entry['regex']), text_(url)): # log that the request has been filtered logger.info( "Blocked: %r with status_code '%r' by rule number '%r'" % ( text_(url), httpStatusCodes.NOT_FOUND, rule_number, ), ) # close the connection with the status code from the filter # list raise HttpRequestRejected( status_code=httpStatusCodes.NOT_FOUND, reason=b'Blocked', ) return request