feat(python): implement full certificate verification in trezorctl (fixes #3364)

pull/3464/head
matejcik 5 months ago
parent c1fa1c0cbc
commit c22d20b1df

@ -0,0 +1 @@
trezorctl: support for human-friendly TS3 device authenticity check (requires separate installation of `cryptography` library).

@ -14,9 +14,12 @@
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import io
import logging
import secrets
import sys
from typing import TYPE_CHECKING, Optional, Sequence
from logging import DEBUG, ERROR, INFO, WARNING
from typing import TYPE_CHECKING, BinaryIO, Optional, Sequence, Tuple
import click
@ -45,6 +48,8 @@ SD_PROTECT_OPERATIONS = {
"refresh": messages.SdProtectOperationType.REFRESH,
}
LOG = logging.getLogger(__name__)
@click.group(name="device")
def cli() -> None:
@ -334,17 +339,278 @@ def set_busy(
return device.set_busy(client, expiry * 1000)
ROOT_PUBKEY_TS3 = bytes.fromhex(
"04ca97480ac0d7b1e6efafe518cd433cec2bf8ab9822d76eafd34363b55d63e60"
"380bff20acc75cde03cffcb50ab6f8ce70c878e37ebc58ff7cca0a83b16b15fa5"
)
TS3_CA_WHITELIST = [
bytes.fromhex(x)
for x in (
"04b12efa295ad825a534b7c0bf276e93ad116434426763fa87bfa8a2f12e726906dcf566813f62eba8f8795f94dba0391c53682809cbbd7e4ba01d960b4f1c68f1",
"04cb87d4c5d0fd5854e829f4c1b666e49a86c25c88a904c0feb66f1338faed0d7760010d7ea1a6474cbcfe1143bd4b5397a4e8b7fe86899113caecf42a984b0c0f",
"0450c45878b2c6403a5a16e97a8957dc3ea36919bce9321b357f6e7ebe6257ee54102a2c2fa5cefed1dabc498fc76dc0bcf3c3a8a415eac7cc32e7c18185f25b0d",
"0454d310d88d55d3044d80fcdbce9a63bf3118545fae71f6eca303272dcc4d25cf775ae3c18ae9f41b2cf29377bc4696fc79c8824a6fd6b9ca5fb6805ed6557aab",
"04e94bf05586a8e7a3e9aba32662a439be5f378da372219c8ee7cf8b4684dbfbd7ba88ed920c06f9f26deab9077654647738df8cf70898fea1c3aaf2ef086fc578",
"048c6c104bd7cc59cd5c5717533786a72ab59685bd13937f5542820e90f6ac6945e520e19d1d627a8e81ef5a94ef87de7a6a0d778e7dc9d389db877a5f9b629dd8",
)
]
DEV_ROOT_PUBKEY_TS3 = bytes.fromhex(
"047f77368dea2d4d61e989f474a56723c3212dacf8a808d8795595ef38441427c"
"4389bc454f02089d7f08b873005e4c28d432468997871c0bf286fd3861e21e96a"
)
def _require_cryptography() -> None:
try:
import cryptography # noqa: I900
version = [int(x) for x in cryptography.__version__.split(".")]
if version[0] < 41:
click.echo(
"You need to upgrade the 'cryptography' library to verify the signature."
)
click.echo("You can do so by running:")
click.echo(" pip3 install --upgrade cryptography")
sys.exit(1)
except ImportError:
click.echo(
"You need to install the 'cryptography' library to verify the signature."
)
click.echo("You can do so by running:")
click.echo(" pip3 install cryptography")
sys.exit(1)
class Certificate:
def __init__(self, cert_bytes: bytes) -> None:
from cryptography import x509 # noqa: I900
self.cert_bytes = cert_bytes
self.cert = x509.load_der_x509_certificate(cert_bytes)
def __str__(self) -> str:
return self.cert.subject.rfc4514_string()
def public_key(self) -> bytes:
from cryptography.hazmat.primitives import serialization # noqa: I900
return self.cert.public_key().public_bytes(
serialization.Encoding.X962,
serialization.PublicFormat.UncompressedPoint,
)
def verify(self, signature: bytes, message: bytes) -> None:
from cryptography.hazmat.primitives import hashes # noqa: I900
from cryptography.hazmat.primitives.asymmetric import ec # noqa: I900
cert_pubkey = self.cert.public_key()
assert isinstance(cert_pubkey, ec.EllipticCurvePublicKey)
cert_pubkey.verify(
self.fix_signature(signature),
message,
ec.ECDSA(hashes.SHA256()),
)
def verify_by(self, pubkey_bytes: bytes) -> None:
from cryptography.hazmat.primitives.asymmetric import ec # noqa: I900
pubkey = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), pubkey_bytes
)
algo_params = self.cert.signature_algorithm_parameters
assert isinstance(algo_params, ec.ECDSA)
pubkey.verify(
self.fix_signature(self.cert.signature),
self.cert.tbs_certificate_bytes,
algo_params,
)
def verify_issued_by(self, issuer: "Certificate") -> None:
self.verify_by(issuer.public_key())
@staticmethod
def _decode_signature_permissive(sig_bytes: bytes) -> Tuple[int, int]:
if len(sig_bytes) > 73:
raise ValueError("Invalid DER signature: too long.")
reader = io.BytesIO(sig_bytes)
tag = reader.read(1)
if tag != b"\x30":
raise ValueError("Invalid DER signature: not a sequence.")
length = reader.read(1)[0]
if length != len(sig_bytes) - 2:
raise ValueError("Invalid DER signature: invalid length.")
def read_int() -> int:
tag = reader.read(1)
if tag != b"\x02":
raise ValueError("Invalid DER signature: not an integer.")
length = reader.read(1)[0]
if length > 33:
raise ValueError("Invalid DER signature: integer too long.")
return int.from_bytes(reader.read(length), "big")
r = read_int()
s = read_int()
if reader.tell() != len(sig_bytes):
raise ValueError("Invalid DER signature: trailing data.")
return r, s
@staticmethod
def fix_signature(sig_bytes: bytes) -> bytes:
from cryptography.hazmat.primitives.asymmetric import utils # noqa: I900
r, s = Certificate._decode_signature_permissive(sig_bytes)
reencoded = utils.encode_dss_signature(r, s)
if reencoded != sig_bytes:
LOG.info(
"Re-encoding malformed signature: %s -> %s",
sig_bytes.hex(),
reencoded.hex(),
)
return reencoded
@cli.command()
@click.argument("hex_challenge", required=False)
@click.option("-R", "--root", type=click.File("rb"), help="Custom root certificate.")
@click.option(
"-r", "--raw", is_flag=True, help="Print raw cryptographic data and exit."
)
@click.option(
"-s",
"--skip-whitelist",
is_flag=True,
help="Do not check intermediate certificates against the whitelist.",
)
@with_client
def authenticate(client: "TrezorClient", hex_challenge: Optional[str]) -> None:
"""Get information to verify the authenticity of the device."""
def authenticate(
client: "TrezorClient",
hex_challenge: Optional[str],
root: Optional[BinaryIO],
raw: Optional[bool],
skip_whitelist: Optional[bool],
) -> None:
"""Verify the authenticity of the device.
Use the --raw option to get the raw challenge, signature, and certificate data.
Otherwise, trezorctl will attempt to decode the signatures and check their
authenticity. By default, it will also check the public keys against a built-in
whitelist, and in the future also against a whitelist downloaded from Trezor
servers. You can skip this check with the --skip-whitelist option.
\b
When not using --raw, 'cryptography' library is required. You can install it via:
pip3 install cryptography
"""
if hex_challenge is None:
hex_challenge = secrets.token_hex(32)
click.echo(f"Challenge: {hex_challenge}")
challenge = bytes.fromhex(hex_challenge)
msg = device.authenticate(client, challenge)
click.echo(f"Signature of challenge: {msg.signature.hex()}")
click.echo(f"Device certificate: {msg.certificates[0].hex()}")
for cert in msg.certificates[1:]:
click.echo(f"CA certificate: {cert.hex()}")
if raw:
click.echo(f"Challenge: {hex_challenge}")
click.echo(f"Signature of challenge: {msg.signature.hex()}")
click.echo(f"Device certificate: {msg.certificates[0].hex()}")
for cert in msg.certificates[1:]:
click.echo(f"CA certificate: {cert.hex()}")
return
_require_cryptography()
worst_level = DEBUG
def print_step(level: int, text: str) -> None:
nonlocal worst_level
worst_level = max(worst_level, level)
if level == ERROR:
level_str = click.style("ERROR", fg="red")
elif level == WARNING:
level_str = click.style("WARNING", fg="yellow")
elif level == INFO:
level_str = click.style("INFO", fg="blue")
elif level == DEBUG:
level_str = click.style("OK", fg="green")
else:
raise RuntimeError("Invalid log level")
click.echo(f"[{level_str}] {text}")
from cryptography import exceptions # noqa: I900
CHALLENGE_HEADER = b"AuthenticateDevice:"
challenge_bytes = (
len(CHALLENGE_HEADER).to_bytes(1, "big")
+ CHALLENGE_HEADER
+ len(challenge).to_bytes(1, "big")
+ challenge
)
try:
first_cert = Certificate(msg.certificates[0])
except Exception:
print_step(ERROR, "Failed to parse device certificate.")
sys.exit(5)
try:
first_cert.verify(msg.signature, challenge_bytes)
except exceptions.InvalidSignature:
print_step(ERROR, "Challenge verification failed.")
else:
print_step(DEBUG, "Challenge verified successfully.")
for issuer in msg.certificates[1:]:
try:
cert = Certificate(issuer)
except Exception:
print_step(ERROR, "Failed to parse CA certificate.")
continue
if skip_whitelist:
print_step(INFO, "Skipping public key whitelist check.")
else:
if cert.public_key() not in TS3_CA_WHITELIST:
print_step(WARNING, f"CA certificate not in whitelist: {cert}")
try:
first_cert.verify_issued_by(cert)
except exceptions.InvalidSignature:
print_step(ERROR, f"Certificate verification failed: {cert}")
continue
else:
print_step(DEBUG, f"Certificate verified successfully: {cert}")
first_cert = cert
roots = [
(DEBUG, "Trezor Company", ROOT_PUBKEY_TS3),
(ERROR, "TESTING ENVIRONMENT. DO NOT USE THIS DEVICE", DEV_ROOT_PUBKEY_TS3),
]
if root is not None:
try:
root_cert = Certificate(root.read())
roots.append(
(INFO, "the specified root certificate", root_cert.public_key())
)
except Exception:
print_step(ERROR, "Failed to parse provided root certificate.")
for level, issuer, pubkey in roots:
try:
first_cert.verify_by(pubkey)
print_step(level, f"Certificate issued by {issuer}: {first_cert}")
break
except Exception:
continue
if worst_level >= ERROR:
sys.exit(2)
elif worst_level >= WARNING:
sys.exit(1)

Loading…
Cancel
Save