mirror of
https://github.com/trezor/trezor-firmware.git
synced 2025-01-11 16:00:57 +00:00
feat(python): implement full certificate verification in trezorctl (fixes #3364)
This commit is contained in:
parent
8a3133bacc
commit
d3bdedf421
1
python/.changelog.d/3364.added
Normal file
1
python/.changelog.d/3364.added
Normal file
@ -0,0 +1 @@
|
||||
trezorctl: support for human-friendly Trezor Safe device authenticity check (requires separate installation of `cryptography` library).
|
@ -3,3 +3,4 @@ web3>=5
|
||||
Pillow>=10
|
||||
stellar-sdk>=6
|
||||
rlp>=1.1.0 ; python_version<'3.7'
|
||||
cryptography>=41
|
||||
|
@ -31,6 +31,7 @@ extras_require = {
|
||||
"qt-widgets": ["PyQt5"],
|
||||
"extra": ["Pillow>=10"],
|
||||
"stellar": ["stellar-sdk>=6"],
|
||||
"authentication": ["cryptography>=41"],
|
||||
}
|
||||
|
||||
extras_require["full"] = sum(extras_require.values(), [])
|
||||
|
383
python/src/trezorlib/authentication.py
Normal file
383
python/src/trezorlib/authentication.py
Normal file
@ -0,0 +1,383 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import logging
|
||||
import secrets
|
||||
import typing as t
|
||||
from importlib import metadata
|
||||
|
||||
from . import device
|
||||
from .client import TrezorClient
|
||||
|
||||
try:
|
||||
cryptography_version = metadata.version("cryptography")
|
||||
vsplit = [int(x) for x in cryptography_version.split(".")]
|
||||
if vsplit[0] < 41:
|
||||
raise ImportError(
|
||||
"cryptography>=41 is required for this module, "
|
||||
f"found cryptography=={cryptography_version}"
|
||||
)
|
||||
except ImportError as e:
|
||||
raise ImportError("cryptography>=41 is required for this module") from e
|
||||
|
||||
from cryptography import exceptions, x509
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import ec, utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _pk_p256(pubkey_hex: str) -> ec.EllipticCurvePublicKey:
|
||||
return ec.EllipticCurvePublicKey.from_encoded_point(
|
||||
ec.SECP256R1(), bytes.fromhex(pubkey_hex)
|
||||
)
|
||||
|
||||
|
||||
CHALLENGE_HEADER = b"AuthenticateDevice:"
|
||||
|
||||
|
||||
class RootCertificate(t.NamedTuple):
|
||||
name: str
|
||||
device: str
|
||||
devel: bool
|
||||
pubkey: ec.EllipticCurvePublicKey
|
||||
|
||||
|
||||
ROOT_PUBLIC_KEYS = [
|
||||
RootCertificate(
|
||||
"Trezor Company",
|
||||
"Trezor Safe 3",
|
||||
False,
|
||||
_pk_p256(
|
||||
"04ca97480ac0d7b1e6efafe518cd433cec2bf8ab9822d76eafd34363b55d63e60"
|
||||
"380bff20acc75cde03cffcb50ab6f8ce70c878e37ebc58ff7cca0a83b16b15fa5"
|
||||
),
|
||||
),
|
||||
RootCertificate(
|
||||
"Trezor Company",
|
||||
"Trezor Safe 5",
|
||||
False,
|
||||
_pk_p256(
|
||||
"041854b27fb1d9f65abb66828e78c9dc0ca301e66081ab0c6a4d104f9df1cd0ad"
|
||||
"5a7c75f77a8c092f55cf825d2abaf734f934c9394d5e75f75a5a06a5ee9be93ae"
|
||||
),
|
||||
),
|
||||
RootCertificate(
|
||||
"TESTING ENVIRONMENT. DO NOT USE THIS DEVICE",
|
||||
"Trezor Safe 3",
|
||||
True,
|
||||
_pk_p256(
|
||||
"047f77368dea2d4d61e989f474a56723c3212dacf8a808d8795595ef38441427c"
|
||||
"4389bc454f02089d7f08b873005e4c28d432468997871c0bf286fd3861e21e96a"
|
||||
),
|
||||
),
|
||||
RootCertificate(
|
||||
"TESTING ENVIRONMENT. DO NOT USE THIS DEVICE",
|
||||
"Trezor Safe 5",
|
||||
True,
|
||||
_pk_p256(
|
||||
"04e48b69cd7962068d3cca3bcc6b1747ef496c1e28b5529e34ad7295215ea161d"
|
||||
"be8fb08ae0479568f9d2cb07630cb3e52f4af0692102da5873559e45e9fa72959"
|
||||
),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
class DeviceNotAuthentic(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Certificate:
|
||||
def __init__(self, cert_bytes: bytes) -> None:
|
||||
self.cert_bytes = cert_bytes
|
||||
self.cert = x509.load_der_x509_certificate(cert_bytes)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.cert.subject.rfc4514_string()
|
||||
|
||||
def public_key_bytes(self) -> bytes:
|
||||
return self.cert.public_key().public_bytes(
|
||||
serialization.Encoding.X962,
|
||||
serialization.PublicFormat.UncompressedPoint,
|
||||
)
|
||||
|
||||
def verify(self, signature: bytes, message: bytes) -> None:
|
||||
cert_pubkey = self.cert.public_key()
|
||||
assert isinstance(cert_pubkey, ec.EllipticCurvePublicKey)
|
||||
cert_pubkey.verify(
|
||||
self.fix_signature(signature),
|
||||
message,
|
||||
ec.ECDSA(hashes.SHA256()),
|
||||
)
|
||||
|
||||
def verify_by(self, pubkey: ec.EllipticCurvePublicKey) -> None:
|
||||
algo_params = self.cert.signature_algorithm_parameters
|
||||
assert isinstance(algo_params, ec.ECDSA)
|
||||
pubkey.verify(
|
||||
self.fix_signature(self.cert.signature),
|
||||
self.cert.tbs_certificate_bytes,
|
||||
algo_params,
|
||||
)
|
||||
|
||||
def _check_ca_extensions(self) -> bool:
|
||||
"""Check that this certificate is a valid Trezor CA.
|
||||
|
||||
KeyUsage must be present and allow certificate signing.
|
||||
BasicConstraints must be present, have the cA flag and a pathLenConstraint.
|
||||
|
||||
Any unrecognized non-critical extension is allowed. Any unrecognized critical
|
||||
extension is disallowed.
|
||||
"""
|
||||
missing_extension_classes = {x509.KeyUsage, x509.BasicConstraints}
|
||||
passed = True
|
||||
|
||||
for ext in self.cert.extensions:
|
||||
missing_extension_classes.discard(type(ext.value))
|
||||
|
||||
if isinstance(ext.value, x509.KeyUsage):
|
||||
if not ext.value.key_cert_sign:
|
||||
LOG.error(
|
||||
"Not a valid CA certificate: %s (keyCertSign not set)", self
|
||||
)
|
||||
passed = False
|
||||
|
||||
elif isinstance(ext.value, x509.BasicConstraints):
|
||||
if not ext.value.ca:
|
||||
LOG.error("Not a valid CA certificate: %s (cA not set)", self)
|
||||
passed = False
|
||||
if ext.value.path_length is None:
|
||||
LOG.error(
|
||||
"Not a valid CA certificate: %s (pathLenConstraint missing)",
|
||||
self,
|
||||
)
|
||||
passed = False
|
||||
|
||||
elif ext.critical:
|
||||
LOG.error(
|
||||
"Unknown critical extension %s in CA certificate: %s",
|
||||
self,
|
||||
type(ext.value).__name__,
|
||||
)
|
||||
passed = False
|
||||
|
||||
for ext in missing_extension_classes:
|
||||
LOG.error("Missing extension %s in CA certificate: %s", ext.__name__, self)
|
||||
passed = False
|
||||
|
||||
return passed
|
||||
|
||||
def is_issued_by(self, issuer: "Certificate", path_len: int) -> bool:
|
||||
"""Check if this certificate was issued by an issuer.
|
||||
|
||||
Returns True if:
|
||||
* our `issuer` is the same as issuer's `subject`,
|
||||
* the issuer is a valid CA, that is:
|
||||
- has the cA flag set
|
||||
- has a valid pathLenConstraint
|
||||
- pathLenConstraint does not exceed the current path length.
|
||||
* the issuer's public key signs this certificate.
|
||||
"""
|
||||
if issuer.cert.subject != self.cert.issuer:
|
||||
LOG.error("Certificate %s is not issued by %s.", self, issuer)
|
||||
return False
|
||||
|
||||
if not issuer._check_ca_extensions():
|
||||
return False
|
||||
|
||||
basic_constraints = issuer.cert.extensions.get_extension_for_class(
|
||||
x509.BasicConstraints
|
||||
).value
|
||||
assert basic_constraints.path_length is not None # check_ca_extensions
|
||||
if basic_constraints.path_length < path_len:
|
||||
LOG.error(
|
||||
"Issuer %s was not permitted to issue certificate %s", issuer, self
|
||||
)
|
||||
return False
|
||||
|
||||
try:
|
||||
pubkey = issuer.cert.public_key()
|
||||
assert isinstance(pubkey, ec.EllipticCurvePublicKey)
|
||||
self.verify_by(pubkey)
|
||||
return True
|
||||
except exceptions.InvalidSignature:
|
||||
LOG.error("Issuer %s did not sign certificate %s.", issuer, self)
|
||||
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _decode_signature_permissive(sig_bytes: bytes) -> tuple[int, int]:
|
||||
if len(sig_bytes) > 73:
|
||||
raise ValueError("Unsupported DER signature: too long.")
|
||||
|
||||
reader = io.BytesIO(sig_bytes)
|
||||
tag = reader.read(1)
|
||||
if tag != b"\x30":
|
||||
raise ValueError("Invalid DER signature: not a sequence.")
|
||||
length = reader.read(1)[0]
|
||||
if length != len(sig_bytes) - 2:
|
||||
raise ValueError("Invalid DER signature: invalid length.")
|
||||
|
||||
def read_int() -> int:
|
||||
tag = reader.read(1)
|
||||
if tag != b"\x02":
|
||||
raise ValueError("Invalid DER signature: not an integer.")
|
||||
length = reader.read(1)[0]
|
||||
if length > 33:
|
||||
raise ValueError("Invalid DER signature: integer too long.")
|
||||
return int.from_bytes(reader.read(length), "big")
|
||||
|
||||
r = read_int()
|
||||
s = read_int()
|
||||
if reader.tell() != len(sig_bytes):
|
||||
raise ValueError("Invalid DER signature: trailing data.")
|
||||
return r, s
|
||||
|
||||
@staticmethod
|
||||
def fix_signature(sig_bytes: bytes) -> bytes:
|
||||
r, s = Certificate._decode_signature_permissive(sig_bytes)
|
||||
reencoded = utils.encode_dss_signature(r, s)
|
||||
if reencoded != sig_bytes:
|
||||
LOG.info(
|
||||
"Re-encoding malformed signature: %s -> %s",
|
||||
sig_bytes.hex(),
|
||||
reencoded.hex(),
|
||||
)
|
||||
return reencoded
|
||||
|
||||
|
||||
def verify_authentication_response(
|
||||
challenge: bytes,
|
||||
signature: bytes,
|
||||
cert_chain: t.Iterable[bytes],
|
||||
*,
|
||||
whitelist: t.Collection[bytes] | None,
|
||||
allow_development_devices: bool = False,
|
||||
root_pubkey: bytes | ec.EllipticCurvePublicKey | None = None,
|
||||
) -> None:
|
||||
"""Evaluate the response to an AuthenticateDevice call.
|
||||
|
||||
Performs all steps and logs their results via the logging facility. (The log can be
|
||||
accessed via the `LOG` object in this module.)
|
||||
|
||||
When done, raises DeviceNotAuthentic if the device is not authentic.
|
||||
|
||||
The optional argument `root_pubkey` allows you to specify a root public key either
|
||||
as an `ec.EllipticCurvePublicKey` object or as a byte-string representing P-256
|
||||
public key.
|
||||
"""
|
||||
if isinstance(root_pubkey, (bytes, bytearray, memoryview)):
|
||||
root_pubkey = ec.EllipticCurvePublicKey.from_encoded_point(
|
||||
ec.SECP256R1(), root_pubkey
|
||||
)
|
||||
|
||||
challenge_bytes = (
|
||||
len(CHALLENGE_HEADER).to_bytes(1, "big")
|
||||
+ CHALLENGE_HEADER
|
||||
+ len(challenge).to_bytes(1, "big")
|
||||
+ challenge
|
||||
)
|
||||
|
||||
cert_chain_iter = iter(cert_chain)
|
||||
|
||||
failed = False
|
||||
|
||||
try:
|
||||
cert = Certificate(next(cert_chain_iter))
|
||||
except Exception:
|
||||
LOG.error("Failed to parse device certificate.")
|
||||
raise DeviceNotAuthentic
|
||||
|
||||
try:
|
||||
cert.verify(signature, challenge_bytes)
|
||||
except exceptions.InvalidSignature:
|
||||
LOG.error("Challenge verification failed.")
|
||||
failed = True
|
||||
else:
|
||||
LOG.debug("Challenge verified successfully.")
|
||||
|
||||
cert_label = "Device certificate"
|
||||
for i, issuer_bytes in enumerate(cert_chain_iter, 1):
|
||||
try:
|
||||
ca_cert = Certificate(issuer_bytes)
|
||||
except Exception:
|
||||
LOG.error(f"Failed to parse CA certificate #{i}.")
|
||||
failed = True
|
||||
continue
|
||||
|
||||
if whitelist is None:
|
||||
LOG.warning("Skipping public key whitelist check.")
|
||||
else:
|
||||
if ca_cert.public_key_bytes() not in whitelist:
|
||||
LOG.error(f"CA certificate #{i} not in whitelist: %s", ca_cert)
|
||||
failed = True
|
||||
|
||||
if not cert.is_issued_by(ca_cert, i - 1):
|
||||
failed = True
|
||||
else:
|
||||
LOG.debug(f"{cert_label} verified successfully: %s", cert)
|
||||
|
||||
cert = ca_cert
|
||||
cert_label = f"CA #{i} certificate"
|
||||
|
||||
if root_pubkey is not None:
|
||||
try:
|
||||
cert.verify_by(root_pubkey)
|
||||
except Exception:
|
||||
LOG.error(f"{cert_label} was not issued by the specified root.")
|
||||
failed = True
|
||||
else:
|
||||
LOG.info(f"{cert_label} was issued by the specified root.")
|
||||
|
||||
else:
|
||||
for root in ROOT_PUBLIC_KEYS:
|
||||
try:
|
||||
cert.verify_by(root.pubkey)
|
||||
except Exception:
|
||||
continue
|
||||
else:
|
||||
LOG.debug(f"{cert_label} verified successfully: %s", cert)
|
||||
|
||||
if root.devel:
|
||||
if not allow_development_devices:
|
||||
level = logging.ERROR
|
||||
failed = True
|
||||
else:
|
||||
level = logging.WARNING
|
||||
else:
|
||||
level = logging.DEBUG
|
||||
LOG.log(
|
||||
level,
|
||||
"Successfully verified a %s manufactured by %s.",
|
||||
root.device,
|
||||
root.name,
|
||||
)
|
||||
break
|
||||
else:
|
||||
LOG.error(f"{cert_label} was issued by an unknown root.")
|
||||
failed = True
|
||||
|
||||
if failed:
|
||||
raise DeviceNotAuthentic
|
||||
|
||||
|
||||
def authenticate_device(
|
||||
client: TrezorClient,
|
||||
challenge: bytes | None = None,
|
||||
*,
|
||||
whitelist: t.Collection[bytes] | None = None,
|
||||
allow_development_devices: bool = False,
|
||||
root_pubkey: bytes | ec.EllipticCurvePublicKey | None = None,
|
||||
) -> None:
|
||||
if challenge is None:
|
||||
challenge = secrets.token_bytes(16)
|
||||
|
||||
resp = device.authenticate(client, challenge)
|
||||
|
||||
return verify_authentication_response(
|
||||
challenge,
|
||||
resp.signature,
|
||||
resp.certificates,
|
||||
whitelist=whitelist,
|
||||
allow_development_devices=allow_development_devices,
|
||||
root_pubkey=root_pubkey,
|
||||
)
|
@ -14,9 +14,10 @@
|
||||
# You should have received a copy of the License along with this library.
|
||||
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
|
||||
|
||||
import logging
|
||||
import secrets
|
||||
import sys
|
||||
from typing import TYPE_CHECKING, Optional, Sequence, Tuple
|
||||
from typing import TYPE_CHECKING, BinaryIO, Optional, Sequence, Tuple
|
||||
|
||||
import click
|
||||
|
||||
@ -46,6 +47,8 @@ SD_PROTECT_OPERATIONS = {
|
||||
"refresh": messages.SdProtectOperationType.REFRESH,
|
||||
}
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@click.group(name="device")
|
||||
def cli() -> None:
|
||||
@ -347,15 +350,88 @@ def set_busy(
|
||||
|
||||
@cli.command()
|
||||
@click.argument("hex_challenge", required=False)
|
||||
@click.option("-R", "--root", type=click.File("rb"), help="Custom root certificate.")
|
||||
@click.option(
|
||||
"-r", "--raw", is_flag=True, help="Print raw cryptographic data and exit."
|
||||
)
|
||||
@click.option(
|
||||
"-s",
|
||||
"--skip-whitelist",
|
||||
is_flag=True,
|
||||
help="Do not check intermediate certificates against the whitelist.",
|
||||
)
|
||||
@with_client
|
||||
def authenticate(client: "TrezorClient", hex_challenge: Optional[str]) -> None:
|
||||
"""Get information to verify the authenticity of the device."""
|
||||
def authenticate(
|
||||
client: "TrezorClient",
|
||||
hex_challenge: Optional[str],
|
||||
root: Optional[BinaryIO],
|
||||
raw: Optional[bool],
|
||||
skip_whitelist: Optional[bool],
|
||||
) -> None:
|
||||
"""Verify the authenticity of the device.
|
||||
|
||||
Use the --raw option to get the raw challenge, signature, and certificate data.
|
||||
|
||||
Otherwise, trezorctl will attempt to decode the signatures and check their
|
||||
authenticity. By default, it will also check the public keys against a built-in
|
||||
whitelist, and in the future also against a whitelist downloaded from Trezor
|
||||
servers. You can skip this check with the --skip-whitelist option.
|
||||
|
||||
\b
|
||||
When not using --raw, 'cryptography' library is required. You can install it via:
|
||||
pip3 install trezor[authentication]
|
||||
"""
|
||||
if hex_challenge is None:
|
||||
hex_challenge = secrets.token_hex(32)
|
||||
click.echo(f"Challenge: {hex_challenge}")
|
||||
|
||||
challenge = bytes.fromhex(hex_challenge)
|
||||
|
||||
if raw:
|
||||
msg = device.authenticate(client, challenge)
|
||||
|
||||
click.echo(f"Challenge: {hex_challenge}")
|
||||
click.echo(f"Signature of challenge: {msg.signature.hex()}")
|
||||
click.echo(f"Device certificate: {msg.certificates[0].hex()}")
|
||||
for cert in msg.certificates[1:]:
|
||||
click.echo(f"CA certificate: {cert.hex()}")
|
||||
return
|
||||
|
||||
try:
|
||||
from .. import authentication
|
||||
except ImportError as e:
|
||||
click.echo("Failed to import the authentication module.")
|
||||
click.echo(f"Error: {e}")
|
||||
click.echo("Make sure you have the required dependencies:")
|
||||
click.echo(" pip3 install trezor[authentication]")
|
||||
sys.exit(4)
|
||||
|
||||
if root is not None:
|
||||
root_bytes = root.read()
|
||||
else:
|
||||
root_bytes = None
|
||||
|
||||
class ColoredFormatter(logging.Formatter):
|
||||
LEVELS = {
|
||||
logging.ERROR: click.style("ERROR", fg="red"),
|
||||
logging.WARNING: click.style("WARNING", fg="yellow"),
|
||||
logging.INFO: click.style("INFO", fg="blue"),
|
||||
logging.DEBUG: click.style("OK", fg="green"),
|
||||
}
|
||||
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
prefix = self.LEVELS[record.levelno]
|
||||
bold_args = tuple(
|
||||
click.style(str(arg), bold=True) for arg in record.args or ()
|
||||
)
|
||||
return f"[{prefix}] {record.msg}" % bold_args
|
||||
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(ColoredFormatter())
|
||||
authentication.LOG.addHandler(handler)
|
||||
authentication.LOG.setLevel(logging.DEBUG)
|
||||
|
||||
try:
|
||||
authentication.authenticate_device(client, challenge, root_pubkey=root_bytes)
|
||||
except authentication.DeviceNotAuthentic:
|
||||
click.echo("Device is not authentic.")
|
||||
sys.exit(5)
|
||||
|
Loading…
Reference in New Issue
Block a user