From c22d20b1dfe874ec073ebd9b13b281a205a1259e Mon Sep 17 00:00:00 2001 From: matejcik Date: Fri, 8 Dec 2023 12:32:23 +0100 Subject: [PATCH] feat(python): implement full certificate verification in trezorctl (fixes #3364) --- python/.changelog.d/3364.added | 1 + python/src/trezorlib/cli/device.py | 282 ++++++++++++++++++++++++++++- 2 files changed, 275 insertions(+), 8 deletions(-) create mode 100644 python/.changelog.d/3364.added diff --git a/python/.changelog.d/3364.added b/python/.changelog.d/3364.added new file mode 100644 index 000000000..58ccdab41 --- /dev/null +++ b/python/.changelog.d/3364.added @@ -0,0 +1 @@ +trezorctl: support for human-friendly TS3 device authenticity check (requires separate installation of `cryptography` library). diff --git a/python/src/trezorlib/cli/device.py b/python/src/trezorlib/cli/device.py index 6ca2e9b0d..01af6c2d5 100644 --- a/python/src/trezorlib/cli/device.py +++ b/python/src/trezorlib/cli/device.py @@ -14,9 +14,12 @@ # You should have received a copy of the License along with this library. # If not, see . +import io +import logging import secrets import sys -from typing import TYPE_CHECKING, Optional, Sequence +from logging import DEBUG, ERROR, INFO, WARNING +from typing import TYPE_CHECKING, BinaryIO, Optional, Sequence, Tuple import click @@ -45,6 +48,8 @@ SD_PROTECT_OPERATIONS = { "refresh": messages.SdProtectOperationType.REFRESH, } +LOG = logging.getLogger(__name__) + @click.group(name="device") def cli() -> None: @@ -334,17 +339,278 @@ def set_busy( return device.set_busy(client, expiry * 1000) +ROOT_PUBKEY_TS3 = bytes.fromhex( + "04ca97480ac0d7b1e6efafe518cd433cec2bf8ab9822d76eafd34363b55d63e60" + "380bff20acc75cde03cffcb50ab6f8ce70c878e37ebc58ff7cca0a83b16b15fa5" +) + +TS3_CA_WHITELIST = [ + bytes.fromhex(x) + for x in ( + "04b12efa295ad825a534b7c0bf276e93ad116434426763fa87bfa8a2f12e726906dcf566813f62eba8f8795f94dba0391c53682809cbbd7e4ba01d960b4f1c68f1", + "04cb87d4c5d0fd5854e829f4c1b666e49a86c25c88a904c0feb66f1338faed0d7760010d7ea1a6474cbcfe1143bd4b5397a4e8b7fe86899113caecf42a984b0c0f", + "0450c45878b2c6403a5a16e97a8957dc3ea36919bce9321b357f6e7ebe6257ee54102a2c2fa5cefed1dabc498fc76dc0bcf3c3a8a415eac7cc32e7c18185f25b0d", + "0454d310d88d55d3044d80fcdbce9a63bf3118545fae71f6eca303272dcc4d25cf775ae3c18ae9f41b2cf29377bc4696fc79c8824a6fd6b9ca5fb6805ed6557aab", + "04e94bf05586a8e7a3e9aba32662a439be5f378da372219c8ee7cf8b4684dbfbd7ba88ed920c06f9f26deab9077654647738df8cf70898fea1c3aaf2ef086fc578", + "048c6c104bd7cc59cd5c5717533786a72ab59685bd13937f5542820e90f6ac6945e520e19d1d627a8e81ef5a94ef87de7a6a0d778e7dc9d389db877a5f9b629dd8", + ) +] + +DEV_ROOT_PUBKEY_TS3 = bytes.fromhex( + "047f77368dea2d4d61e989f474a56723c3212dacf8a808d8795595ef38441427c" + "4389bc454f02089d7f08b873005e4c28d432468997871c0bf286fd3861e21e96a" +) + + +def _require_cryptography() -> None: + try: + import cryptography # noqa: I900 + + version = [int(x) for x in cryptography.__version__.split(".")] + if version[0] < 41: + click.echo( + "You need to upgrade the 'cryptography' library to verify the signature." + ) + click.echo("You can do so by running:") + click.echo(" pip3 install --upgrade cryptography") + sys.exit(1) + + except ImportError: + click.echo( + "You need to install the 'cryptography' library to verify the signature." + ) + click.echo("You can do so by running:") + click.echo(" pip3 install cryptography") + sys.exit(1) + + +class Certificate: + def __init__(self, cert_bytes: bytes) -> None: + from cryptography import x509 # noqa: I900 + + self.cert_bytes = cert_bytes + self.cert = x509.load_der_x509_certificate(cert_bytes) + + def __str__(self) -> str: + return self.cert.subject.rfc4514_string() + + def public_key(self) -> bytes: + from cryptography.hazmat.primitives import serialization # noqa: I900 + + return self.cert.public_key().public_bytes( + serialization.Encoding.X962, + serialization.PublicFormat.UncompressedPoint, + ) + + def verify(self, signature: bytes, message: bytes) -> None: + from cryptography.hazmat.primitives import hashes # noqa: I900 + from cryptography.hazmat.primitives.asymmetric import ec # noqa: I900 + + cert_pubkey = self.cert.public_key() + assert isinstance(cert_pubkey, ec.EllipticCurvePublicKey) + cert_pubkey.verify( + self.fix_signature(signature), + message, + ec.ECDSA(hashes.SHA256()), + ) + + def verify_by(self, pubkey_bytes: bytes) -> None: + from cryptography.hazmat.primitives.asymmetric import ec # noqa: I900 + + pubkey = ec.EllipticCurvePublicKey.from_encoded_point( + ec.SECP256R1(), pubkey_bytes + ) + + algo_params = self.cert.signature_algorithm_parameters + assert isinstance(algo_params, ec.ECDSA) + pubkey.verify( + self.fix_signature(self.cert.signature), + self.cert.tbs_certificate_bytes, + algo_params, + ) + + def verify_issued_by(self, issuer: "Certificate") -> None: + self.verify_by(issuer.public_key()) + + @staticmethod + def _decode_signature_permissive(sig_bytes: bytes) -> Tuple[int, int]: + if len(sig_bytes) > 73: + raise ValueError("Invalid DER signature: too long.") + + reader = io.BytesIO(sig_bytes) + tag = reader.read(1) + if tag != b"\x30": + raise ValueError("Invalid DER signature: not a sequence.") + length = reader.read(1)[0] + if length != len(sig_bytes) - 2: + raise ValueError("Invalid DER signature: invalid length.") + + def read_int() -> int: + tag = reader.read(1) + if tag != b"\x02": + raise ValueError("Invalid DER signature: not an integer.") + length = reader.read(1)[0] + if length > 33: + raise ValueError("Invalid DER signature: integer too long.") + return int.from_bytes(reader.read(length), "big") + + r = read_int() + s = read_int() + if reader.tell() != len(sig_bytes): + raise ValueError("Invalid DER signature: trailing data.") + return r, s + + @staticmethod + def fix_signature(sig_bytes: bytes) -> bytes: + from cryptography.hazmat.primitives.asymmetric import utils # noqa: I900 + + r, s = Certificate._decode_signature_permissive(sig_bytes) + reencoded = utils.encode_dss_signature(r, s) + if reencoded != sig_bytes: + LOG.info( + "Re-encoding malformed signature: %s -> %s", + sig_bytes.hex(), + reencoded.hex(), + ) + return reencoded + + @cli.command() @click.argument("hex_challenge", required=False) +@click.option("-R", "--root", type=click.File("rb"), help="Custom root certificate.") +@click.option( + "-r", "--raw", is_flag=True, help="Print raw cryptographic data and exit." +) +@click.option( + "-s", + "--skip-whitelist", + is_flag=True, + help="Do not check intermediate certificates against the whitelist.", +) @with_client -def authenticate(client: "TrezorClient", hex_challenge: Optional[str]) -> None: - """Get information to verify the authenticity of the device.""" +def authenticate( + client: "TrezorClient", + hex_challenge: Optional[str], + root: Optional[BinaryIO], + raw: Optional[bool], + skip_whitelist: Optional[bool], +) -> None: + """Verify the authenticity of the device. + + Use the --raw option to get the raw challenge, signature, and certificate data. + + Otherwise, trezorctl will attempt to decode the signatures and check their + authenticity. By default, it will also check the public keys against a built-in + whitelist, and in the future also against a whitelist downloaded from Trezor + servers. You can skip this check with the --skip-whitelist option. + + \b + When not using --raw, 'cryptography' library is required. You can install it via: + pip3 install cryptography + """ if hex_challenge is None: hex_challenge = secrets.token_hex(32) - click.echo(f"Challenge: {hex_challenge}") + challenge = bytes.fromhex(hex_challenge) msg = device.authenticate(client, challenge) - click.echo(f"Signature of challenge: {msg.signature.hex()}") - click.echo(f"Device certificate: {msg.certificates[0].hex()}") - for cert in msg.certificates[1:]: - click.echo(f"CA certificate: {cert.hex()}") + + if raw: + click.echo(f"Challenge: {hex_challenge}") + click.echo(f"Signature of challenge: {msg.signature.hex()}") + click.echo(f"Device certificate: {msg.certificates[0].hex()}") + for cert in msg.certificates[1:]: + click.echo(f"CA certificate: {cert.hex()}") + return + + _require_cryptography() + + worst_level = DEBUG + + def print_step(level: int, text: str) -> None: + nonlocal worst_level + + worst_level = max(worst_level, level) + if level == ERROR: + level_str = click.style("ERROR", fg="red") + elif level == WARNING: + level_str = click.style("WARNING", fg="yellow") + elif level == INFO: + level_str = click.style("INFO", fg="blue") + elif level == DEBUG: + level_str = click.style("OK", fg="green") + else: + raise RuntimeError("Invalid log level") + + click.echo(f"[{level_str}] {text}") + + from cryptography import exceptions # noqa: I900 + + CHALLENGE_HEADER = b"AuthenticateDevice:" + challenge_bytes = ( + len(CHALLENGE_HEADER).to_bytes(1, "big") + + CHALLENGE_HEADER + + len(challenge).to_bytes(1, "big") + + challenge + ) + + try: + first_cert = Certificate(msg.certificates[0]) + except Exception: + print_step(ERROR, "Failed to parse device certificate.") + sys.exit(5) + + try: + first_cert.verify(msg.signature, challenge_bytes) + except exceptions.InvalidSignature: + print_step(ERROR, "Challenge verification failed.") + else: + print_step(DEBUG, "Challenge verified successfully.") + + for issuer in msg.certificates[1:]: + try: + cert = Certificate(issuer) + except Exception: + print_step(ERROR, "Failed to parse CA certificate.") + continue + + if skip_whitelist: + print_step(INFO, "Skipping public key whitelist check.") + else: + if cert.public_key() not in TS3_CA_WHITELIST: + print_step(WARNING, f"CA certificate not in whitelist: {cert}") + + try: + first_cert.verify_issued_by(cert) + except exceptions.InvalidSignature: + print_step(ERROR, f"Certificate verification failed: {cert}") + continue + else: + print_step(DEBUG, f"Certificate verified successfully: {cert}") + + first_cert = cert + + roots = [ + (DEBUG, "Trezor Company", ROOT_PUBKEY_TS3), + (ERROR, "TESTING ENVIRONMENT. DO NOT USE THIS DEVICE", DEV_ROOT_PUBKEY_TS3), + ] + if root is not None: + try: + root_cert = Certificate(root.read()) + roots.append( + (INFO, "the specified root certificate", root_cert.public_key()) + ) + except Exception: + print_step(ERROR, "Failed to parse provided root certificate.") + + for level, issuer, pubkey in roots: + try: + first_cert.verify_by(pubkey) + print_step(level, f"Certificate issued by {issuer}: {first_cert}") + break + except Exception: + continue + + if worst_level >= ERROR: + sys.exit(2) + elif worst_level >= WARNING: + sys.exit(1)