# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import struct
import logging
from typing import Tuple, Optional
from .hello import (
TlsClientHello, TlsServerHello, TlsHelloRequest, TlsServerHelloDone,
)
from .types import tlsHandshakeType
from .finished import TlsFinished
from .certificate import (
TlsCertificate, TlsCertificateVerify, TlsCertificateRequest,
)
from .key_exchange import TlsClientKeyExchange, TlsServerKeyExchange
logger = logging.getLogger(__name__)
[docs]class TlsHandshake:
"""TLS Handshake"""
def __init__(self) -> None:
self.msg_type: int = tlsHandshakeType.OTHER
self.length: Optional[bytes] = None
self.hello_request: Optional[TlsHelloRequest] = None
self.client_hello: Optional[TlsClientHello] = None
self.server_hello: Optional[TlsServerHello] = None
self.certificate: Optional[TlsCertificate] = None
self.server_key_exchange: Optional[TlsServerKeyExchange] = None
self.certificate_request: Optional[TlsCertificateRequest] = None
self.server_hello_done: Optional[TlsServerHelloDone] = None
self.certificate_verify: Optional[TlsCertificateVerify] = None
self.client_key_exchange: Optional[TlsClientKeyExchange] = None
self.finished: Optional[TlsFinished] = None
self.data: Optional[bytes] = None
[docs] def parse(self, raw: bytes) -> Tuple[bool, bytes]:
length = len(raw)
if length < 4:
logger.debug('invalid data, len(raw) = %s', length)
return False, raw
payload_length, = struct.unpack('!I', b'\x00' + raw[1:4])
self.length = payload_length
if length < 4 + payload_length:
logger.debug(
'incomplete data, len(raw) = %s, len(payload) = %s', length, payload_length,
)
return False, raw
# parse
self.msg_type = raw[0]
self.length = raw[1:4]
self.data = raw[: 4 + payload_length]
payload = raw[4: 4 + payload_length]
if self.msg_type == tlsHandshakeType.HELLO_REQUEST:
# parse hello request
self.hello_request = TlsHelloRequest()
self.hello_request.parse(payload)
elif self.msg_type == tlsHandshakeType.CLIENT_HELLO:
# parse client hello
self.client_hello = TlsClientHello()
self.client_hello.parse(payload)
elif self.msg_type == tlsHandshakeType.SERVER_HELLO:
# parse server hello
self.server_hello = TlsServerHello()
self.server_hello.parse(payload)
elif self.msg_type == tlsHandshakeType.CERTIFICATE:
# parse certificate
self.certificate = TlsCertificate()
self.certificate.parse(payload)
elif self.msg_type == tlsHandshakeType.SERVER_KEY_EXCHANGE:
# parse server key exchange
self.server_key_exchange = TlsServerKeyExchange()
self.server_key_exchange.parse(payload)
elif self.msg_type == tlsHandshakeType.CERTIFICATE_REQUEST:
# parse certificate request
self.certificate_request = TlsCertificateRequest()
self.certificate_request.parse(payload)
elif self.msg_type == tlsHandshakeType.SERVER_HELLO_DONE:
# parse server hello done
self.server_hello_done = TlsServerHelloDone()
self.server_hello_done.parse(payload)
elif self.msg_type == tlsHandshakeType.CERTIFICATE_VERIFY:
# parse certificate verify
self.certificate_verify = TlsCertificateVerify()
self.certificate_verify.parse(payload)
elif self.msg_type == tlsHandshakeType.CLIENT_KEY_EXCHANGE:
# parse client key exchange
self.client_key_exchange = TlsClientKeyExchange()
self.client_key_exchange.parse(payload)
elif self.msg_type == tlsHandshakeType.FINISHED:
# parse finished
self.finished = TlsFinished()
self.finished.parse(payload)
return True, raw[4 + payload_length:]
[docs] def build(self) -> bytes:
data = b''
data += bytes([self.msg_type])
payload = b''
if self.msg_type == tlsHandshakeType.CLIENT_HELLO:
assert self.client_hello
payload = self.client_hello.build()
elif self.msg_type == tlsHandshakeType.SERVER_HELLO:
assert self.server_hello
payload = self.server_hello.build()
elif self.msg_type == tlsHandshakeType.CERTIFICATE:
assert self.certificate
payload = self.certificate.build()
elif self.msg_type == tlsHandshakeType.SERVER_KEY_EXCHANGE:
assert self.server_key_exchange
payload = self.server_key_exchange.build()
# calculate length
length = struct.pack('!I', len(payload))[1:]
data += length
data += payload
return data