diff --git a/poetry.lock b/poetry.lock index 60ef9018d4..bd73c8b4ab 100644 --- a/poetry.lock +++ b/poetry.lock @@ -140,17 +140,6 @@ python-versions = ">=3.6" [package.extras] extras = ["arrow", "cloudpickle", "enum34", "lz4", "numpy", "ruamel.yaml"] -[[package]] -name = "construct-classes" -version = "0.1.2" -description = "Parse your binary structs into dataclasses" -category = "main" -optional = false -python-versions = ">=3.6.2,<4.0" - -[package.dependencies] -construct = ">=2.10,<3.0" - [[package]] name = "coverage" version = "4.5.4" @@ -580,8 +569,8 @@ python-versions = ">=3.6" importlib-metadata = {version = ">=0.12", markers = "python_version < \"3.8\""} [package.extras] -testing = ["pytest-benchmark", "pytest"] -dev = ["tox", "pre-commit"] +dev = ["pre-commit", "tox"] +testing = ["pytest", "pytest-benchmark"] [[package]] name = "protobuf" @@ -846,8 +835,8 @@ click = ">=7,<9" colorama = "*" [package.extras] +dev = ["black", "flake8", "isort"] tests = ["pytest"] -dev = ["isort", "flake8", "black"] [[package]] name = "simple-rlp" @@ -950,7 +939,6 @@ develop = true [package.dependencies] click = ">=7,<8.2" construct = ">=2.9,<2.10.55 || >2.10.55" -construct-classes = ">=0.1.2" ecdsa = ">=0.9" libusb1 = ">=1.6.4" mnemonic = ">=0.20" @@ -1079,31 +1067,7 @@ attrs = [ autoflake = [ {file = "autoflake-1.4.tar.gz", hash = "sha256:61a353012cff6ab94ca062823d1fb2f692c4acda51c76ff83a8d77915fba51ea"}, ] -black = [ - {file = "black-22.3.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:2497f9c2386572e28921fa8bec7be3e51de6801f7459dffd6e62492531c47e09"}, - {file = "black-22.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:5795a0375eb87bfe902e80e0c8cfaedf8af4d49694d69161e5bd3206c18618bb"}, - {file = "black-22.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e3556168e2e5c49629f7b0f377070240bd5511e45e25a4497bb0073d9dda776a"}, - {file = "black-22.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:67c8301ec94e3bcc8906740fe071391bce40a862b7be0b86fb5382beefecd968"}, - {file = "black-22.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:fd57160949179ec517d32ac2ac898b5f20d68ed1a9c977346efbac9c2f1e779d"}, - {file = "black-22.3.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:cc1e1de68c8e5444e8f94c3670bb48a2beef0e91dddfd4fcc29595ebd90bb9ce"}, - {file = "black-22.3.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6d2fc92002d44746d3e7db7cf9313cf4452f43e9ea77a2c939defce3b10b5c82"}, - {file = "black-22.3.0-cp36-cp36m-win_amd64.whl", hash = "sha256:a6342964b43a99dbc72f72812bf88cad8f0217ae9acb47c0d4f141a6416d2d7b"}, - {file = "black-22.3.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:328efc0cc70ccb23429d6be184a15ce613f676bdfc85e5fe8ea2a9354b4e9015"}, - {file = "black-22.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:06f9d8846f2340dfac80ceb20200ea5d1b3f181dd0556b47af4e8e0b24fa0a6b"}, - {file = "black-22.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:ad4efa5fad66b903b4a5f96d91461d90b9507a812b3c5de657d544215bb7877a"}, - {file = "black-22.3.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:e8477ec6bbfe0312c128e74644ac8a02ca06bcdb8982d4ee06f209be28cdf163"}, - {file = "black-22.3.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:637a4014c63fbf42a692d22b55d8ad6968a946b4a6ebc385c5505d9625b6a464"}, - {file = "black-22.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:863714200ada56cbc366dc9ae5291ceb936573155f8bf8e9de92aef51f3ad0f0"}, - {file = "black-22.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10dbe6e6d2988049b4655b2b739f98785a884d4d6b85bc35133a8fb9a2233176"}, - {file = "black-22.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:cee3e11161dde1b2a33a904b850b0899e0424cc331b7295f2a9698e79f9a69a0"}, - {file = "black-22.3.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:5891ef8abc06576985de8fa88e95ab70641de6c1fca97e2a15820a9b69e51b20"}, - {file = "black-22.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:30d78ba6bf080eeaf0b7b875d924b15cd46fec5fd044ddfbad38c8ea9171043a"}, - {file = "black-22.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ee8f1f7228cce7dffc2b464f07ce769f478968bfb3dd1254a4c2eeed84928aad"}, - {file = "black-22.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ee227b696ca60dd1c507be80a6bc849a5a6ab57ac7352aad1ffec9e8b805f21"}, - {file = "black-22.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:9b542ced1ec0ceeff5b37d69838106a6348e60db7b8fdd245294dc1d26136265"}, - {file = "black-22.3.0-py3-none-any.whl", hash = "sha256:bc58025940a896d7e5356952228b68f793cf5fcb342be703c3a2669a1488cb72"}, - {file = "black-22.3.0.tar.gz", hash = "sha256:35020b8886c022ced9282b51b5a875b6d1ab0c387b31a065b84db7c33085ca79"}, -] +black = [] certifi = [ {file = "certifi-2021.10.8-py2.py3-none-any.whl", hash = "sha256:d62a0163eb4c2344ac042ab2bdf75399a71a2d8c7d47eac2e2ee91b9d6339569"}, {file = "certifi-2021.10.8.tar.gz", hash = "sha256:78884e7c1d4b00ce3cea67b44566851c4343c120abd683433ce934a68ea58872"}, @@ -1178,10 +1142,6 @@ colorama = [ construct = [ {file = "construct-2.10.67.tar.gz", hash = "sha256:730235fedf4f2fee5cfadda1d14b83ef1bf23790fb1cc579073e10f70a050883"}, ] -construct-classes = [ - {file = "construct-classes-0.1.2.tar.gz", hash = "sha256:72ac1abbae5bddb4918688713f991f5a7fb6c9b593646a82f4bf3ac53de7eeb5"}, - {file = "construct_classes-0.1.2-py3-none-any.whl", hash = "sha256:e82437261790758bda41e45fb3d5622b54cfbf044ceb14774af68346faf5e08e"}, -] coverage = [ {file = "coverage-4.5.4-cp26-cp26m-macosx_10_12_x86_64.whl", hash = "sha256:eee64c616adeff7db37cc37da4180a3a5b6177f5c46b187894e633f088fb5b28"}, {file = "coverage-4.5.4-cp27-cp27m-macosx_10_12_x86_64.whl", hash = "sha256:ef824cad1f980d27f26166f86856efe11eff9912c4fed97d3804820d43fa550c"}, @@ -1259,10 +1219,7 @@ ecdsa = [ ed25519 = [ {file = "ed25519-1.5.tar.gz", hash = "sha256:02053ee019ceef0df97294be2d4d5a8fc120fc86e81e08bec1245fc0f9403358"}, ] -execnet = [ - {file = "execnet-1.9.0-py2.py3-none-any.whl", hash = "sha256:a295f7cc774947aac58dde7fdc85f4aa00c42adf5d8f5468fc630c1acf30a142"}, - {file = "execnet-1.9.0.tar.gz", hash = "sha256:8f694f3ba9cc92cab508b152dcfe322153975c29bda272e2fd7f3f00f36e47c5"}, -] +execnet = [] fido2 = [ {file = "fido2-0.8.1.tar.gz", hash = "sha256:449068f6876f397c8bb96ebc6a75c81c2692f045126d3f13ece21d409acdf7c3"}, ] @@ -1605,10 +1562,7 @@ pytest = [ {file = "pytest-6.2.5-py3-none-any.whl", hash = "sha256:7310f8d27bc79ced999e760ca304d69f6ba6c6649c0b60fb0e04a4a77cacc134"}, {file = "pytest-6.2.5.tar.gz", hash = "sha256:131b36680866a76e6781d13f101efb86cf674ebb9762eb70d3082b6f29889e89"}, ] -pytest-forked = [ - {file = "pytest-forked-1.4.0.tar.gz", hash = "sha256:8b67587c8f98cbbadfdd804539ed5455b6ed03802203485dd2f53c1422d7440e"}, - {file = "pytest_forked-1.4.0-py3-none-any.whl", hash = "sha256:bbbb6717efc886b9d64537b41fb1497cfaf3c9601276be8da2cccfea5a3c8ad8"}, -] +pytest-forked = [] pytest-ordering = [ {file = "pytest-ordering-0.6.tar.gz", hash = "sha256:561ad653626bb171da78e682f6d39ac33bb13b3e272d406cd555adb6b006bda6"}, {file = "pytest_ordering-0.6-py2-none-any.whl", hash = "sha256:27fba3fc265f5d0f8597e7557885662c1bdc1969497cd58aff6ed21c3b617de2"}, @@ -1622,10 +1576,7 @@ pytest-timeout = [ {file = "pytest-timeout-2.1.0.tar.gz", hash = "sha256:c07ca07404c612f8abbe22294b23c368e2e5104b521c1790195561f37e1ac3d9"}, {file = "pytest_timeout-2.1.0-py3-none-any.whl", hash = "sha256:f6f50101443ce70ad325ceb4473c4255e9d74e3c7cd0ef827309dfa4c0d975c6"}, ] -pytest-xdist = [ - {file = "pytest-xdist-2.5.0.tar.gz", hash = "sha256:4580deca3ff04ddb2ac53eba39d76cb5dd5edeac050cb6fbc768b0dd712b4edf"}, - {file = "pytest_xdist-2.5.0-py3-none-any.whl", hash = "sha256:6fe5c74fec98906deb8f2d2b616b5c782022744978e7bd4695d39c8f42d0ce65"}, -] +pytest-xdist = [] python-bitcoinlib = [ {file = "python-bitcoinlib-0.11.0.tar.gz", hash = "sha256:3daafd63cb755f6e2067b7c9c514053856034c9f9363c80c37007744d54a2e06"}, {file = "python_bitcoinlib-0.11.0-py3-none-any.whl", hash = "sha256:6e7982734637135599e2136d3c88d622f147e3b29201636665f799365784cd9e"}, @@ -1638,13 +1589,6 @@ pyyaml = [ {file = "PyYAML-6.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5"}, {file = "PyYAML-6.0-cp310-cp310-win32.whl", hash = "sha256:2cd5df3de48857ed0544b34e2d40e9fac445930039f3cfe4bcc592a1f836d513"}, {file = "PyYAML-6.0-cp310-cp310-win_amd64.whl", hash = "sha256:daf496c58a8c52083df09b80c860005194014c3698698d1a57cbcfa182142a3a"}, - {file = "PyYAML-6.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:d4b0ba9512519522b118090257be113b9468d804b19d63c71dbcf4a48fa32358"}, - {file = "PyYAML-6.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:81957921f441d50af23654aa6c5e5eaf9b06aba7f0a19c18a538dc7ef291c5a1"}, - {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:afa17f5bc4d1b10afd4466fd3a44dc0e245382deca5b3c353d8b757f9e3ecb8d"}, - {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dbad0e9d368bb989f4515da330b88a057617d16b6a8245084f1b05400f24609f"}, - {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:432557aa2c09802be39460360ddffd48156e30721f5e8d917f01d31694216782"}, - {file = "PyYAML-6.0-cp311-cp311-win32.whl", hash = "sha256:bfaef573a63ba8923503d27530362590ff4f576c626d86a9fed95822a8255fd7"}, - {file = "PyYAML-6.0-cp311-cp311-win_amd64.whl", hash = "sha256:01b45c0191e6d66c470b6cf1b9531a771a83c1c4208272ead47a3ae4f2f603bf"}, {file = "PyYAML-6.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:897b80890765f037df3403d22bab41627ca8811ae55e9a722fd0392850ec4d86"}, {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f"}, {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92"}, diff --git a/python/.changelog.d/2576.incompatible b/python/.changelog.d/2576.incompatible deleted file mode 100644 index bf56377943..0000000000 --- a/python/.changelog.d/2576.incompatible +++ /dev/null @@ -1 +0,0 @@ -Refactored firmware parsing and validation to a more object oriented approach. diff --git a/python/requirements.txt b/python/requirements.txt index 916fad94fc..6db6a087ac 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -7,4 +7,3 @@ construct>=2.9,!=2.10.55 typing_extensions>=3.10 dataclasses ; python_version<'3.7' simple-rlp>=0.1.2 ; python_version>='3.7' -construct-classes>=0.1.2 diff --git a/python/src/trezorlib/_internal/firmware_headers.py b/python/src/trezorlib/_internal/firmware_headers.py index 18f8f7d38d..e935f7817f 100644 --- a/python/src/trezorlib/_internal/firmware_headers.py +++ b/python/src/trezorlib/_internal/firmware_headers.py @@ -14,15 +14,13 @@ # You should have received a copy of the License along with this library. # If not, see . -import typing as t -from copy import copy -from dataclasses import asdict +import struct from enum import Enum +from hashlib import blake2s +from typing import Any, List, Optional import click import construct as c -from construct_classes import Struct -from typing_extensions import Protocol, Self, runtime_checkable from .. import cosi, firmware @@ -45,25 +43,48 @@ VHASH_DEVEL = bytes.fromhex( ) +AnyFirmware = c.Struct( + "vendor_header" / c.Optional(firmware.VendorHeader), + "image" / c.Optional(firmware.FirmwareImage), +) + + class ImageType(Enum): VENDOR_HEADER = 0 BOOTLOADER = 1 FIRMWARE = 2 -def _make_dev_keys(*key_bytes: bytes) -> t.Sequence[bytes]: +def _make_dev_keys(*key_bytes: bytes) -> List[bytes]: return [k * 32 for k in key_bytes] +def compute_vhash(vendor_header: c.Container) -> bytes: + m = vendor_header.sig_m + n = vendor_header.sig_n + pubkeys = vendor_header.pubkeys + h = blake2s() + h.update(struct.pack(" bool: return all(b == 0 for b in data) -def _check_signature_any(fw: "SignableImageProto", is_devel: bool) -> Status: - if not fw.signature_present(): +def _check_signature_any( + header: c.Container, m: int, pubkeys: List[bytes], is_devel: bool +) -> Status: + if all_zero(header.signature) and header.sigmask == 0: return Status.MISSING try: - fw.verify() + digest = firmware.header_digest(header) + cosi.verify(header.signature, digest, m, pubkeys, header.sigmask) return Status.VALID if not is_devel else Status.DEVEL except Exception: return Status.INVALID @@ -77,11 +98,11 @@ class LiteralStr(str): def _format_container( - pb: t.Union[c.Container, Struct, dict], + pb: c.Container, indent: int = 0, sep: str = " " * 4, - truncate_after: t.Optional[int] = 64, - truncate_to: t.Optional[int] = 32, + truncate_after: Optional[int] = 64, + truncate_to: Optional[int] = 32, ) -> str: def mostly_printable(bytes: bytes) -> bool: if not bytes: @@ -89,7 +110,7 @@ def _format_container( printable = sum(1 for byte in bytes if 0x20 <= byte <= 0x7E) return printable / len(bytes) > 0.8 - def pformat(value: t.Any, indent: int) -> str: + def pformat(value: Any, indent: int) -> str: level = sep * indent leadin = sep * (indent + 1) @@ -106,9 +127,6 @@ def _format_container( lines[1:1] = [leadin + pformat(x, indent + 1) for x in value] return "\n".join(lines) - if isinstance(value, Struct): - value = asdict(value) - if isinstance(value, dict): lines = ["{"] for key, val in value.items(): @@ -140,140 +158,88 @@ def _format_container( return pformat(pb, indent) -def _format_version(version: t.Tuple[int, ...]) -> str: - return ".".join(str(i) for i in version) - - -def format_header( - header: firmware.core.FirmwareHeader, - code_hashes: t.Sequence[bytes], - digest: bytes, - sig_status: Status, -) -> str: - header_dict = asdict(header) - header_out = header_dict.copy() - - for key, val in header_out.items(): - if "version" in key: - header_out[key] = LiteralStr(_format_version(val)) - - hashes_out = [] - for expected, actual in zip(header.hashes, code_hashes): - status = SYM_OK if expected == actual else SYM_FAIL - hashes_out.append(LiteralStr(f"{status} {expected.hex()}")) - - if all(all_zero(h) for h in header.hashes): - hash_status = Status.MISSING - elif header.hashes != code_hashes: - hash_status = Status.INVALID - else: - hash_status = Status.VALID - - header_out["hashes"] = hashes_out - - all_ok = SYM_OK if hash_status.is_ok() and sig_status.is_ok() else SYM_FAIL - - output = [ - "Firmware Header " + _format_container(header_out), - f"Fingerprint: {click.style(digest.hex(), bold=True)}", - f"{all_ok} Signature is {sig_status.value}, hashes are {hash_status.value}", - ] - - return "\n".join(output) +def _format_version(version: c.Container) -> str: + version_str = ".".join( + str(version[k]) for k in ("major", "minor", "patch") if k in version + ) + if "build" in version: + version_str += f" build {version.build}" + return version_str # =========================== functionality implementations =============== -class SignableImageProto(Protocol): - NAME: t.ClassVar[str] +class SignableImage: + NAME = "Unrecognized image" + BIP32_INDEX: Optional[int] = None + DEV_KEYS: List[bytes] = [] + DEV_KEY_SIGMASK = 0b11 - @classmethod - def parse(cls, data: bytes) -> Self: - ... + def __init__(self, fw: c.Container) -> None: + self.fw = fw + self.header: Any + self.public_keys: List[bytes] + self.sigs_required = firmware.V2_SIGS_REQUIRED def digest(self) -> bytes: - ... + return firmware.header_digest(self.header) - def verify(self) -> None: - ... - - def build(self) -> bytes: - ... - - def format(self, verbose: bool = False) -> str: - ... - - def signature_present(self) -> bool: - ... - - def public_keys(self) -> t.Sequence[bytes]: - ... - - -@runtime_checkable -class CosiSignedImage(SignableImageProto, Protocol): - DEV_KEYS: t.ClassVar[t.Sequence[bytes]] = [] - - def insert_signature(self, signature: bytes, sigmask: int) -> None: - ... - - -@runtime_checkable -class LegacySignedImage(SignableImageProto, Protocol): - def slots(self) -> t.Iterable[int]: - ... - - def insert_signature(self, slot: int, key_index: int, signature: bytes) -> None: - ... - - -class CosiSignatureHeaderProto(Protocol): - signature: bytes - sigmask: int - - -class CosiSignedMixin: - def signature_present(self) -> bool: - header = self.get_header() - return not all_zero(header.signature) or header.sigmask != 0 - - def insert_signature(self, signature: bytes, sigmask: int) -> None: - self.get_header().signature = signature - self.get_header().sigmask = sigmask - - def get_header(self) -> CosiSignatureHeaderProto: + def check_signature(self) -> Status: raise NotImplementedError + def rehash(self) -> None: + pass -class VendorHeader(firmware.VendorHeader, CosiSignedMixin): + def insert_signature(self, signature: bytes, sigmask: int) -> None: + self.header.signature = signature + self.header.sigmask = sigmask + + def dump(self) -> bytes: + return AnyFirmware.build(self.fw) + + def format(self, verbose: bool) -> str: + return _format_container(self.fw) + + +class VendorHeader(SignableImage): NAME = "vendorheader" + BIP32_INDEX = 1 DEV_KEYS = _make_dev_keys(b"\x44", b"\x45") - SUBCON = c.Struct(*firmware.VendorHeader.SUBCON.subcons, c.Terminated) + def __init__(self, fw: c.Container) -> None: + super().__init__(fw) + self.header = fw.vendor_header + self.public_keys = firmware.V2_BOOTLOADER_KEYS - def get_header(self) -> CosiSignatureHeaderProto: - return self + def check_signature(self) -> Status: + return _check_signature_any( + self.header, self.sigs_required, self.public_keys, False + ) def _format(self, terse: bool) -> str: + vh = self.fw.vendor_header if not terse: + vhash = compute_vhash(vh) output = [ - "Vendor Header " + _format_container(self), - f"Pubkey bundle hash: {self.vhash().hex()}", + "Vendor Header " + _format_container(vh), + f"Pubkey bundle hash: {vhash.hex()}", ] else: output = [ "Vendor Header for {vendor} version {version} ({size} bytes)".format( - vendor=click.style(self.text, bold=True), - version=_format_version(self.version), - size=self.header_len, + vendor=click.style(vh.text, bold=True), + version=_format_version(vh.version), + size=vh.header_len, ), ] - if not terse: - output.append(f"Fingerprint: {click.style(self.digest().hex(), bold=True)}") + fingerprint = firmware.header_digest(vh) - sig_status = _check_signature_any(self, is_devel=False) + if not terse: + output.append(f"Fingerprint: {click.style(fingerprint.hex(), bold=True)}") + + sig_status = self.check_signature() sym = SYM_OK if sig_status.is_ok() else SYM_FAIL output.append(f"{sym} Signature is {sig_status.value}") @@ -282,168 +248,138 @@ class VendorHeader(firmware.VendorHeader, CosiSignedMixin): def format(self, verbose: bool = False) -> str: return self._format(terse=False) - def public_keys(self) -> t.Sequence[bytes]: - return firmware.V2_BOOTLOADER_KEYS + +class BinImage(SignableImage): + def __init__(self, fw: c.Container) -> None: + super().__init__(fw) + self.header = self.fw.image.header + self.code_hashes = firmware.calculate_code_hashes( + self.fw.image.code, self.fw.image._code_offset + ) + self.digest_header = self.header.copy() + self.digest_header.hashes = self.code_hashes + + def insert_signature(self, signature: bytes, sigmask: int) -> None: + super().insert_signature(signature, sigmask) + self.digest_header.signature = signature + self.digest_header.sigmask = sigmask + + def digest(self) -> bytes: + return firmware.header_digest(self.digest_header) + + def rehash(self) -> None: + self.header.hashes = self.code_hashes + + def format(self, verbose: bool = False) -> str: + header_out = self.header.copy() + + if not verbose: + for key in self.header: + if key.startswith("v1"): + del header_out[key] + if "version" in key: + header_out[key] = LiteralStr(_format_version(self.header[key])) + + all_ok = SYM_OK + hash_status = Status.VALID + sig_status = Status.VALID + + hashes_out = [] + for expected, actual in zip(self.header.hashes, self.code_hashes): + status = SYM_OK if expected == actual else SYM_FAIL + hashes_out.append(LiteralStr(f"{status} {expected.hex()}")) + + if all(all_zero(h) for h in self.header.hashes): + hash_status = Status.MISSING + elif self.header.hashes != self.code_hashes: + hash_status = Status.INVALID + else: + hash_status = Status.VALID + + header_out["hashes"] = hashes_out + + sig_status = self.check_signature() + all_ok = SYM_OK if hash_status.is_ok() and sig_status.is_ok() else SYM_FAIL + + output = [ + "Firmware Header " + _format_container(header_out), + f"Fingerprint: {click.style(self.digest().hex(), bold=True)}", + f"{all_ok} Signature is {sig_status.value}, hashes are {hash_status.value}", + ] + + return "\n".join(output) -class VendorFirmware(firmware.VendorFirmware, CosiSignedMixin): +class FirmwareImage(BinImage): NAME = "firmware" + BIP32_INDEX = 2 DEV_KEYS = _make_dev_keys(b"\x47", b"\x48") - def get_header(self) -> CosiSignatureHeaderProto: - return self.firmware.header + def __init__(self, fw: c.Container) -> None: + super().__init__(fw) + self.public_keys = fw.vendor_header.pubkeys + self.sigs_required = fw.vendor_header.sig_m - def format(self, verbose: bool = False) -> str: - vh = copy(self.vendor_header) - vh.__class__ = VendorHeader - assert isinstance(vh, VendorHeader) - - is_devel = self.vendor_header.vhash() == VHASH_DEVEL - - return ( - vh._format(terse=not verbose) - + "\n" - + format_header( - self.firmware.header, - self.firmware.code_hashes(), - self.digest(), - _check_signature_any(self, is_devel), - ) + def check_signature(self) -> Status: + vhash = compute_vhash(self.fw.vendor_header) + return _check_signature_any( + self.digest_header, + self.sigs_required, + self.public_keys, + vhash == VHASH_DEVEL, ) - def public_keys(self) -> t.Sequence[bytes]: - return self.vendor_header.pubkeys + def format(self, verbose: bool = False) -> str: + return ( + VendorHeader(self.fw)._format(terse=not verbose) + + "\n" + + super().format(verbose) + ) -class BootloaderImage(firmware.FirmwareImage, CosiSignedMixin): +class BootloaderImage(BinImage): NAME = "bootloader" + BIP32_INDEX = 0 DEV_KEYS = _make_dev_keys(b"\x41", b"\x42") - def get_header(self) -> CosiSignatureHeaderProto: - return self.header + def __init__(self, fw: c.Container) -> None: + super().__init__(fw) + self._identify_dev_keys() - def format(self, verbose: bool = False) -> str: - return format_header( + def insert_signature(self, signature: bytes, sigmask: int) -> None: + super().insert_signature(signature, sigmask) + self._identify_dev_keys() + + def _identify_dev_keys(self) -> None: + # try checking signature with dev keys first + self.public_keys = firmware.V2_BOARDLOADER_DEV_KEYS + if not self.check_signature().is_ok(): + # validation with dev keys failed, use production keys + self.public_keys = firmware.V2_BOARDLOADER_KEYS + + def check_signature(self) -> Status: + return _check_signature_any( self.header, - self.code_hashes(), - self.digest(), - _check_signature_any(self, False), + self.sigs_required, + self.public_keys, + self.public_keys == firmware.V2_BOARDLOADER_DEV_KEYS, ) - def verify(self) -> None: - self.validate_code_hashes() - try: - cosi.verify( - self.header.signature, - self.digest(), - firmware.V2_SIGS_REQUIRED, - firmware.V2_BOARDLOADER_KEYS, - self.header.sigmask, - ) - except Exception: - raise firmware.InvalidSignatureError("Invalid bootloader signature") - def public_keys(self) -> t.Sequence[bytes]: - return firmware.V2_BOARDLOADER_KEYS - - -class LegacyFirmware(firmware.LegacyFirmware): - NAME = "legacy_firmware_v1" - BIP32_INDEX = None - - def signature_present(self) -> bool: - return any(i != 0 for i in self.key_indexes) or any( - not all_zero(sig) for sig in self.signatures - ) - - def insert_signature(self, slot: int, key_index: int, signature: bytes) -> None: - if not 0 <= slot < firmware.V1_SIGNATURE_SLOTS: - raise ValueError("Invalid slot number") - if not 0 <= key_index < len(firmware.V1_BOOTLOADER_KEYS): - raise ValueError("Invalid key index") - self.key_indexes[slot] = key_index - self.signatures[slot] = signature - - def format(self, verbose: bool = False) -> str: - contents = asdict(self).copy() - del contents["embedded_v2"] - if self.embedded_v2: - em = copy(self.embedded_v2) - em.__class__ = LegacyV2Firmware - assert isinstance(em, LegacyV2Firmware) - embedded_content = "\nEmbedded V2 header: " + em.format(verbose=verbose) - else: - embedded_content = "" - - return _format_container(contents) + embedded_content - - def public_keys(self) -> t.Sequence[bytes]: - return firmware.V1_BOOTLOADER_KEYS - - def slots(self) -> t.Iterable[int]: - return self.key_indexes - - -class LegacyV2Firmware(firmware.LegacyV2Firmware): - NAME = "legacy_firmware_v2" - BIP32_INDEX = 5 - - def signature_present(self) -> bool: - return any(i != 0 for i in self.header.v1_key_indexes) or any( - not all_zero(sig) for sig in self.header.v1_signatures - ) - - def insert_signature(self, slot: int, key_index: int, signature: bytes) -> None: - if not 0 <= slot < firmware.V1_SIGNATURE_SLOTS: - raise ValueError("Invalid slot number") - if not 0 <= key_index < len(firmware.V1_BOOTLOADER_KEYS): - raise ValueError("Invalid key index") - if not isinstance(self.header.v1_key_indexes, list): - self.header.v1_key_indexes = list(self.header.v1_key_indexes) - if not isinstance(self.header.v1_signatures, list): - self.header.v1_signatures = list(self.header.v1_signatures) - self.header.v1_key_indexes[slot] = key_index - self.header.v1_signatures[slot] = signature - - def format(self, verbose: bool = False) -> str: - return format_header( - self.header, - self.code_hashes(), - self.digest(), - _check_signature_any(self, False), - ) - - def public_keys(self) -> t.Sequence[bytes]: - return firmware.V1_BOOTLOADER_KEYS - - def slots(self) -> t.Iterable[int]: - return self.header.v1_key_indexes - - -def parse_image(image: bytes) -> SignableImageProto: - try: - return VendorFirmware.parse(image) - except c.ConstructError: - pass - - try: - return VendorHeader.parse(image) - except c.ConstructError: - pass - - try: - firmware_img = firmware.core.FirmwareImage.parse(image) - if firmware_img.header.magic == firmware.core.HeaderType.BOOTLOADER: - return BootloaderImage.parse(image) - if firmware_img.header.magic == firmware.core.HeaderType.FIRMWARE: - return LegacyV2Firmware.parse(image) - raise ValueError("Unrecognized firmware header magic") - except c.ConstructError: - pass - - try: - return LegacyFirmware.parse(image) - except c.ConstructError: - pass - - raise ValueError("Unrecognized firmware type") +def parse_image(image: bytes) -> SignableImage: + fw = AnyFirmware.parse(image) + if fw.vendor_header and not fw.image: + return VendorHeader(fw) + if ( + not fw.vendor_header + and fw.image + and fw.image.header.magic == firmware.HeaderType.BOOTLOADER + ): + return BootloaderImage(fw) + if ( + fw.vendor_header + and fw.image + and fw.image.header.magic == firmware.HeaderType.FIRMWARE + ): + return FirmwareImage(fw) + raise ValueError("Unrecognized image type") diff --git a/python/src/trezorlib/cli/firmware.py b/python/src/trezorlib/cli/firmware.py index 189d59504a..fffbc8dbf9 100644 --- a/python/src/trezorlib/cli/firmware.py +++ b/python/src/trezorlib/cli/firmware.py @@ -26,18 +26,19 @@ from .. import exceptions, firmware from . import with_client if TYPE_CHECKING: + import construct as c from ..client import TrezorClient from . import TrezorConnection ALLOWED_FIRMWARE_FORMATS = { - 1: (firmware.LegacyFirmware, firmware.LegacyV2Firmware), - 2: (firmware.VendorFirmware,), + 1: (firmware.FirmwareFormat.TREZOR_ONE, firmware.FirmwareFormat.TREZOR_ONE_V2), + 2: (firmware.FirmwareFormat.TREZOR_T,), } -def _print_version(version: Tuple[int, int, int, int]) -> None: - major, minor, patch, build = version - click.echo(f"Firmware version {major}.{minor}.{patch} build {build}") +def _print_version(version: dict) -> None: + vstr = "Firmware version {major}.{minor}.{patch} build {build}".format(**version) + click.echo(vstr) def _is_bootloader_onev2(client: "TrezorClient") -> bool: @@ -58,26 +59,32 @@ def _get_file_name_from_url(url: str) -> str: return os.path.basename(full_path) -def print_firmware_version(fw: "firmware.FirmwareType") -> None: +def print_firmware_version( + version: firmware.FirmwareFormat, + fw: "c.Container", +) -> None: """Print out the firmware version and details.""" - if isinstance(fw, firmware.LegacyFirmware): - if fw.embedded_v2: + if version == firmware.FirmwareFormat.TREZOR_ONE: + if fw.embedded_onev2: click.echo("Trezor One firmware with embedded v2 image (1.8.0 or later)") - _print_version(fw.embedded_v2.header.version) + _print_version(fw.embedded_onev2.header.version) else: click.echo("Trezor One firmware image.") - elif isinstance(fw, firmware.LegacyV2Firmware): + elif version == firmware.FirmwareFormat.TREZOR_ONE_V2: click.echo("Trezor One v2 firmware (1.8.0 or later)") _print_version(fw.header.version) - elif isinstance(fw, firmware.VendorFirmware): + elif version == firmware.FirmwareFormat.TREZOR_T: click.echo("Trezor T firmware image.") vendor = fw.vendor_header.text - vendor_version = "{}.{}".format(*fw.vendor_header.version) + vendor_version = "{major}.{minor}".format(**fw.vendor_header.version) click.echo(f"Vendor header from {vendor}, version {vendor_version}") - _print_version(fw.firmware.header.version) + _print_version(fw.image.header.version) -def validate_signatures(fw: "firmware.FirmwareType") -> None: +def validate_signatures( + version: firmware.FirmwareFormat, + fw: "c.Container", +) -> None: """Check the signatures on the firmware. Prints the validity status. @@ -85,25 +92,18 @@ def validate_signatures(fw: "firmware.FirmwareType") -> None: Exits if the validation fails. """ try: - fw.verify() + firmware.validate(version, fw, allow_unsigned=False) click.echo("Signatures are valid.") except firmware.Unsigned: - if not isinstance(fw, firmware.LegacyFirmware): - raise - - # allow legacy firmware without signatures if not click.confirm("No signatures found. Continue?", default=False): sys.exit(1) - if firmware.is_onev2(fw): - try: - assert fw.embedded_v2 is not None - fw.embedded_v2.verify_unsigned() - except firmware.FirmwareIntegrityError as e: - click.echo(e) - click.echo("Firmware validation failed, aborting.") - sys.exit(4) - click.echo("Unsigned firmware looking OK.") - + try: + firmware.validate(version, fw, allow_unsigned=True) + click.echo("Unsigned firmware looking OK.") + except firmware.FirmwareIntegrityError as e: + click.echo(e) + click.echo("Firmware validation failed, aborting.") + sys.exit(4) except firmware.FirmwareIntegrityError as e: click.echo(e) click.echo("Firmware validation failed, aborting.") @@ -111,7 +111,8 @@ def validate_signatures(fw: "firmware.FirmwareType") -> None: def validate_fingerprint( - fw: "firmware.FirmwareType", + version: firmware.FirmwareFormat, + fw: "c.Container", expected_fingerprint: Optional[str] = None, ) -> None: """Determine and validate the firmware fingerprint. @@ -119,11 +120,12 @@ def validate_fingerprint( Prints the fingerprint. Exits if the validation fails. """ - fingerprint = fw.digest().hex() + fingerprint = firmware.digest(version, fw).hex() click.echo(f"Firmware fingerprint: {fingerprint}") - if firmware.is_onev2(fw): - assert fw.embedded_v2 is not None - fingerprint_onev2 = fw.embedded_v2.digest().hex() + if version == firmware.FirmwareFormat.TREZOR_ONE and fw.embedded_onev2: + fingerprint_onev2 = firmware.digest( + firmware.FirmwareFormat.TREZOR_ONE_V2, fw.embedded_onev2 + ).hex() click.echo(f"Embedded v2 image fingerprint: {fingerprint_onev2}") if expected_fingerprint and fingerprint != expected_fingerprint: click.echo(f"Expected fingerprint: {expected_fingerprint}") @@ -132,7 +134,8 @@ def validate_fingerprint( def check_device_match( - fw: "firmware.FirmwareType", + version: firmware.FirmwareFormat, + fw: "c.Container", bootloader_onev2: bool, trezor_major_version: int, ) -> None: @@ -140,24 +143,24 @@ def check_device_match( Prints error message and exits if the validation fails. """ - if trezor_major_version not in ALLOWED_FIRMWARE_FORMATS: - click.echo("trezorctl doesn't know your device version. Aborting.") - sys.exit(3) - elif not isinstance(fw, ALLOWED_FIRMWARE_FORMATS[trezor_major_version]): - click.echo("Firmware does not match your device, aborting.") - sys.exit(3) - if ( bootloader_onev2 - and isinstance(fw, firmware.LegacyFirmware) - and not fw.embedded_v2 + and version == firmware.FirmwareFormat.TREZOR_ONE + and not fw.embedded_onev2 ): click.echo("Firmware is too old for your device. Aborting.") sys.exit(3) - elif not bootloader_onev2 and isinstance(fw, firmware.LegacyV2Firmware): + elif not bootloader_onev2 and version == firmware.FirmwareFormat.TREZOR_ONE_V2: click.echo("You need to upgrade to bootloader 1.8.0 first.") sys.exit(3) + if trezor_major_version not in ALLOWED_FIRMWARE_FORMATS: + click.echo("trezorctl doesn't know your device version. Aborting.") + sys.exit(3) + elif version not in ALLOWED_FIRMWARE_FORMATS[trezor_major_version]: + click.echo("Firmware does not match your device, aborting.") + sys.exit(3) + def get_all_firmware_releases( bitcoin_only: bool, beta: bool, major_version: int @@ -345,17 +348,18 @@ def validate_firmware( - being compatible with the device (when chosen) """ try: - fw = firmware.parse(firmware_data) + version, fw = firmware.parse(firmware_data) except Exception as e: click.echo(e) sys.exit(2) - print_firmware_version(fw) - validate_signatures(fw) - validate_fingerprint(fw, fingerprint) + print_firmware_version(version, fw) + validate_signatures(version, fw) + validate_fingerprint(version, fw, fingerprint) if bootloader_onev2 is not None and trezor_major_version is not None: check_device_match( + version=version, fw=fw, bootloader_onev2=bootloader_onev2, trezor_major_version=trezor_major_version, @@ -368,7 +372,7 @@ def extract_embedded_fw( bootloader_onev2: bool, ) -> bytes: """Modify the firmware data for sending into Trezor, if necessary.""" - # special handling for embedded_v2-OneV2 format: + # special handling for embedded-OneV2 format: # for bootloader < 1.8, keep the embedding # for bootloader 1.8.0 and up, strip the old OneV1 header if ( @@ -376,7 +380,7 @@ def extract_embedded_fw( and firmware_data[:4] == b"TRZR" and firmware_data[256 : 256 + 4] == b"TRZF" ): - click.echo("Extracting embedded_v2 firmware image.") + click.echo("Extracting embedded firmware image.") return firmware_data[256:] return firmware_data diff --git a/python/src/trezorlib/cosi.py b/python/src/trezorlib/cosi.py index f8e423790c..77786a90fb 100644 --- a/python/src/trezorlib/cosi.py +++ b/python/src/trezorlib/cosi.py @@ -16,7 +16,7 @@ import warnings from functools import reduce -from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Tuple +from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple from . import _ed25519, messages from .tools import expect @@ -90,7 +90,7 @@ def verify( signature: Ed25519Signature, digest: bytes, sigs_required: int, - keys: Sequence[Ed25519PublicPoint], + keys: List[Ed25519PublicPoint], mask: int, ) -> None: """Verify a CoSi multi-signature. Raise exception if the signature is invalid. diff --git a/python/src/trezorlib/firmware/__init__.py b/python/src/trezorlib/firmware/__init__.py index 1979d843ec..b2bbf0a094 100644 --- a/python/src/trezorlib/firmware/__init__.py +++ b/python/src/trezorlib/firmware/__init__.py @@ -14,62 +14,443 @@ # You should have received a copy of the License along with this library. # If not, see . -import typing as t +import hashlib +from enum import Enum from hashlib import blake2s +from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple -from typing_extensions import Protocol, TypeGuard +import construct as c +import ecdsa -from .. import messages -from ..tools import expect, session -from .core import VendorFirmware -from .legacy import LegacyFirmware, LegacyV2Firmware +from . import cosi, messages +from .toif import ToifStruct +from .tools import expect, session, EnumAdapter -# re-exports: -if True: - # indented block prevents isort from messing with these until we upgrade to 5.x - from .consts import * # noqa: F401, F403 - from .core import * # noqa: F401, F403 - from .legacy import * # noqa: F401, F403 - from .util import ( # noqa: F401 - FirmwareIntegrityError, - InvalidSignatureError, - Unsigned, +if TYPE_CHECKING: + from .client import TrezorClient + +V1_SIGNATURE_SLOTS = 3 +V1_BOOTLOADER_KEYS = [ + bytes.fromhex(key) + for key in ( + "04d571b7f148c5e4232c3814f777d8faeaf1a84216c78d569b71041ffc768a5b2d810fc3bb134dd026b57e65005275aedef43e155f48fc11a32ec790a93312bd58", + "0463279c0c0866e50c05c799d32bd6bab0188b6de06536d1109d2ed9ce76cb335c490e55aee10cc901215132e853097d5432eda06b792073bd7740c94ce4516cb1", + "0443aedbb6f7e71c563f8ed2ef64ec9981482519e7ef4f4aa98b27854e8c49126d4956d300ab45fdc34cd26bc8710de0a31dbdf6de7435fd0b492be70ac75fde58", + "04877c39fd7c62237e038235e9c075dab261630f78eeb8edb92487159fffedfdf6046c6f8b881fa407c4a4ce6c28de0b19c1f4e29f1fcbc5a58ffd1432a3e0938a", + "047384c51ae81add0a523adbb186c91b906ffb64c2c765802bf26dbd13bdf12c319e80c2213a136c8ee03d7874fd22b70d68e7dee469decfbbb510ee9a460cda45", ) - from .vendor import * # noqa: F401, F403 +] -if t.TYPE_CHECKING: - from ..client import TrezorClient +V2_BOARDLOADER_KEYS = [ + bytes.fromhex(key) + for key in ( + "0eb9856be9ba7e972c7f34eac1ed9b6fd0efd172ec00faf0c589759da4ddfba0", + "ac8ab40b32c98655798fd5da5e192be27a22306ea05c6d277cdff4a3f4125cd8", + "ce0fcd12543ef5936cf2804982136707863d17295faced72af171d6e6513ff06", + ) +] - T = t.TypeVar("T", bound="FirmwareType") +V2_BOARDLOADER_DEV_KEYS = [ + bytes.fromhex(key) + for key in ( + "db995fe25169d141cab9bbba92baa01f9f2e1ece7df4cb2ac05190f37fcc1f9d", + "2152f8d19b791d24453242e15f2eab6cb7cffa7b6a5ed30097960e069881db12", + "22fc297792f0b6ffc0bfcfdb7edb0c0aa14e025a365ec0e342e86e3829cb74b6", + ) +] - class FirmwareType(Protocol): - @classmethod - def parse(cls: t.Type[T], data: bytes) -> T: - ... +V2_BOOTLOADER_KEYS = [ + bytes.fromhex(key) + for key in ( + "c2c87a49c5a3460977fbb2ec9dfe60f06bd694db8244bd4981fe3b7a26307f3f", + "80d036b08739b846f4cb77593078deb25dc9487aedcf52e30b4fb7cd7024178a", + "b8307a71f552c60a4cbb317ff48b82cdbf6b6bb5f04c920fec7badf017883751", + ) +] - def verify(self, public_keys: t.Sequence[bytes] = ()) -> None: - ... +V2_SIGS_REQUIRED = 2 - def digest(self) -> bytes: - ... +ONEV2_CHUNK_SIZE = 1024 * 64 +V2_CHUNK_SIZE = 1024 * 128 -def parse(data: bytes) -> "FirmwareType": +def _transform_vendor_trust(data: bytes) -> bytes: + """Byte-swap and bit-invert the VendorTrust field. + + Vendor trust is interpreted as a bitmask in a 16-bit little-endian integer, + with the added twist that 0 means set and 1 means unset. + We feed it to a `BitStruct` that expects a big-endian sequence where bits have + the traditional meaning. We must therefore do a bitwise negation of each byte, + and return them in reverse order. This is the same transformation both ways, + fortunately, so we don't need two separate functions. + """ + return bytes(~b & 0xFF for b in data)[::-1] + + +class FirmwareIntegrityError(Exception): + pass + + +class InvalidSignatureError(FirmwareIntegrityError): + pass + + +class Unsigned(FirmwareIntegrityError): + pass + + +class HeaderType(Enum): + FIRMWARE = b"TRZF" + BOOTLOADER = b"TRZB" + + +# fmt: off +VendorTrust = c.Transformed(c.BitStruct( + "_reserved" / c.Default(c.BitsInteger(9), 0), + "show_vendor_string" / c.Flag, + "require_user_click" / c.Flag, + "red_background" / c.Flag, + "delay" / c.BitsInteger(4), +), _transform_vendor_trust, 2, _transform_vendor_trust, 2) + + +VendorHeader = c.Struct( + "_start_offset" / c.Tell, + "magic" / c.Const(b"TRZV"), + "header_len" / c.Int32ul, + "expiry" / c.Int32ul, + "version" / c.Struct( + "major" / c.Int8ul, + "minor" / c.Int8ul, + ), + "sig_m" / c.Int8ul, + "sig_n" / c.Rebuild(c.Int8ul, c.len_(c.this.pubkeys)), + "trust" / VendorTrust, + "_reserved" / c.Padding(14), + "pubkeys" / c.Bytes(32)[c.this.sig_n], + "text" / c.Aligned(4, c.PascalString(c.Int8ul, "utf-8")), + "image" / ToifStruct, + "_end_offset" / c.Tell, + + "_min_header_len" / c.Check(c.this.header_len > (c.this._end_offset - c.this._start_offset) + 65), + "_header_len_aligned" / c.Check(c.this.header_len % 512 == 0), + + c.Padding(c.this.header_len - c.this._end_offset + c.this._start_offset - 65), + "sigmask" / c.Byte, + "signature" / c.Bytes(64), +) + + +VersionLong = c.Struct( + "major" / c.Int8ul, + "minor" / c.Int8ul, + "patch" / c.Int8ul, + "build" / c.Int8ul, +) + + +FirmwareHeader = c.Struct( + "_start_offset" / c.Tell, + "magic" / EnumAdapter(c.Bytes(4), HeaderType), + "header_len" / c.Int32ul, + "expiry" / c.Int32ul, + "code_length" / c.Rebuild( + c.Int32ul, + lambda this: + len(this._.code) if "code" in this._ + else (this.code_length or 0) + ), + "version" / VersionLong, + "fix_version" / VersionLong, + "_reserved" / c.Padding(8), + "hashes" / c.Bytes(32)[16], + + "v1_signatures" / c.Bytes(64)[V1_SIGNATURE_SLOTS], + "v1_key_indexes" / c.Int8ul[V1_SIGNATURE_SLOTS], # pylint: disable=E1136 + + "_reserved" / c.Padding(220), + "sigmask" / c.Byte, + "signature" / c.Bytes(64), + + "_end_offset" / c.Tell, + + "_rebuild_header_len" / c.If( + c.this.version.major > 1, + c.Pointer( + c.this._start_offset + 4, + c.Rebuild(c.Int32ul, c.this._end_offset - c.this._start_offset) + ), + ), +) + + +"""Raw firmware image. + +Consists of firmware header and code block. +This is the expected format of firmware binaries for Trezor One, or bootloader images +for Trezor T.""" +FirmwareImage = c.Struct( + "header" / FirmwareHeader, + "_code_offset" / c.Tell, + "code" / c.Bytes(c.this.header.code_length), + c.Terminated, +) + + +"""Firmware image prefixed by a vendor header. + +This is the expected format of firmware binaries for Trezor T.""" +VendorFirmware = c.Struct( + "vendor_header" / VendorHeader, + "image" / FirmwareImage, + c.Terminated, +) + + +"""Legacy firmware image. +Consists of a custom header and code block. +This is the expected format of firmware binaries for Trezor One pre-1.8.0. + +The code block can optionally be interpreted as a new-style firmware image. That is the +expected format of firmware binary for Trezor One version 1.8.0, which can be installed +by both the older and the newer bootloader.""" +LegacyFirmware = c.Struct( + "magic" / c.Const(b"TRZR"), + "code_length" / c.Rebuild(c.Int32ul, c.len_(c.this.code)), + "key_indexes" / c.Int8ul[V1_SIGNATURE_SLOTS], # pylint: disable=E1136 + "flags" / c.BitStruct( + c.Padding(7), + "restore_storage" / c.Flag, + ), + "_reserved" / c.Padding(52), + "signatures" / c.Bytes(64)[V1_SIGNATURE_SLOTS], + "code" / c.Bytes(c.this.code_length), + c.Terminated, + + "embedded_onev2" / c.RestreamData(c.this.code, c.Optional(FirmwareImage)), +) + +# fmt: on + + +class FirmwareFormat(Enum): + TREZOR_ONE = 1 + TREZOR_T = 2 + TREZOR_ONE_V2 = 3 + + +ParsedFirmware = Tuple[FirmwareFormat, c.Container] + + +def parse(data: bytes) -> ParsedFirmware: + if data[:4] == b"TRZR": + version = FirmwareFormat.TREZOR_ONE + cls = LegacyFirmware + elif data[:4] == b"TRZV": + version = FirmwareFormat.TREZOR_T + cls = VendorFirmware + elif data[:4] == b"TRZF": + version = FirmwareFormat.TREZOR_ONE_V2 + cls = FirmwareImage + else: + raise ValueError("Unrecognized firmware image type") + try: - if data[:4] == b"TRZR": - return LegacyFirmware.parse(data) - elif data[:4] == b"TRZV": - return VendorFirmware.parse(data) - elif data[:4] == b"TRZF": - return LegacyV2Firmware.parse(data) - else: - raise ValueError("Unrecognized firmware image type") + fw = cls.parse(data) except Exception as e: raise FirmwareIntegrityError("Invalid firmware image") from e + return version, fw -def is_onev2(fw: "FirmwareType") -> TypeGuard[LegacyFirmware]: - return isinstance(fw, LegacyFirmware) and fw.embedded_v2 is not None +def digest_onev1(fw: c.Container) -> bytes: + return hashlib.sha256(fw.code).digest() + + +def check_sig_v1( + digest: bytes, key_indexes: List[int], signatures: List[bytes] +) -> None: + distinct_key_indexes = set(i for i in key_indexes if i != 0) + if not distinct_key_indexes: + raise Unsigned + + if len(distinct_key_indexes) < len(key_indexes): + raise InvalidSignatureError( + f"Not enough distinct signatures (found {len(distinct_key_indexes)}, need {len(key_indexes)})" + ) + + for i in range(len(key_indexes)): + key_idx = key_indexes[i] - 1 + signature = signatures[i] + + if key_idx >= len(V1_BOOTLOADER_KEYS): + # unknown pubkey + raise InvalidSignatureError(f"Unknown key in slot {i}") + + pubkey = V1_BOOTLOADER_KEYS[key_idx][1:] + verify = ecdsa.VerifyingKey.from_string(pubkey, curve=ecdsa.curves.SECP256k1) + try: + verify.verify_digest(signature, digest) + except ecdsa.BadSignatureError as e: + raise InvalidSignatureError(f"Invalid signature in slot {i}") from e + + +def header_digest(header: c.Container, hash_function: Callable = blake2s) -> bytes: + stripped_header = header.copy() + stripped_header.sigmask = 0 + stripped_header.signature = b"\0" * 64 + stripped_header.v1_key_indexes = [0, 0, 0] + stripped_header.v1_signatures = [b"\0" * 64] * 3 + if header.magic == b"TRZV": + header_type = VendorHeader + else: + header_type = FirmwareHeader + header_bytes = header_type.build(stripped_header) + return hash_function(header_bytes).digest() + + +def digest_v2(fw: c.Container) -> bytes: + return header_digest(fw.image.header, blake2s) + + +def digest_onev2(fw: c.Container) -> bytes: + return header_digest(fw.header, hashlib.sha256) + + +def calculate_code_hashes( + code: bytes, + code_offset: int, + hash_function: Callable = blake2s, + chunk_size: int = V2_CHUNK_SIZE, + padding_byte: Optional[bytes] = None, +) -> List[bytes]: + hashes = [] + # End offset for each chunk. Normally this would be (i+1)*chunk_size for i-th chunk, + # but the first chunk is shorter by code_offset, so all end offsets are shifted. + ends = [(i + 1) * chunk_size - code_offset for i in range(16)] + start = 0 + for end in ends: + chunk = code[start:end] + # padding for last non-empty chunk + if padding_byte is not None and start < len(code) and end > len(code): + chunk += padding_byte[0:1] * (end - start - len(chunk)) + + if not chunk: + hashes.append(b"\0" * 32) + else: + hashes.append(hash_function(chunk).digest()) + + start = end + + return hashes + + +def validate_code_hashes(fw: c.Container, version: FirmwareFormat) -> None: + hash_function: Callable + padding_byte: Optional[bytes] + if version == FirmwareFormat.TREZOR_ONE_V2: + image = fw + hash_function = hashlib.sha256 + chunk_size = ONEV2_CHUNK_SIZE + padding_byte = b"\xff" + else: + image = fw.image + hash_function = blake2s + chunk_size = V2_CHUNK_SIZE + padding_byte = None + + expected_hashes = calculate_code_hashes( + image.code, image._code_offset, hash_function, chunk_size, padding_byte + ) + if expected_hashes != image.header.hashes: + raise FirmwareIntegrityError("Invalid firmware data.") + + +def validate_onev2(fw: c.Container, allow_unsigned: bool = False) -> None: + try: + check_sig_v1( + digest_onev2(fw), + fw.header.v1_key_indexes, + fw.header.v1_signatures, + ) + except Unsigned: + if not allow_unsigned: + raise + + validate_code_hashes(fw, FirmwareFormat.TREZOR_ONE_V2) + + +def validate_onev1(fw: c.Container, allow_unsigned: bool = False) -> None: + try: + check_sig_v1(digest_onev1(fw), fw.key_indexes, fw.signatures) + except Unsigned: + if not allow_unsigned: + raise + if fw.embedded_onev2: + validate_onev2(fw.embedded_onev2, allow_unsigned) + + +def validate_v2(fw: c.Container, skip_vendor_header: bool = False) -> None: + vendor_fingerprint = header_digest(fw.vendor_header) + fingerprint = digest_v2(fw) + + if not skip_vendor_header: + try: + # if you want to validate a custom vendor header, you can modify + # the global variables to match your keys and m-of-n scheme + cosi.verify( + fw.vendor_header.signature, + vendor_fingerprint, + V2_SIGS_REQUIRED, + V2_BOOTLOADER_KEYS, + fw.vendor_header.sigmask, + ) + except Exception: + raise InvalidSignatureError("Invalid vendor header signature.") + + # XXX expiry is not used now + # now = time.gmtime() + # if time.gmtime(fw.vendor_header.expiry) < now: + # raise ValueError("Vendor header expired.") + + try: + cosi.verify( + fw.image.header.signature, + fingerprint, + fw.vendor_header.sig_m, + fw.vendor_header.pubkeys, + fw.image.header.sigmask, + ) + except Exception: + raise InvalidSignatureError("Invalid firmware signature.") + + # XXX expiry is not used now + # if time.gmtime(fw.image.header.expiry) < now: + # raise ValueError("Firmware header expired.") + validate_code_hashes(fw, FirmwareFormat.TREZOR_T) + + +def digest(version: FirmwareFormat, fw: c.Container) -> bytes: + if version == FirmwareFormat.TREZOR_ONE: + return digest_onev1(fw) + elif version == FirmwareFormat.TREZOR_ONE_V2: + return digest_onev2(fw) + elif version == FirmwareFormat.TREZOR_T: + return digest_v2(fw) + else: + raise ValueError("Unrecognized firmware version") + + +def validate( + version: FirmwareFormat, fw: c.Container, allow_unsigned: bool = False +) -> None: + if version == FirmwareFormat.TREZOR_ONE: + return validate_onev1(fw, allow_unsigned) + elif version == FirmwareFormat.TREZOR_ONE_V2: + return validate_onev2(fw, allow_unsigned) + elif version == FirmwareFormat.TREZOR_T: + return validate_v2(fw) + else: + raise ValueError("Unrecognized firmware version") # ====== Client functions ====== # @@ -79,7 +460,7 @@ def is_onev2(fw: "FirmwareType") -> TypeGuard[LegacyFirmware]: def update( client: "TrezorClient", data: bytes, - progress_update: t.Callable[[int], t.Any] = lambda _: None, + progress_update: Callable[[int], Any] = lambda _: None, ): if client.features.bootloader_mode is False: raise RuntimeError("Device must be in bootloader mode") @@ -112,5 +493,5 @@ def update( @expect(messages.FirmwareHash, field="hash", ret_type=bytes) -def get_hash(client: "TrezorClient", challenge: t.Optional[bytes]): +def get_hash(client: "TrezorClient", challenge: Optional[bytes]): return client.call(messages.GetFirmwareHash(challenge=challenge)) diff --git a/python/src/trezorlib/firmware/consts.py b/python/src/trezorlib/firmware/consts.py deleted file mode 100644 index 312665c8e4..0000000000 --- a/python/src/trezorlib/firmware/consts.py +++ /dev/null @@ -1,43 +0,0 @@ -V1_SIGNATURE_SLOTS = 3 -V1_BOOTLOADER_KEYS = [ - bytes.fromhex(key) - for key in ( - "04d571b7f148c5e4232c3814f777d8faeaf1a84216c78d569b71041ffc768a5b2d810fc3bb134dd026b57e65005275aedef43e155f48fc11a32ec790a93312bd58", - "0463279c0c0866e50c05c799d32bd6bab0188b6de06536d1109d2ed9ce76cb335c490e55aee10cc901215132e853097d5432eda06b792073bd7740c94ce4516cb1", - "0443aedbb6f7e71c563f8ed2ef64ec9981482519e7ef4f4aa98b27854e8c49126d4956d300ab45fdc34cd26bc8710de0a31dbdf6de7435fd0b492be70ac75fde58", - "04877c39fd7c62237e038235e9c075dab261630f78eeb8edb92487159fffedfdf6046c6f8b881fa407c4a4ce6c28de0b19c1f4e29f1fcbc5a58ffd1432a3e0938a", - "047384c51ae81add0a523adbb186c91b906ffb64c2c765802bf26dbd13bdf12c319e80c2213a136c8ee03d7874fd22b70d68e7dee469decfbbb510ee9a460cda45", - ) -] - -V2_BOARDLOADER_KEYS = [ - bytes.fromhex(key) - for key in ( - "0eb9856be9ba7e972c7f34eac1ed9b6fd0efd172ec00faf0c589759da4ddfba0", - "ac8ab40b32c98655798fd5da5e192be27a22306ea05c6d277cdff4a3f4125cd8", - "ce0fcd12543ef5936cf2804982136707863d17295faced72af171d6e6513ff06", - ) -] - -V2_BOARDLOADER_DEV_KEYS = [ - bytes.fromhex(key) - for key in ( - "db995fe25169d141cab9bbba92baa01f9f2e1ece7df4cb2ac05190f37fcc1f9d", - "2152f8d19b791d24453242e15f2eab6cb7cffa7b6a5ed30097960e069881db12", - "22fc297792f0b6ffc0bfcfdb7edb0c0aa14e025a365ec0e342e86e3829cb74b6", - ) -] - -V2_BOOTLOADER_KEYS = [ - bytes.fromhex(key) - for key in ( - "c2c87a49c5a3460977fbb2ec9dfe60f06bd694db8244bd4981fe3b7a26307f3f", - "80d036b08739b846f4cb77593078deb25dc9487aedcf52e30b4fb7cd7024178a", - "b8307a71f552c60a4cbb317ff48b82cdbf6b6bb5f04c920fec7badf017883751", - ) -] - -V2_SIGS_REQUIRED = 2 - -ONEV2_CHUNK_SIZE = 1024 * 64 -V2_CHUNK_SIZE = 1024 * 128 diff --git a/python/src/trezorlib/firmware/core.py b/python/src/trezorlib/firmware/core.py deleted file mode 100644 index 628b187217..0000000000 --- a/python/src/trezorlib/firmware/core.py +++ /dev/null @@ -1,187 +0,0 @@ -import hashlib -import typing as t -from copy import copy -from enum import Enum - -import construct as c -from construct_classes import Struct, subcon - -from .. import cosi -from ..tools import EnumAdapter, TupleAdapter -from . import consts, util -from .vendor import VendorHeader - -__all__ = [ - "HeaderType", - "FirmwareHeader", - "FirmwareImage", - "VendorFirmware", -] - - -class HeaderType(Enum): - FIRMWARE = b"TRZF" - BOOTLOADER = b"TRZB" - - -class FirmwareHeader(Struct): - magic: HeaderType - header_len: int - expiry: int - code_length: int - version: t.Tuple[int, int, int, int] - fix_version: t.Tuple[int, int, int, int] - hashes: t.Sequence[bytes] - - v1_signatures: t.Sequence[bytes] - v1_key_indexes: t.Sequence[int] - - sigmask: int - signature: bytes - - # fmt: off - SUBCON = c.Struct( - "_start_offset" / c.Tell, - "magic" / EnumAdapter(c.Bytes(4), HeaderType), - "header_len" / c.Int32ul, - "expiry" / c.Int32ul, - "code_length" / c.Rebuild( - c.Int32ul, - lambda this: - len(this._.code) if "code" in this._ - else (this.code_length or 0) - ), - "version" / TupleAdapter(c.Int8ul, c.Int8ul, c.Int8ul, c.Int8ul), - "fix_version" / TupleAdapter(c.Int8ul, c.Int8ul, c.Int8ul, c.Int8ul), - "_reserved" / c.Padding(8), - "hashes" / c.Bytes(32)[16], - - "v1_signatures" / c.Bytes(64)[consts.V1_SIGNATURE_SLOTS], - "v1_key_indexes" / c.Int8ul[consts.V1_SIGNATURE_SLOTS], # pylint: disable=E1136 - - "_reserved" / c.Padding(220), - "sigmask" / c.Byte, - "signature" / c.Bytes(64), - - "_end_offset" / c.Tell, - - "_rebuild_header_len" / c.If( - c.this.version[0] > 1, - c.Pointer( - c.this._start_offset + 4, - c.Rebuild(c.Int32ul, c.this._end_offset - c.this._start_offset) - ), - ), - ) - # fmt: on - - -class FirmwareImage(Struct): - """Raw firmware image. - - Consists of firmware header and code block. - This is the expected format of firmware binaries for Trezor One, or bootloader images - for Trezor T.""" - - header: FirmwareHeader = subcon(FirmwareHeader) - _code_offset: int - code: bytes - - SUBCON = c.Struct( - "header" / FirmwareHeader.SUBCON, - "_code_offset" / c.Tell, - "code" / c.Bytes(c.this.header.code_length), - c.Terminated, - ) - - HASH_PARAMS = util.FirmwareHashParameters( - hash_function=hashlib.blake2s, - chunk_size=consts.V2_CHUNK_SIZE, - padding_byte=None, - ) - - def code_hashes(self) -> t.List[bytes]: - """Calculate hashes of chunks of `code`. - - Assume that the first `code_offset` bytes of `code` are taken up by the header. - """ - hashes = [] - # End offset for each chunk. Normally this would be (i+1)*chunk_size for i-th chunk, - # but the first chunk is shorter by code_offset, so all end offsets are shifted. - ends = [ - (i + 1) * self.HASH_PARAMS.chunk_size - self._code_offset for i in range(16) - ] - start = 0 - for end in ends: - chunk = self.code[start:end] - # padding for last non-empty chunk - if ( - self.HASH_PARAMS.padding_byte is not None - and start < len(self.code) - and end > len(self.code) - ): - chunk += self.HASH_PARAMS.padding_byte[0:1] * (end - start - len(chunk)) - - if not chunk: - hashes.append(b"\0" * 32) - else: - hashes.append(self.HASH_PARAMS.hash_function(chunk).digest()) - - start = end - - return hashes - - def validate_code_hashes(self) -> None: - if self.code_hashes() != self.header.hashes: - raise util.FirmwareIntegrityError("Invalid firmware data.") - - def digest(self) -> bytes: - header = copy(self.header) - header.hashes = self.code_hashes() - header.signature = b"\x00" * 64 - header.sigmask = 0 - header.v1_key_indexes = [0] * consts.V1_SIGNATURE_SLOTS - header.v1_signatures = [b"\x00" * 64] * consts.V1_SIGNATURE_SLOTS - return self.HASH_PARAMS.hash_function(header.build()).digest() - - -class VendorFirmware(Struct): - """Firmware image prefixed by a vendor header. - - This is the expected format of firmware binaries for Trezor T.""" - - vendor_header: VendorHeader = subcon(VendorHeader) - firmware: FirmwareImage = subcon(FirmwareImage) - - SUBCON = c.Struct( - "vendor_header" / VendorHeader.SUBCON, - "firmware" / FirmwareImage.SUBCON, - c.Terminated, - ) - - def digest(self) -> bytes: - return self.firmware.digest() - - def verify(self, _public_keys: t.Sequence[bytes] = ()) -> None: - if _public_keys: - raise ValueError("Cannot supply custom keys for vendor firmware.") - - self.firmware.validate_code_hashes() - - self.vendor_header.verify() - digest = self.digest() - try: - cosi.verify( - self.firmware.header.signature, - digest, - self.vendor_header.sig_m, - self.vendor_header.pubkeys, - self.firmware.header.sigmask, - ) - except Exception: - raise util.InvalidSignatureError("Invalid firmware signature.") - - # XXX expiry is not used now - # now = time.gmtime() - # if time.gmtime(fw.vendor_header.expiry) < now: - # raise ValueError("Vendor header expired.") diff --git a/python/src/trezorlib/firmware/legacy.py b/python/src/trezorlib/firmware/legacy.py deleted file mode 100644 index 58f394c272..0000000000 --- a/python/src/trezorlib/firmware/legacy.py +++ /dev/null @@ -1,124 +0,0 @@ -import hashlib -import typing as t -from dataclasses import field - -import construct as c -import ecdsa -from construct_classes import Struct, subcon - -from . import consts, util -from .core import FirmwareImage - -__all__ = [ - "LegacyFirmware", - "LegacyV2Firmware", - "check_sig_v1", -] - - -def check_sig_v1( - digest: bytes, - key_indexes: t.Sequence[int], - signatures: t.Sequence[bytes], - public_keys: t.Sequence[bytes], -) -> None: - """Validate signatures of `digest` using the Trezor One V1 method.""" - distinct_indexes = set(i for i in key_indexes if i != 0) - if not distinct_indexes: - raise util.Unsigned - - if len(distinct_indexes) < len(key_indexes): - raise util.InvalidSignatureError( - f"Not enough distinct signatures (found {len(distinct_indexes)}, need {len(key_indexes)})" - ) - - for i in range(len(key_indexes)): - key_idx = key_indexes[i] - 1 - signature = signatures[i] - - if key_idx >= len(public_keys): - # unknown pubkey - raise util.InvalidSignatureError(f"Unknown key in slot {i}") - - pubkey = public_keys[key_idx][1:] - verify = ecdsa.VerifyingKey.from_string(pubkey, curve=ecdsa.curves.SECP256k1) - try: - verify.verify_digest(signature, digest) - except ecdsa.BadSignatureError as e: - raise util.InvalidSignatureError(f"Invalid signature in slot {i}") from e - - -class LegacyV2Firmware(FirmwareImage): - """Firmware image in the format used by Trezor One 1.8.0 and newer.""" - - HASH_PARAMS = util.FirmwareHashParameters( - hash_function=hashlib.sha256, - chunk_size=consts.ONEV2_CHUNK_SIZE, - padding_byte=b"\xff", - ) - - def verify( - self, public_keys: t.Sequence[bytes] = consts.V1_BOOTLOADER_KEYS - ) -> None: - self.validate_code_hashes() - check_sig_v1( - self.digest(), - self.header.v1_key_indexes, - self.header.v1_signatures, - public_keys, - ) - - def verify_unsigned(self) -> None: - self.validate_code_hashes() - if any(i != 0 for i in self.header.v1_key_indexes): - raise util.InvalidSignatureError("Firmware is not unsigned.") - - -class LegacyFirmware(Struct): - """Legacy firmware image. - Consists of a custom header and code block. - This is the expected format of firmware binaries for Trezor One pre-1.8.0. - - The code block can optionally be interpreted as a new-style firmware image. That is the - expected format of firmware binary for Trezor One version 1.8.0, which can be installed - by both the older and the newer bootloader.""" - - key_indexes: t.List[int] - signatures: t.List[bytes] - code: bytes - flags: t.Dict[str, t.Any] = field(default_factory=dict) - embedded_v2: t.Optional[LegacyV2Firmware] = subcon(LegacyV2Firmware, default=None) - - # fmt: off - SUBCON = c.Struct( - "magic" / c.Const(b"TRZR"), - "code_length" / c.Rebuild(c.Int32ul, c.len_(c.this.code)), - "key_indexes" / c.Int8ul[consts.V1_SIGNATURE_SLOTS], # pylint: disable=E1136 - "flags" / c.BitStruct( - c.Padding(7), - "restore_storage" / c.Flag, - ), - "_reserved" / c.Padding(52), - "signatures" / c.Bytes(64)[consts.V1_SIGNATURE_SLOTS], - "code" / c.Bytes(c.this.code_length), - c.Terminated, - - "embedded_v2" / c.RestreamData(c.this.code, c.Optional(LegacyV2Firmware.SUBCON)), - ) - # fmt: on - - def digest(self) -> bytes: - return hashlib.sha256(self.code).digest() - - def verify( - self, public_keys: t.Sequence[bytes] = consts.V1_BOOTLOADER_KEYS - ) -> None: - check_sig_v1( - self.digest(), - self.key_indexes, - self.signatures, - public_keys, - ) - - if self.embedded_v2: - self.embedded_v2.verify(consts.V1_BOOTLOADER_KEYS) diff --git a/python/src/trezorlib/firmware/util.py b/python/src/trezorlib/firmware/util.py deleted file mode 100644 index 79fa804768..0000000000 --- a/python/src/trezorlib/firmware/util.py +++ /dev/null @@ -1,34 +0,0 @@ -import typing as t -from dataclasses import dataclass - -from typing_extensions import Protocol - - -class FirmwareIntegrityError(Exception): - pass - - -class InvalidSignatureError(FirmwareIntegrityError): - pass - - -class Unsigned(FirmwareIntegrityError): - pass - - -class DigestCalculator(Protocol): - def update(self, __data: bytes) -> None: - ... - - def digest(self) -> bytes: - ... - - -Hasher = t.Callable[[bytes], DigestCalculator] - - -@dataclass -class FirmwareHashParameters: - hash_function: Hasher - chunk_size: int - padding_byte: t.Optional[bytes] diff --git a/python/src/trezorlib/firmware/vendor.py b/python/src/trezorlib/firmware/vendor.py deleted file mode 100644 index 5855075427..0000000000 --- a/python/src/trezorlib/firmware/vendor.py +++ /dev/null @@ -1,128 +0,0 @@ -import hashlib -import typing as t -from copy import copy - -import construct as c -from construct_classes import Struct, subcon - -from .. import cosi -from ..toif import ToifStruct -from ..tools import TupleAdapter -from . import consts, util - -__all__ = [ - "VendorTrust", - "VendorHeader", -] - - -def _transform_vendor_trust(data: bytes) -> bytes: - """Byte-swap and bit-invert the VendorTrust field. - - Vendor trust is interpreted as a bitmask in a 16-bit little-endian integer, - with the added twist that 0 means set and 1 means unset. - We feed it to a `BitStruct` that expects a big-endian sequence where bits have - the traditional meaning. We must therefore do a bitwise negation of each byte, - and return them in reverse order. This is the same transformation both ways, - fortunately, so we don't need two separate functions. - """ - return bytes(~b & 0xFF for b in data)[::-1] - - -class VendorTrust(Struct): - show_vendor_string: bool - require_user_click: bool - red_background: bool - delay: int - - _reserved: int = 0 - - SUBCON = c.Transformed( - c.BitStruct( - "_reserved" / c.Default(c.BitsInteger(9), 0), - "show_vendor_string" / c.Flag, - "require_user_click" / c.Flag, - "red_background" / c.Flag, - "delay" / c.BitsInteger(4), - ), - _transform_vendor_trust, - 2, - _transform_vendor_trust, - 2, - ) - - -class VendorHeader(Struct): - header_len: int - expiry: int - version: t.Tuple[int, int] - sig_m: int - # sig_n: int - pubkeys: t.List[bytes] - text: str - image: t.Dict[str, t.Any] - sigmask: int - signature: bytes - - trust: VendorTrust = subcon(VendorTrust) - - # fmt: off - SUBCON = c.Struct( - "_start_offset" / c.Tell, - "magic" / c.Const(b"TRZV"), - "header_len" / c.Int32ul, - "expiry" / c.Int32ul, - "version" / TupleAdapter(c.Int8ul, c.Int8ul), - "sig_m" / c.Int8ul, - "sig_n" / c.Rebuild(c.Int8ul, c.len_(c.this.pubkeys)), - "trust" / VendorTrust.SUBCON, - "_reserved" / c.Padding(14), - "pubkeys" / c.Bytes(32)[c.this.sig_n], - "text" / c.Aligned(4, c.PascalString(c.Int8ul, "utf-8")), - "image" / ToifStruct, - "_end_offset" / c.Tell, - - "_min_header_len" / c.Check(c.this.header_len > (c.this._end_offset - c.this._start_offset) + 65), - "_header_len_aligned" / c.Check(c.this.header_len % 512 == 0), - - c.Padding(c.this.header_len - c.this._end_offset + c.this._start_offset - 65), - "sigmask" / c.Byte, - "signature" / c.Bytes(64), - ) - # fmt: on - - def digest(self) -> bytes: - cpy = copy(self) - cpy.sigmask = 0 - cpy.signature = b"\x00" * 64 - return hashlib.blake2s(cpy.build()).digest() - - def vhash(self) -> bytes: - h = hashlib.blake2s() - sig_n = len(self.pubkeys) - h.update(self.sig_m.to_bytes(1, "little")) - h.update(sig_n.to_bytes(1, "little")) - for i in range(8): - if i < sig_n: - h.update(self.pubkeys[i]) - else: - h.update(b"\x00" * 32) - return h.digest() - - def verify(self, pubkeys: t.Sequence[bytes] = consts.V2_BOOTLOADER_KEYS) -> None: - digest = self.digest() - try: - cosi.verify( - self.signature, - digest, - consts.V2_SIGS_REQUIRED, - pubkeys, - self.sigmask, - ) - except Exception: - raise util.InvalidSignatureError("Invalid vendor header signature.") - - # XXX expiry is not used now - # now = time.gmtime() - # if time.gmtime(fw.vendor_header.expiry) < now: - # raise ValueError("Vendor header expired.") diff --git a/python/src/trezorlib/tools.py b/python/src/trezorlib/tools.py index 8162032b54..5898e47fcd 100644 --- a/python/src/trezorlib/tools.py +++ b/python/src/trezorlib/tools.py @@ -389,14 +389,3 @@ class EnumAdapter(construct.Adapter): return self.enum(obj) except ValueError: return obj - - -class TupleAdapter(construct.Adapter): - def __init__(self, *subcons: Any) -> None: - super().__init__(construct.Sequence(*subcons)) - - def _encode(self, obj: Any, ctx: Any, path: Any): - return obj - - def _decode(self, obj: Any, ctx: Any, path: Any): - return tuple(obj) diff --git a/python/tests/test_firmware.py b/python/tests/test_firmware.py deleted file mode 100644 index 6df1c5a419..0000000000 --- a/python/tests/test_firmware.py +++ /dev/null @@ -1,153 +0,0 @@ -from pathlib import Path - -import construct -import pytest -import requests - -from trezorlib import firmware -from trezorlib.firmware import ( - VendorFirmware, - LegacyFirmware, - LegacyV2Firmware, - VendorHeader, -) - -CORE_FW_VERSION = "2.4.2" -CORE_FW_FINGERPRINT = "54ccf155510b5292bd17ed748409d0d135112e24e62eb74184639460beecb213" -LEGACY_FW_VERSION = "1.10.3" -LEGACY_FW_FINGERPRINT = ( - "bf0cc936a9afbf0a4ae7b727a2817fb69fba432d7230a0ff7b79b4a73b845197" -) - -CORE_FW = f"https://data.trezor.io/firmware/2/trezor-{CORE_FW_VERSION}.bin" -LEGACY_FW = f"https://data.trezor.io/firmware/1/trezor-{LEGACY_FW_VERSION}.bin" - -HERE = Path(__file__).parent - -VENDOR_HEADER = ( - HERE.parent.parent - / "core" - / "embed" - / "vendorheader" - / "vendorheader_satoshilabs_signed_prod.bin" -) - - -def _fetch(url: str, version: str) -> bytes: - path = HERE / f"trezor-{version}.bin" - if not path.exists(): - r = requests.get(url) - r.raise_for_status() - path.write_bytes(r.content) - return path.read_bytes() - - -@pytest.fixture() -def legacy_fw() -> bytes: - return _fetch(LEGACY_FW, LEGACY_FW_VERSION) - - -@pytest.fixture() -def core_fw() -> bytes: - return _fetch(CORE_FW, CORE_FW_VERSION) - - -def test_core_basic(core_fw: bytes) -> None: - fw = VendorFirmware.parse(core_fw) - fw.verify() - assert fw.digest().hex() == CORE_FW_FINGERPRINT - version_str = ".".join(str(x) for x in fw.firmware.header.version) - assert version_str.startswith(CORE_FW_VERSION) - assert fw.vendor_header.text == "SatoshiLabs" - assert fw.build() == core_fw - - -def test_vendor_header(core_fw: bytes) -> None: - fw = VendorFirmware.parse(core_fw) - - vh_data = fw.vendor_header.build() - assert vh_data in core_fw - assert vh_data == VENDOR_HEADER.read_bytes() - - vh = VendorHeader.parse(vh_data) - assert vh == fw.vendor_header - vh.verify() - - with pytest.raises(construct.ConstructError): - VendorFirmware.parse(vh_data) - - -def test_core_code_hashes(core_fw: bytes) -> None: - fw = VendorFirmware.parse(core_fw) - fw.firmware.header.hashes = [] - assert fw.digest().hex() == CORE_FW_FINGERPRINT - - -def test_legacy_basic(legacy_fw: bytes) -> None: - fw = LegacyFirmware.parse(legacy_fw) - fw.verify() - assert fw.digest().hex() == LEGACY_FW_FINGERPRINT - assert fw.build() == legacy_fw - - -def test_unsigned(legacy_fw: bytes) -> None: - legacy = LegacyFirmware.parse(legacy_fw) - - legacy.verify() - legacy.key_indexes = [0, 0, 0] - legacy.signatures = [b"", b"", b""] - - with pytest.raises(firmware.Unsigned): - legacy.verify() - - assert legacy.embedded_v2 is not None - legacy.embedded_v2.verify() - - legacy.embedded_v2.header.v1_key_indexes = [0, 0, 0] - legacy.embedded_v2.header.v1_signatures = [b"", b"", b""] - with pytest.raises(firmware.Unsigned): - legacy.embedded_v2.verify() - - -def test_disallow_unsigned(core_fw: bytes) -> None: - core = VendorFirmware.parse(core_fw) - core.firmware.header.sigmask = 0 - core.firmware.header.signature = b"" - with pytest.raises(firmware.InvalidSignatureError): - core.verify() - - -def test_embedded_v2(legacy_fw: bytes) -> None: - legacy = LegacyFirmware.parse(legacy_fw) - assert legacy.embedded_v2 is not None - legacy.embedded_v2.verify() - - embedded_data = legacy.embedded_v2.build() - cutoff_data = legacy_fw[256:] - assert cutoff_data == embedded_data - embedded = LegacyV2Firmware.parse(cutoff_data) - assert embedded == legacy.embedded_v2 - - -def test_integrity_legacy(legacy_fw: bytes) -> None: - legacy = LegacyFirmware.parse(legacy_fw) - legacy.verify() - - modified_data = bytearray(legacy_fw) - modified_data[-1] ^= 0x01 - modified = LegacyFirmware.parse(modified_data) - assert modified.digest() != legacy.digest() - with pytest.raises(firmware.InvalidSignatureError): - modified.verify() - - -def test_integrity_core(core_fw: bytes) -> None: - core = VendorFirmware.parse(core_fw) - core.verify() - - modified_data = bytearray(core_fw) - modified_data[-1] ^= 0x01 - modified = VendorFirmware.parse(modified_data) - assert modified.digest() != core.digest() - with pytest.raises(firmware.FirmwareIntegrityError): - modified.verify() diff --git a/python/tools/firmware-fingerprint.py b/python/tools/firmware-fingerprint.py index 4bb67a0423..33f5c84def 100755 --- a/python/tools/firmware-fingerprint.py +++ b/python/tools/firmware-fingerprint.py @@ -22,6 +22,7 @@ from typing import BinaryIO, TextIO import click from trezorlib import firmware +from trezorlib._internal import firmware_headers @click.command() @@ -32,11 +33,20 @@ def firmware_fingerprint(filename: BinaryIO, output: TextIO) -> None: data = filename.read() try: - click.echo(firmware.parse(data).digest().hex(), file=output) + version, fw = firmware.parse(data) + + # Unsigned production builds for Trezor T do not have valid code hashes. + # Use the internal module which recomputes them first. + if version == firmware.FirmwareFormat.TREZOR_T: + fingerprint = firmware_headers.FirmwareImage(fw).digest() + else: + fingerprint = firmware.digest(version, fw) except Exception as e: click.echo(e, err=True) sys.exit(2) + click.echo(fingerprint.hex(), file=output) + if __name__ == "__main__": firmware_fingerprint()