diff --git a/poetry.lock b/poetry.lock index bd73c8b4a..60ef9018d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -140,6 +140,17 @@ python-versions = ">=3.6" [package.extras] extras = ["arrow", "cloudpickle", "enum34", "lz4", "numpy", "ruamel.yaml"] +[[package]] +name = "construct-classes" +version = "0.1.2" +description = "Parse your binary structs into dataclasses" +category = "main" +optional = false +python-versions = ">=3.6.2,<4.0" + +[package.dependencies] +construct = ">=2.10,<3.0" + [[package]] name = "coverage" version = "4.5.4" @@ -569,8 +580,8 @@ python-versions = ">=3.6" importlib-metadata = {version = ">=0.12", markers = "python_version < \"3.8\""} [package.extras] -dev = ["pre-commit", "tox"] -testing = ["pytest", "pytest-benchmark"] +testing = ["pytest-benchmark", "pytest"] +dev = ["tox", "pre-commit"] [[package]] name = "protobuf" @@ -835,8 +846,8 @@ click = ">=7,<9" colorama = "*" [package.extras] -dev = ["black", "flake8", "isort"] tests = ["pytest"] +dev = ["isort", "flake8", "black"] [[package]] name = "simple-rlp" @@ -939,6 +950,7 @@ develop = true [package.dependencies] click = ">=7,<8.2" construct = ">=2.9,<2.10.55 || >2.10.55" +construct-classes = ">=0.1.2" ecdsa = ">=0.9" libusb1 = ">=1.6.4" mnemonic = ">=0.20" @@ -1067,7 +1079,31 @@ attrs = [ autoflake = [ {file = "autoflake-1.4.tar.gz", hash = "sha256:61a353012cff6ab94ca062823d1fb2f692c4acda51c76ff83a8d77915fba51ea"}, ] -black = [] +black = [ + {file = "black-22.3.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:2497f9c2386572e28921fa8bec7be3e51de6801f7459dffd6e62492531c47e09"}, + {file = "black-22.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:5795a0375eb87bfe902e80e0c8cfaedf8af4d49694d69161e5bd3206c18618bb"}, + {file = "black-22.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e3556168e2e5c49629f7b0f377070240bd5511e45e25a4497bb0073d9dda776a"}, + {file = "black-22.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:67c8301ec94e3bcc8906740fe071391bce40a862b7be0b86fb5382beefecd968"}, + {file = "black-22.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:fd57160949179ec517d32ac2ac898b5f20d68ed1a9c977346efbac9c2f1e779d"}, + {file = "black-22.3.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:cc1e1de68c8e5444e8f94c3670bb48a2beef0e91dddfd4fcc29595ebd90bb9ce"}, + {file = "black-22.3.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6d2fc92002d44746d3e7db7cf9313cf4452f43e9ea77a2c939defce3b10b5c82"}, + {file = "black-22.3.0-cp36-cp36m-win_amd64.whl", hash = "sha256:a6342964b43a99dbc72f72812bf88cad8f0217ae9acb47c0d4f141a6416d2d7b"}, + {file = "black-22.3.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:328efc0cc70ccb23429d6be184a15ce613f676bdfc85e5fe8ea2a9354b4e9015"}, + {file = "black-22.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:06f9d8846f2340dfac80ceb20200ea5d1b3f181dd0556b47af4e8e0b24fa0a6b"}, + {file = "black-22.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:ad4efa5fad66b903b4a5f96d91461d90b9507a812b3c5de657d544215bb7877a"}, + {file = "black-22.3.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:e8477ec6bbfe0312c128e74644ac8a02ca06bcdb8982d4ee06f209be28cdf163"}, + {file = "black-22.3.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:637a4014c63fbf42a692d22b55d8ad6968a946b4a6ebc385c5505d9625b6a464"}, + {file = "black-22.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:863714200ada56cbc366dc9ae5291ceb936573155f8bf8e9de92aef51f3ad0f0"}, + {file = "black-22.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10dbe6e6d2988049b4655b2b739f98785a884d4d6b85bc35133a8fb9a2233176"}, + {file = "black-22.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:cee3e11161dde1b2a33a904b850b0899e0424cc331b7295f2a9698e79f9a69a0"}, + {file = "black-22.3.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:5891ef8abc06576985de8fa88e95ab70641de6c1fca97e2a15820a9b69e51b20"}, + {file = "black-22.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:30d78ba6bf080eeaf0b7b875d924b15cd46fec5fd044ddfbad38c8ea9171043a"}, + {file = "black-22.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ee8f1f7228cce7dffc2b464f07ce769f478968bfb3dd1254a4c2eeed84928aad"}, + {file = "black-22.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ee227b696ca60dd1c507be80a6bc849a5a6ab57ac7352aad1ffec9e8b805f21"}, + {file = "black-22.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:9b542ced1ec0ceeff5b37d69838106a6348e60db7b8fdd245294dc1d26136265"}, + {file = "black-22.3.0-py3-none-any.whl", hash = "sha256:bc58025940a896d7e5356952228b68f793cf5fcb342be703c3a2669a1488cb72"}, + {file = "black-22.3.0.tar.gz", hash = "sha256:35020b8886c022ced9282b51b5a875b6d1ab0c387b31a065b84db7c33085ca79"}, +] certifi = [ {file = "certifi-2021.10.8-py2.py3-none-any.whl", hash = "sha256:d62a0163eb4c2344ac042ab2bdf75399a71a2d8c7d47eac2e2ee91b9d6339569"}, {file = "certifi-2021.10.8.tar.gz", hash = "sha256:78884e7c1d4b00ce3cea67b44566851c4343c120abd683433ce934a68ea58872"}, @@ -1142,6 +1178,10 @@ colorama = [ construct = [ {file = "construct-2.10.67.tar.gz", hash = "sha256:730235fedf4f2fee5cfadda1d14b83ef1bf23790fb1cc579073e10f70a050883"}, ] +construct-classes = [ + {file = "construct-classes-0.1.2.tar.gz", hash = "sha256:72ac1abbae5bddb4918688713f991f5a7fb6c9b593646a82f4bf3ac53de7eeb5"}, + {file = "construct_classes-0.1.2-py3-none-any.whl", hash = "sha256:e82437261790758bda41e45fb3d5622b54cfbf044ceb14774af68346faf5e08e"}, +] coverage = [ {file = "coverage-4.5.4-cp26-cp26m-macosx_10_12_x86_64.whl", hash = "sha256:eee64c616adeff7db37cc37da4180a3a5b6177f5c46b187894e633f088fb5b28"}, {file = "coverage-4.5.4-cp27-cp27m-macosx_10_12_x86_64.whl", hash = "sha256:ef824cad1f980d27f26166f86856efe11eff9912c4fed97d3804820d43fa550c"}, @@ -1219,7 +1259,10 @@ ecdsa = [ ed25519 = [ {file = "ed25519-1.5.tar.gz", hash = "sha256:02053ee019ceef0df97294be2d4d5a8fc120fc86e81e08bec1245fc0f9403358"}, ] -execnet = [] +execnet = [ + {file = "execnet-1.9.0-py2.py3-none-any.whl", hash = "sha256:a295f7cc774947aac58dde7fdc85f4aa00c42adf5d8f5468fc630c1acf30a142"}, + {file = "execnet-1.9.0.tar.gz", hash = "sha256:8f694f3ba9cc92cab508b152dcfe322153975c29bda272e2fd7f3f00f36e47c5"}, +] fido2 = [ {file = "fido2-0.8.1.tar.gz", hash = "sha256:449068f6876f397c8bb96ebc6a75c81c2692f045126d3f13ece21d409acdf7c3"}, ] @@ -1562,7 +1605,10 @@ pytest = [ {file = "pytest-6.2.5-py3-none-any.whl", hash = "sha256:7310f8d27bc79ced999e760ca304d69f6ba6c6649c0b60fb0e04a4a77cacc134"}, {file = "pytest-6.2.5.tar.gz", hash = "sha256:131b36680866a76e6781d13f101efb86cf674ebb9762eb70d3082b6f29889e89"}, ] -pytest-forked = [] +pytest-forked = [ + {file = "pytest-forked-1.4.0.tar.gz", hash = "sha256:8b67587c8f98cbbadfdd804539ed5455b6ed03802203485dd2f53c1422d7440e"}, + {file = "pytest_forked-1.4.0-py3-none-any.whl", hash = "sha256:bbbb6717efc886b9d64537b41fb1497cfaf3c9601276be8da2cccfea5a3c8ad8"}, +] pytest-ordering = [ {file = "pytest-ordering-0.6.tar.gz", hash = "sha256:561ad653626bb171da78e682f6d39ac33bb13b3e272d406cd555adb6b006bda6"}, {file = "pytest_ordering-0.6-py2-none-any.whl", hash = "sha256:27fba3fc265f5d0f8597e7557885662c1bdc1969497cd58aff6ed21c3b617de2"}, @@ -1576,7 +1622,10 @@ pytest-timeout = [ {file = "pytest-timeout-2.1.0.tar.gz", hash = "sha256:c07ca07404c612f8abbe22294b23c368e2e5104b521c1790195561f37e1ac3d9"}, {file = "pytest_timeout-2.1.0-py3-none-any.whl", hash = "sha256:f6f50101443ce70ad325ceb4473c4255e9d74e3c7cd0ef827309dfa4c0d975c6"}, ] -pytest-xdist = [] +pytest-xdist = [ + {file = "pytest-xdist-2.5.0.tar.gz", hash = "sha256:4580deca3ff04ddb2ac53eba39d76cb5dd5edeac050cb6fbc768b0dd712b4edf"}, + {file = "pytest_xdist-2.5.0-py3-none-any.whl", hash = "sha256:6fe5c74fec98906deb8f2d2b616b5c782022744978e7bd4695d39c8f42d0ce65"}, +] python-bitcoinlib = [ {file = "python-bitcoinlib-0.11.0.tar.gz", hash = "sha256:3daafd63cb755f6e2067b7c9c514053856034c9f9363c80c37007744d54a2e06"}, {file = "python_bitcoinlib-0.11.0-py3-none-any.whl", hash = "sha256:6e7982734637135599e2136d3c88d622f147e3b29201636665f799365784cd9e"}, @@ -1589,6 +1638,13 @@ pyyaml = [ {file = "PyYAML-6.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5"}, {file = "PyYAML-6.0-cp310-cp310-win32.whl", hash = "sha256:2cd5df3de48857ed0544b34e2d40e9fac445930039f3cfe4bcc592a1f836d513"}, {file = "PyYAML-6.0-cp310-cp310-win_amd64.whl", hash = "sha256:daf496c58a8c52083df09b80c860005194014c3698698d1a57cbcfa182142a3a"}, + {file = "PyYAML-6.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:d4b0ba9512519522b118090257be113b9468d804b19d63c71dbcf4a48fa32358"}, + {file = "PyYAML-6.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:81957921f441d50af23654aa6c5e5eaf9b06aba7f0a19c18a538dc7ef291c5a1"}, + {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:afa17f5bc4d1b10afd4466fd3a44dc0e245382deca5b3c353d8b757f9e3ecb8d"}, + {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dbad0e9d368bb989f4515da330b88a057617d16b6a8245084f1b05400f24609f"}, + {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:432557aa2c09802be39460360ddffd48156e30721f5e8d917f01d31694216782"}, + {file = "PyYAML-6.0-cp311-cp311-win32.whl", hash = "sha256:bfaef573a63ba8923503d27530362590ff4f576c626d86a9fed95822a8255fd7"}, + {file = "PyYAML-6.0-cp311-cp311-win_amd64.whl", hash = "sha256:01b45c0191e6d66c470b6cf1b9531a771a83c1c4208272ead47a3ae4f2f603bf"}, {file = "PyYAML-6.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:897b80890765f037df3403d22bab41627ca8811ae55e9a722fd0392850ec4d86"}, {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f"}, {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92"}, diff --git a/python/.changelog.d/2576.incompatible b/python/.changelog.d/2576.incompatible new file mode 100644 index 000000000..bf5637794 --- /dev/null +++ b/python/.changelog.d/2576.incompatible @@ -0,0 +1 @@ +Refactored firmware parsing and validation to a more object oriented approach. diff --git a/python/requirements.txt b/python/requirements.txt index 6db6a087a..916fad94f 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -7,3 +7,4 @@ construct>=2.9,!=2.10.55 typing_extensions>=3.10 dataclasses ; python_version<'3.7' simple-rlp>=0.1.2 ; python_version>='3.7' +construct-classes>=0.1.2 diff --git a/python/src/trezorlib/_internal/firmware_headers.py b/python/src/trezorlib/_internal/firmware_headers.py index e935f7817..18f8f7d38 100644 --- a/python/src/trezorlib/_internal/firmware_headers.py +++ b/python/src/trezorlib/_internal/firmware_headers.py @@ -14,13 +14,15 @@ # You should have received a copy of the License along with this library. # If not, see . -import struct +import typing as t +from copy import copy +from dataclasses import asdict from enum import Enum -from hashlib import blake2s -from typing import Any, List, Optional import click import construct as c +from construct_classes import Struct +from typing_extensions import Protocol, Self, runtime_checkable from .. import cosi, firmware @@ -43,48 +45,25 @@ VHASH_DEVEL = bytes.fromhex( ) -AnyFirmware = c.Struct( - "vendor_header" / c.Optional(firmware.VendorHeader), - "image" / c.Optional(firmware.FirmwareImage), -) - - class ImageType(Enum): VENDOR_HEADER = 0 BOOTLOADER = 1 FIRMWARE = 2 -def _make_dev_keys(*key_bytes: bytes) -> List[bytes]: +def _make_dev_keys(*key_bytes: bytes) -> t.Sequence[bytes]: return [k * 32 for k in key_bytes] -def compute_vhash(vendor_header: c.Container) -> bytes: - m = vendor_header.sig_m - n = vendor_header.sig_n - pubkeys = vendor_header.pubkeys - h = blake2s() - h.update(struct.pack(" bool: return all(b == 0 for b in data) -def _check_signature_any( - header: c.Container, m: int, pubkeys: List[bytes], is_devel: bool -) -> Status: - if all_zero(header.signature) and header.sigmask == 0: +def _check_signature_any(fw: "SignableImageProto", is_devel: bool) -> Status: + if not fw.signature_present(): return Status.MISSING try: - digest = firmware.header_digest(header) - cosi.verify(header.signature, digest, m, pubkeys, header.sigmask) + fw.verify() return Status.VALID if not is_devel else Status.DEVEL except Exception: return Status.INVALID @@ -98,11 +77,11 @@ class LiteralStr(str): def _format_container( - pb: c.Container, + pb: t.Union[c.Container, Struct, dict], indent: int = 0, sep: str = " " * 4, - truncate_after: Optional[int] = 64, - truncate_to: Optional[int] = 32, + truncate_after: t.Optional[int] = 64, + truncate_to: t.Optional[int] = 32, ) -> str: def mostly_printable(bytes: bytes) -> bool: if not bytes: @@ -110,7 +89,7 @@ def _format_container( printable = sum(1 for byte in bytes if 0x20 <= byte <= 0x7E) return printable / len(bytes) > 0.8 - def pformat(value: Any, indent: int) -> str: + def pformat(value: t.Any, indent: int) -> str: level = sep * indent leadin = sep * (indent + 1) @@ -127,6 +106,9 @@ def _format_container( lines[1:1] = [leadin + pformat(x, indent + 1) for x in value] return "\n".join(lines) + if isinstance(value, Struct): + value = asdict(value) + if isinstance(value, dict): lines = ["{"] for key, val in value.items(): @@ -158,88 +140,140 @@ def _format_container( return pformat(pb, indent) -def _format_version(version: c.Container) -> str: - version_str = ".".join( - str(version[k]) for k in ("major", "minor", "patch") if k in version - ) - if "build" in version: - version_str += f" build {version.build}" - return version_str +def _format_version(version: t.Tuple[int, ...]) -> str: + return ".".join(str(i) for i in version) + + +def format_header( + header: firmware.core.FirmwareHeader, + code_hashes: t.Sequence[bytes], + digest: bytes, + sig_status: Status, +) -> str: + header_dict = asdict(header) + header_out = header_dict.copy() + + for key, val in header_out.items(): + if "version" in key: + header_out[key] = LiteralStr(_format_version(val)) + + hashes_out = [] + for expected, actual in zip(header.hashes, code_hashes): + status = SYM_OK if expected == actual else SYM_FAIL + hashes_out.append(LiteralStr(f"{status} {expected.hex()}")) + + if all(all_zero(h) for h in header.hashes): + hash_status = Status.MISSING + elif header.hashes != code_hashes: + hash_status = Status.INVALID + else: + hash_status = Status.VALID + + header_out["hashes"] = hashes_out + + all_ok = SYM_OK if hash_status.is_ok() and sig_status.is_ok() else SYM_FAIL + + output = [ + "Firmware Header " + _format_container(header_out), + f"Fingerprint: {click.style(digest.hex(), bold=True)}", + f"{all_ok} Signature is {sig_status.value}, hashes are {hash_status.value}", + ] + + return "\n".join(output) # =========================== functionality implementations =============== -class SignableImage: - NAME = "Unrecognized image" - BIP32_INDEX: Optional[int] = None - DEV_KEYS: List[bytes] = [] - DEV_KEY_SIGMASK = 0b11 +class SignableImageProto(Protocol): + NAME: t.ClassVar[str] - def __init__(self, fw: c.Container) -> None: - self.fw = fw - self.header: Any - self.public_keys: List[bytes] - self.sigs_required = firmware.V2_SIGS_REQUIRED + @classmethod + def parse(cls, data: bytes) -> Self: + ... def digest(self) -> bytes: - return firmware.header_digest(self.header) + ... - def check_signature(self) -> Status: - raise NotImplementedError + def verify(self) -> None: + ... - def rehash(self) -> None: - pass + def build(self) -> bytes: + ... + + def format(self, verbose: bool = False) -> str: + ... + + def signature_present(self) -> bool: + ... + + def public_keys(self) -> t.Sequence[bytes]: + ... + + +@runtime_checkable +class CosiSignedImage(SignableImageProto, Protocol): + DEV_KEYS: t.ClassVar[t.Sequence[bytes]] = [] def insert_signature(self, signature: bytes, sigmask: int) -> None: - self.header.signature = signature - self.header.sigmask = sigmask + ... + + +@runtime_checkable +class LegacySignedImage(SignableImageProto, Protocol): + def slots(self) -> t.Iterable[int]: + ... - def dump(self) -> bytes: - return AnyFirmware.build(self.fw) + def insert_signature(self, slot: int, key_index: int, signature: bytes) -> None: + ... - def format(self, verbose: bool) -> str: - return _format_container(self.fw) +class CosiSignatureHeaderProto(Protocol): + signature: bytes + sigmask: int -class VendorHeader(SignableImage): + +class CosiSignedMixin: + def signature_present(self) -> bool: + header = self.get_header() + return not all_zero(header.signature) or header.sigmask != 0 + + def insert_signature(self, signature: bytes, sigmask: int) -> None: + self.get_header().signature = signature + self.get_header().sigmask = sigmask + + def get_header(self) -> CosiSignatureHeaderProto: + raise NotImplementedError + + +class VendorHeader(firmware.VendorHeader, CosiSignedMixin): NAME = "vendorheader" - BIP32_INDEX = 1 DEV_KEYS = _make_dev_keys(b"\x44", b"\x45") - def __init__(self, fw: c.Container) -> None: - super().__init__(fw) - self.header = fw.vendor_header - self.public_keys = firmware.V2_BOOTLOADER_KEYS + SUBCON = c.Struct(*firmware.VendorHeader.SUBCON.subcons, c.Terminated) - def check_signature(self) -> Status: - return _check_signature_any( - self.header, self.sigs_required, self.public_keys, False - ) + def get_header(self) -> CosiSignatureHeaderProto: + return self def _format(self, terse: bool) -> str: - vh = self.fw.vendor_header if not terse: - vhash = compute_vhash(vh) output = [ - "Vendor Header " + _format_container(vh), - f"Pubkey bundle hash: {vhash.hex()}", + "Vendor Header " + _format_container(self), + f"Pubkey bundle hash: {self.vhash().hex()}", ] else: output = [ "Vendor Header for {vendor} version {version} ({size} bytes)".format( - vendor=click.style(vh.text, bold=True), - version=_format_version(vh.version), - size=vh.header_len, + vendor=click.style(self.text, bold=True), + version=_format_version(self.version), + size=self.header_len, ), ] - fingerprint = firmware.header_digest(vh) - if not terse: - output.append(f"Fingerprint: {click.style(fingerprint.hex(), bold=True)}") + output.append(f"Fingerprint: {click.style(self.digest().hex(), bold=True)}") - sig_status = self.check_signature() + sig_status = _check_signature_any(self, is_devel=False) sym = SYM_OK if sig_status.is_ok() else SYM_FAIL output.append(f"{sym} Signature is {sig_status.value}") @@ -248,138 +282,168 @@ class VendorHeader(SignableImage): def format(self, verbose: bool = False) -> str: return self._format(terse=False) + def public_keys(self) -> t.Sequence[bytes]: + return firmware.V2_BOOTLOADER_KEYS + + +class VendorFirmware(firmware.VendorFirmware, CosiSignedMixin): + NAME = "firmware" + DEV_KEYS = _make_dev_keys(b"\x47", b"\x48") + + def get_header(self) -> CosiSignatureHeaderProto: + return self.firmware.header -class BinImage(SignableImage): - def __init__(self, fw: c.Container) -> None: - super().__init__(fw) - self.header = self.fw.image.header - self.code_hashes = firmware.calculate_code_hashes( - self.fw.image.code, self.fw.image._code_offset + def format(self, verbose: bool = False) -> str: + vh = copy(self.vendor_header) + vh.__class__ = VendorHeader + assert isinstance(vh, VendorHeader) + + is_devel = self.vendor_header.vhash() == VHASH_DEVEL + + return ( + vh._format(terse=not verbose) + + "\n" + + format_header( + self.firmware.header, + self.firmware.code_hashes(), + self.digest(), + _check_signature_any(self, is_devel), + ) ) - self.digest_header = self.header.copy() - self.digest_header.hashes = self.code_hashes - def insert_signature(self, signature: bytes, sigmask: int) -> None: - super().insert_signature(signature, sigmask) - self.digest_header.signature = signature - self.digest_header.sigmask = sigmask + def public_keys(self) -> t.Sequence[bytes]: + return self.vendor_header.pubkeys - def digest(self) -> bytes: - return firmware.header_digest(self.digest_header) - def rehash(self) -> None: - self.header.hashes = self.code_hashes +class BootloaderImage(firmware.FirmwareImage, CosiSignedMixin): + NAME = "bootloader" + DEV_KEYS = _make_dev_keys(b"\x41", b"\x42") - def format(self, verbose: bool = False) -> str: - header_out = self.header.copy() + def get_header(self) -> CosiSignatureHeaderProto: + return self.header - if not verbose: - for key in self.header: - if key.startswith("v1"): - del header_out[key] - if "version" in key: - header_out[key] = LiteralStr(_format_version(self.header[key])) + def format(self, verbose: bool = False) -> str: + return format_header( + self.header, + self.code_hashes(), + self.digest(), + _check_signature_any(self, False), + ) - all_ok = SYM_OK - hash_status = Status.VALID - sig_status = Status.VALID + def verify(self) -> None: + self.validate_code_hashes() + try: + cosi.verify( + self.header.signature, + self.digest(), + firmware.V2_SIGS_REQUIRED, + firmware.V2_BOARDLOADER_KEYS, + self.header.sigmask, + ) + except Exception: + raise firmware.InvalidSignatureError("Invalid bootloader signature") + + def public_keys(self) -> t.Sequence[bytes]: + return firmware.V2_BOARDLOADER_KEYS + + +class LegacyFirmware(firmware.LegacyFirmware): + NAME = "legacy_firmware_v1" + BIP32_INDEX = None + + def signature_present(self) -> bool: + return any(i != 0 for i in self.key_indexes) or any( + not all_zero(sig) for sig in self.signatures + ) - hashes_out = [] - for expected, actual in zip(self.header.hashes, self.code_hashes): - status = SYM_OK if expected == actual else SYM_FAIL - hashes_out.append(LiteralStr(f"{status} {expected.hex()}")) + def insert_signature(self, slot: int, key_index: int, signature: bytes) -> None: + if not 0 <= slot < firmware.V1_SIGNATURE_SLOTS: + raise ValueError("Invalid slot number") + if not 0 <= key_index < len(firmware.V1_BOOTLOADER_KEYS): + raise ValueError("Invalid key index") + self.key_indexes[slot] = key_index + self.signatures[slot] = signature - if all(all_zero(h) for h in self.header.hashes): - hash_status = Status.MISSING - elif self.header.hashes != self.code_hashes: - hash_status = Status.INVALID + def format(self, verbose: bool = False) -> str: + contents = asdict(self).copy() + del contents["embedded_v2"] + if self.embedded_v2: + em = copy(self.embedded_v2) + em.__class__ = LegacyV2Firmware + assert isinstance(em, LegacyV2Firmware) + embedded_content = "\nEmbedded V2 header: " + em.format(verbose=verbose) else: - hash_status = Status.VALID - - header_out["hashes"] = hashes_out + embedded_content = "" - sig_status = self.check_signature() - all_ok = SYM_OK if hash_status.is_ok() and sig_status.is_ok() else SYM_FAIL + return _format_container(contents) + embedded_content - output = [ - "Firmware Header " + _format_container(header_out), - f"Fingerprint: {click.style(self.digest().hex(), bold=True)}", - f"{all_ok} Signature is {sig_status.value}, hashes are {hash_status.value}", - ] + def public_keys(self) -> t.Sequence[bytes]: + return firmware.V1_BOOTLOADER_KEYS - return "\n".join(output) + def slots(self) -> t.Iterable[int]: + return self.key_indexes -class FirmwareImage(BinImage): - NAME = "firmware" - BIP32_INDEX = 2 - DEV_KEYS = _make_dev_keys(b"\x47", b"\x48") +class LegacyV2Firmware(firmware.LegacyV2Firmware): + NAME = "legacy_firmware_v2" + BIP32_INDEX = 5 - def __init__(self, fw: c.Container) -> None: - super().__init__(fw) - self.public_keys = fw.vendor_header.pubkeys - self.sigs_required = fw.vendor_header.sig_m - - def check_signature(self) -> Status: - vhash = compute_vhash(self.fw.vendor_header) - return _check_signature_any( - self.digest_header, - self.sigs_required, - self.public_keys, - vhash == VHASH_DEVEL, + def signature_present(self) -> bool: + return any(i != 0 for i in self.header.v1_key_indexes) or any( + not all_zero(sig) for sig in self.header.v1_signatures ) + def insert_signature(self, slot: int, key_index: int, signature: bytes) -> None: + if not 0 <= slot < firmware.V1_SIGNATURE_SLOTS: + raise ValueError("Invalid slot number") + if not 0 <= key_index < len(firmware.V1_BOOTLOADER_KEYS): + raise ValueError("Invalid key index") + if not isinstance(self.header.v1_key_indexes, list): + self.header.v1_key_indexes = list(self.header.v1_key_indexes) + if not isinstance(self.header.v1_signatures, list): + self.header.v1_signatures = list(self.header.v1_signatures) + self.header.v1_key_indexes[slot] = key_index + self.header.v1_signatures[slot] = signature + def format(self, verbose: bool = False) -> str: - return ( - VendorHeader(self.fw)._format(terse=not verbose) - + "\n" - + super().format(verbose) + return format_header( + self.header, + self.code_hashes(), + self.digest(), + _check_signature_any(self, False), ) + def public_keys(self) -> t.Sequence[bytes]: + return firmware.V1_BOOTLOADER_KEYS -class BootloaderImage(BinImage): - NAME = "bootloader" - BIP32_INDEX = 0 - DEV_KEYS = _make_dev_keys(b"\x41", b"\x42") + def slots(self) -> t.Iterable[int]: + return self.header.v1_key_indexes - def __init__(self, fw: c.Container) -> None: - super().__init__(fw) - self._identify_dev_keys() - def insert_signature(self, signature: bytes, sigmask: int) -> None: - super().insert_signature(signature, sigmask) - self._identify_dev_keys() - - def _identify_dev_keys(self) -> None: - # try checking signature with dev keys first - self.public_keys = firmware.V2_BOARDLOADER_DEV_KEYS - if not self.check_signature().is_ok(): - # validation with dev keys failed, use production keys - self.public_keys = firmware.V2_BOARDLOADER_KEYS - - def check_signature(self) -> Status: - return _check_signature_any( - self.header, - self.sigs_required, - self.public_keys, - self.public_keys == firmware.V2_BOARDLOADER_DEV_KEYS, - ) +def parse_image(image: bytes) -> SignableImageProto: + try: + return VendorFirmware.parse(image) + except c.ConstructError: + pass + try: + return VendorHeader.parse(image) + except c.ConstructError: + pass + + try: + firmware_img = firmware.core.FirmwareImage.parse(image) + if firmware_img.header.magic == firmware.core.HeaderType.BOOTLOADER: + return BootloaderImage.parse(image) + if firmware_img.header.magic == firmware.core.HeaderType.FIRMWARE: + return LegacyV2Firmware.parse(image) + raise ValueError("Unrecognized firmware header magic") + except c.ConstructError: + pass + + try: + return LegacyFirmware.parse(image) + except c.ConstructError: + pass -def parse_image(image: bytes) -> SignableImage: - fw = AnyFirmware.parse(image) - if fw.vendor_header and not fw.image: - return VendorHeader(fw) - if ( - not fw.vendor_header - and fw.image - and fw.image.header.magic == firmware.HeaderType.BOOTLOADER - ): - return BootloaderImage(fw) - if ( - fw.vendor_header - and fw.image - and fw.image.header.magic == firmware.HeaderType.FIRMWARE - ): - return FirmwareImage(fw) - raise ValueError("Unrecognized image type") + raise ValueError("Unrecognized firmware type") diff --git a/python/src/trezorlib/cli/firmware.py b/python/src/trezorlib/cli/firmware.py index fffbc8dbf..189d59504 100644 --- a/python/src/trezorlib/cli/firmware.py +++ b/python/src/trezorlib/cli/firmware.py @@ -26,19 +26,18 @@ from .. import exceptions, firmware from . import with_client if TYPE_CHECKING: - import construct as c from ..client import TrezorClient from . import TrezorConnection ALLOWED_FIRMWARE_FORMATS = { - 1: (firmware.FirmwareFormat.TREZOR_ONE, firmware.FirmwareFormat.TREZOR_ONE_V2), - 2: (firmware.FirmwareFormat.TREZOR_T,), + 1: (firmware.LegacyFirmware, firmware.LegacyV2Firmware), + 2: (firmware.VendorFirmware,), } -def _print_version(version: dict) -> None: - vstr = "Firmware version {major}.{minor}.{patch} build {build}".format(**version) - click.echo(vstr) +def _print_version(version: Tuple[int, int, int, int]) -> None: + major, minor, patch, build = version + click.echo(f"Firmware version {major}.{minor}.{patch} build {build}") def _is_bootloader_onev2(client: "TrezorClient") -> bool: @@ -59,32 +58,26 @@ def _get_file_name_from_url(url: str) -> str: return os.path.basename(full_path) -def print_firmware_version( - version: firmware.FirmwareFormat, - fw: "c.Container", -) -> None: +def print_firmware_version(fw: "firmware.FirmwareType") -> None: """Print out the firmware version and details.""" - if version == firmware.FirmwareFormat.TREZOR_ONE: - if fw.embedded_onev2: + if isinstance(fw, firmware.LegacyFirmware): + if fw.embedded_v2: click.echo("Trezor One firmware with embedded v2 image (1.8.0 or later)") - _print_version(fw.embedded_onev2.header.version) + _print_version(fw.embedded_v2.header.version) else: click.echo("Trezor One firmware image.") - elif version == firmware.FirmwareFormat.TREZOR_ONE_V2: + elif isinstance(fw, firmware.LegacyV2Firmware): click.echo("Trezor One v2 firmware (1.8.0 or later)") _print_version(fw.header.version) - elif version == firmware.FirmwareFormat.TREZOR_T: + elif isinstance(fw, firmware.VendorFirmware): click.echo("Trezor T firmware image.") vendor = fw.vendor_header.text - vendor_version = "{major}.{minor}".format(**fw.vendor_header.version) + vendor_version = "{}.{}".format(*fw.vendor_header.version) click.echo(f"Vendor header from {vendor}, version {vendor_version}") - _print_version(fw.image.header.version) + _print_version(fw.firmware.header.version) -def validate_signatures( - version: firmware.FirmwareFormat, - fw: "c.Container", -) -> None: +def validate_signatures(fw: "firmware.FirmwareType") -> None: """Check the signatures on the firmware. Prints the validity status. @@ -92,18 +85,25 @@ def validate_signatures( Exits if the validation fails. """ try: - firmware.validate(version, fw, allow_unsigned=False) + fw.verify() click.echo("Signatures are valid.") except firmware.Unsigned: + if not isinstance(fw, firmware.LegacyFirmware): + raise + + # allow legacy firmware without signatures if not click.confirm("No signatures found. Continue?", default=False): sys.exit(1) - try: - firmware.validate(version, fw, allow_unsigned=True) - click.echo("Unsigned firmware looking OK.") - except firmware.FirmwareIntegrityError as e: - click.echo(e) - click.echo("Firmware validation failed, aborting.") - sys.exit(4) + if firmware.is_onev2(fw): + try: + assert fw.embedded_v2 is not None + fw.embedded_v2.verify_unsigned() + except firmware.FirmwareIntegrityError as e: + click.echo(e) + click.echo("Firmware validation failed, aborting.") + sys.exit(4) + click.echo("Unsigned firmware looking OK.") + except firmware.FirmwareIntegrityError as e: click.echo(e) click.echo("Firmware validation failed, aborting.") @@ -111,8 +111,7 @@ def validate_signatures( def validate_fingerprint( - version: firmware.FirmwareFormat, - fw: "c.Container", + fw: "firmware.FirmwareType", expected_fingerprint: Optional[str] = None, ) -> None: """Determine and validate the firmware fingerprint. @@ -120,12 +119,11 @@ def validate_fingerprint( Prints the fingerprint. Exits if the validation fails. """ - fingerprint = firmware.digest(version, fw).hex() + fingerprint = fw.digest().hex() click.echo(f"Firmware fingerprint: {fingerprint}") - if version == firmware.FirmwareFormat.TREZOR_ONE and fw.embedded_onev2: - fingerprint_onev2 = firmware.digest( - firmware.FirmwareFormat.TREZOR_ONE_V2, fw.embedded_onev2 - ).hex() + if firmware.is_onev2(fw): + assert fw.embedded_v2 is not None + fingerprint_onev2 = fw.embedded_v2.digest().hex() click.echo(f"Embedded v2 image fingerprint: {fingerprint_onev2}") if expected_fingerprint and fingerprint != expected_fingerprint: click.echo(f"Expected fingerprint: {expected_fingerprint}") @@ -134,8 +132,7 @@ def validate_fingerprint( def check_device_match( - version: firmware.FirmwareFormat, - fw: "c.Container", + fw: "firmware.FirmwareType", bootloader_onev2: bool, trezor_major_version: int, ) -> None: @@ -143,24 +140,24 @@ def check_device_match( Prints error message and exits if the validation fails. """ + if trezor_major_version not in ALLOWED_FIRMWARE_FORMATS: + click.echo("trezorctl doesn't know your device version. Aborting.") + sys.exit(3) + elif not isinstance(fw, ALLOWED_FIRMWARE_FORMATS[trezor_major_version]): + click.echo("Firmware does not match your device, aborting.") + sys.exit(3) + if ( bootloader_onev2 - and version == firmware.FirmwareFormat.TREZOR_ONE - and not fw.embedded_onev2 + and isinstance(fw, firmware.LegacyFirmware) + and not fw.embedded_v2 ): click.echo("Firmware is too old for your device. Aborting.") sys.exit(3) - elif not bootloader_onev2 and version == firmware.FirmwareFormat.TREZOR_ONE_V2: + elif not bootloader_onev2 and isinstance(fw, firmware.LegacyV2Firmware): click.echo("You need to upgrade to bootloader 1.8.0 first.") sys.exit(3) - if trezor_major_version not in ALLOWED_FIRMWARE_FORMATS: - click.echo("trezorctl doesn't know your device version. Aborting.") - sys.exit(3) - elif version not in ALLOWED_FIRMWARE_FORMATS[trezor_major_version]: - click.echo("Firmware does not match your device, aborting.") - sys.exit(3) - def get_all_firmware_releases( bitcoin_only: bool, beta: bool, major_version: int @@ -348,18 +345,17 @@ def validate_firmware( - being compatible with the device (when chosen) """ try: - version, fw = firmware.parse(firmware_data) + fw = firmware.parse(firmware_data) except Exception as e: click.echo(e) sys.exit(2) - print_firmware_version(version, fw) - validate_signatures(version, fw) - validate_fingerprint(version, fw, fingerprint) + print_firmware_version(fw) + validate_signatures(fw) + validate_fingerprint(fw, fingerprint) if bootloader_onev2 is not None and trezor_major_version is not None: check_device_match( - version=version, fw=fw, bootloader_onev2=bootloader_onev2, trezor_major_version=trezor_major_version, @@ -372,7 +368,7 @@ def extract_embedded_fw( bootloader_onev2: bool, ) -> bytes: """Modify the firmware data for sending into Trezor, if necessary.""" - # special handling for embedded-OneV2 format: + # special handling for embedded_v2-OneV2 format: # for bootloader < 1.8, keep the embedding # for bootloader 1.8.0 and up, strip the old OneV1 header if ( @@ -380,7 +376,7 @@ def extract_embedded_fw( and firmware_data[:4] == b"TRZR" and firmware_data[256 : 256 + 4] == b"TRZF" ): - click.echo("Extracting embedded firmware image.") + click.echo("Extracting embedded_v2 firmware image.") return firmware_data[256:] return firmware_data diff --git a/python/src/trezorlib/cosi.py b/python/src/trezorlib/cosi.py index 77786a90f..f8e423790 100644 --- a/python/src/trezorlib/cosi.py +++ b/python/src/trezorlib/cosi.py @@ -16,7 +16,7 @@ import warnings from functools import reduce -from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Tuple from . import _ed25519, messages from .tools import expect @@ -90,7 +90,7 @@ def verify( signature: Ed25519Signature, digest: bytes, sigs_required: int, - keys: List[Ed25519PublicPoint], + keys: Sequence[Ed25519PublicPoint], mask: int, ) -> None: """Verify a CoSi multi-signature. Raise exception if the signature is invalid. diff --git a/python/src/trezorlib/firmware/__init__.py b/python/src/trezorlib/firmware/__init__.py index b2bbf0a09..1979d843e 100644 --- a/python/src/trezorlib/firmware/__init__.py +++ b/python/src/trezorlib/firmware/__init__.py @@ -14,443 +14,62 @@ # You should have received a copy of the License along with this library. # If not, see . -import hashlib -from enum import Enum +import typing as t from hashlib import blake2s -from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple -import construct as c -import ecdsa - -from . import cosi, messages -from .toif import ToifStruct -from .tools import expect, session, EnumAdapter - -if TYPE_CHECKING: - from .client import TrezorClient - -V1_SIGNATURE_SLOTS = 3 -V1_BOOTLOADER_KEYS = [ - bytes.fromhex(key) - for key in ( - "04d571b7f148c5e4232c3814f777d8faeaf1a84216c78d569b71041ffc768a5b2d810fc3bb134dd026b57e65005275aedef43e155f48fc11a32ec790a93312bd58", - "0463279c0c0866e50c05c799d32bd6bab0188b6de06536d1109d2ed9ce76cb335c490e55aee10cc901215132e853097d5432eda06b792073bd7740c94ce4516cb1", - "0443aedbb6f7e71c563f8ed2ef64ec9981482519e7ef4f4aa98b27854e8c49126d4956d300ab45fdc34cd26bc8710de0a31dbdf6de7435fd0b492be70ac75fde58", - "04877c39fd7c62237e038235e9c075dab261630f78eeb8edb92487159fffedfdf6046c6f8b881fa407c4a4ce6c28de0b19c1f4e29f1fcbc5a58ffd1432a3e0938a", - "047384c51ae81add0a523adbb186c91b906ffb64c2c765802bf26dbd13bdf12c319e80c2213a136c8ee03d7874fd22b70d68e7dee469decfbbb510ee9a460cda45", - ) -] - -V2_BOARDLOADER_KEYS = [ - bytes.fromhex(key) - for key in ( - "0eb9856be9ba7e972c7f34eac1ed9b6fd0efd172ec00faf0c589759da4ddfba0", - "ac8ab40b32c98655798fd5da5e192be27a22306ea05c6d277cdff4a3f4125cd8", - "ce0fcd12543ef5936cf2804982136707863d17295faced72af171d6e6513ff06", - ) -] - -V2_BOARDLOADER_DEV_KEYS = [ - bytes.fromhex(key) - for key in ( - "db995fe25169d141cab9bbba92baa01f9f2e1ece7df4cb2ac05190f37fcc1f9d", - "2152f8d19b791d24453242e15f2eab6cb7cffa7b6a5ed30097960e069881db12", - "22fc297792f0b6ffc0bfcfdb7edb0c0aa14e025a365ec0e342e86e3829cb74b6", - ) -] - -V2_BOOTLOADER_KEYS = [ - bytes.fromhex(key) - for key in ( - "c2c87a49c5a3460977fbb2ec9dfe60f06bd694db8244bd4981fe3b7a26307f3f", - "80d036b08739b846f4cb77593078deb25dc9487aedcf52e30b4fb7cd7024178a", - "b8307a71f552c60a4cbb317ff48b82cdbf6b6bb5f04c920fec7badf017883751", +from typing_extensions import Protocol, TypeGuard + +from .. import messages +from ..tools import expect, session +from .core import VendorFirmware +from .legacy import LegacyFirmware, LegacyV2Firmware + +# re-exports: +if True: + # indented block prevents isort from messing with these until we upgrade to 5.x + from .consts import * # noqa: F401, F403 + from .core import * # noqa: F401, F403 + from .legacy import * # noqa: F401, F403 + from .util import ( # noqa: F401 + FirmwareIntegrityError, + InvalidSignatureError, + Unsigned, ) -] - -V2_SIGS_REQUIRED = 2 - -ONEV2_CHUNK_SIZE = 1024 * 64 -V2_CHUNK_SIZE = 1024 * 128 - - -def _transform_vendor_trust(data: bytes) -> bytes: - """Byte-swap and bit-invert the VendorTrust field. - - Vendor trust is interpreted as a bitmask in a 16-bit little-endian integer, - with the added twist that 0 means set and 1 means unset. - We feed it to a `BitStruct` that expects a big-endian sequence where bits have - the traditional meaning. We must therefore do a bitwise negation of each byte, - and return them in reverse order. This is the same transformation both ways, - fortunately, so we don't need two separate functions. - """ - return bytes(~b & 0xFF for b in data)[::-1] - - -class FirmwareIntegrityError(Exception): - pass - - -class InvalidSignatureError(FirmwareIntegrityError): - pass - - -class Unsigned(FirmwareIntegrityError): - pass - - -class HeaderType(Enum): - FIRMWARE = b"TRZF" - BOOTLOADER = b"TRZB" - - -# fmt: off -VendorTrust = c.Transformed(c.BitStruct( - "_reserved" / c.Default(c.BitsInteger(9), 0), - "show_vendor_string" / c.Flag, - "require_user_click" / c.Flag, - "red_background" / c.Flag, - "delay" / c.BitsInteger(4), -), _transform_vendor_trust, 2, _transform_vendor_trust, 2) + from .vendor import * # noqa: F401, F403 +if t.TYPE_CHECKING: + from ..client import TrezorClient -VendorHeader = c.Struct( - "_start_offset" / c.Tell, - "magic" / c.Const(b"TRZV"), - "header_len" / c.Int32ul, - "expiry" / c.Int32ul, - "version" / c.Struct( - "major" / c.Int8ul, - "minor" / c.Int8ul, - ), - "sig_m" / c.Int8ul, - "sig_n" / c.Rebuild(c.Int8ul, c.len_(c.this.pubkeys)), - "trust" / VendorTrust, - "_reserved" / c.Padding(14), - "pubkeys" / c.Bytes(32)[c.this.sig_n], - "text" / c.Aligned(4, c.PascalString(c.Int8ul, "utf-8")), - "image" / ToifStruct, - "_end_offset" / c.Tell, + T = t.TypeVar("T", bound="FirmwareType") - "_min_header_len" / c.Check(c.this.header_len > (c.this._end_offset - c.this._start_offset) + 65), - "_header_len_aligned" / c.Check(c.this.header_len % 512 == 0), + class FirmwareType(Protocol): + @classmethod + def parse(cls: t.Type[T], data: bytes) -> T: + ... - c.Padding(c.this.header_len - c.this._end_offset + c.this._start_offset - 65), - "sigmask" / c.Byte, - "signature" / c.Bytes(64), -) + def verify(self, public_keys: t.Sequence[bytes] = ()) -> None: + ... + def digest(self) -> bytes: + ... -VersionLong = c.Struct( - "major" / c.Int8ul, - "minor" / c.Int8ul, - "patch" / c.Int8ul, - "build" / c.Int8ul, -) - - -FirmwareHeader = c.Struct( - "_start_offset" / c.Tell, - "magic" / EnumAdapter(c.Bytes(4), HeaderType), - "header_len" / c.Int32ul, - "expiry" / c.Int32ul, - "code_length" / c.Rebuild( - c.Int32ul, - lambda this: - len(this._.code) if "code" in this._ - else (this.code_length or 0) - ), - "version" / VersionLong, - "fix_version" / VersionLong, - "_reserved" / c.Padding(8), - "hashes" / c.Bytes(32)[16], - - "v1_signatures" / c.Bytes(64)[V1_SIGNATURE_SLOTS], - "v1_key_indexes" / c.Int8ul[V1_SIGNATURE_SLOTS], # pylint: disable=E1136 - - "_reserved" / c.Padding(220), - "sigmask" / c.Byte, - "signature" / c.Bytes(64), - - "_end_offset" / c.Tell, - - "_rebuild_header_len" / c.If( - c.this.version.major > 1, - c.Pointer( - c.this._start_offset + 4, - c.Rebuild(c.Int32ul, c.this._end_offset - c.this._start_offset) - ), - ), -) - - -"""Raw firmware image. - -Consists of firmware header and code block. -This is the expected format of firmware binaries for Trezor One, or bootloader images -for Trezor T.""" -FirmwareImage = c.Struct( - "header" / FirmwareHeader, - "_code_offset" / c.Tell, - "code" / c.Bytes(c.this.header.code_length), - c.Terminated, -) - - -"""Firmware image prefixed by a vendor header. - -This is the expected format of firmware binaries for Trezor T.""" -VendorFirmware = c.Struct( - "vendor_header" / VendorHeader, - "image" / FirmwareImage, - c.Terminated, -) - - -"""Legacy firmware image. -Consists of a custom header and code block. -This is the expected format of firmware binaries for Trezor One pre-1.8.0. - -The code block can optionally be interpreted as a new-style firmware image. That is the -expected format of firmware binary for Trezor One version 1.8.0, which can be installed -by both the older and the newer bootloader.""" -LegacyFirmware = c.Struct( - "magic" / c.Const(b"TRZR"), - "code_length" / c.Rebuild(c.Int32ul, c.len_(c.this.code)), - "key_indexes" / c.Int8ul[V1_SIGNATURE_SLOTS], # pylint: disable=E1136 - "flags" / c.BitStruct( - c.Padding(7), - "restore_storage" / c.Flag, - ), - "_reserved" / c.Padding(52), - "signatures" / c.Bytes(64)[V1_SIGNATURE_SLOTS], - "code" / c.Bytes(c.this.code_length), - c.Terminated, - - "embedded_onev2" / c.RestreamData(c.this.code, c.Optional(FirmwareImage)), -) - -# fmt: on - - -class FirmwareFormat(Enum): - TREZOR_ONE = 1 - TREZOR_T = 2 - TREZOR_ONE_V2 = 3 - - -ParsedFirmware = Tuple[FirmwareFormat, c.Container] - - -def parse(data: bytes) -> ParsedFirmware: - if data[:4] == b"TRZR": - version = FirmwareFormat.TREZOR_ONE - cls = LegacyFirmware - elif data[:4] == b"TRZV": - version = FirmwareFormat.TREZOR_T - cls = VendorFirmware - elif data[:4] == b"TRZF": - version = FirmwareFormat.TREZOR_ONE_V2 - cls = FirmwareImage - else: - raise ValueError("Unrecognized firmware image type") +def parse(data: bytes) -> "FirmwareType": try: - fw = cls.parse(data) + if data[:4] == b"TRZR": + return LegacyFirmware.parse(data) + elif data[:4] == b"TRZV": + return VendorFirmware.parse(data) + elif data[:4] == b"TRZF": + return LegacyV2Firmware.parse(data) + else: + raise ValueError("Unrecognized firmware image type") except Exception as e: raise FirmwareIntegrityError("Invalid firmware image") from e - return version, fw - - -def digest_onev1(fw: c.Container) -> bytes: - return hashlib.sha256(fw.code).digest() - - -def check_sig_v1( - digest: bytes, key_indexes: List[int], signatures: List[bytes] -) -> None: - distinct_key_indexes = set(i for i in key_indexes if i != 0) - if not distinct_key_indexes: - raise Unsigned - - if len(distinct_key_indexes) < len(key_indexes): - raise InvalidSignatureError( - f"Not enough distinct signatures (found {len(distinct_key_indexes)}, need {len(key_indexes)})" - ) - - for i in range(len(key_indexes)): - key_idx = key_indexes[i] - 1 - signature = signatures[i] - - if key_idx >= len(V1_BOOTLOADER_KEYS): - # unknown pubkey - raise InvalidSignatureError(f"Unknown key in slot {i}") - - pubkey = V1_BOOTLOADER_KEYS[key_idx][1:] - verify = ecdsa.VerifyingKey.from_string(pubkey, curve=ecdsa.curves.SECP256k1) - try: - verify.verify_digest(signature, digest) - except ecdsa.BadSignatureError as e: - raise InvalidSignatureError(f"Invalid signature in slot {i}") from e - - -def header_digest(header: c.Container, hash_function: Callable = blake2s) -> bytes: - stripped_header = header.copy() - stripped_header.sigmask = 0 - stripped_header.signature = b"\0" * 64 - stripped_header.v1_key_indexes = [0, 0, 0] - stripped_header.v1_signatures = [b"\0" * 64] * 3 - if header.magic == b"TRZV": - header_type = VendorHeader - else: - header_type = FirmwareHeader - header_bytes = header_type.build(stripped_header) - return hash_function(header_bytes).digest() - - -def digest_v2(fw: c.Container) -> bytes: - return header_digest(fw.image.header, blake2s) - - -def digest_onev2(fw: c.Container) -> bytes: - return header_digest(fw.header, hashlib.sha256) - - -def calculate_code_hashes( - code: bytes, - code_offset: int, - hash_function: Callable = blake2s, - chunk_size: int = V2_CHUNK_SIZE, - padding_byte: Optional[bytes] = None, -) -> List[bytes]: - hashes = [] - # End offset for each chunk. Normally this would be (i+1)*chunk_size for i-th chunk, - # but the first chunk is shorter by code_offset, so all end offsets are shifted. - ends = [(i + 1) * chunk_size - code_offset for i in range(16)] - start = 0 - for end in ends: - chunk = code[start:end] - # padding for last non-empty chunk - if padding_byte is not None and start < len(code) and end > len(code): - chunk += padding_byte[0:1] * (end - start - len(chunk)) - - if not chunk: - hashes.append(b"\0" * 32) - else: - hashes.append(hash_function(chunk).digest()) - - start = end - - return hashes -def validate_code_hashes(fw: c.Container, version: FirmwareFormat) -> None: - hash_function: Callable - padding_byte: Optional[bytes] - if version == FirmwareFormat.TREZOR_ONE_V2: - image = fw - hash_function = hashlib.sha256 - chunk_size = ONEV2_CHUNK_SIZE - padding_byte = b"\xff" - else: - image = fw.image - hash_function = blake2s - chunk_size = V2_CHUNK_SIZE - padding_byte = None - - expected_hashes = calculate_code_hashes( - image.code, image._code_offset, hash_function, chunk_size, padding_byte - ) - if expected_hashes != image.header.hashes: - raise FirmwareIntegrityError("Invalid firmware data.") - - -def validate_onev2(fw: c.Container, allow_unsigned: bool = False) -> None: - try: - check_sig_v1( - digest_onev2(fw), - fw.header.v1_key_indexes, - fw.header.v1_signatures, - ) - except Unsigned: - if not allow_unsigned: - raise - - validate_code_hashes(fw, FirmwareFormat.TREZOR_ONE_V2) - - -def validate_onev1(fw: c.Container, allow_unsigned: bool = False) -> None: - try: - check_sig_v1(digest_onev1(fw), fw.key_indexes, fw.signatures) - except Unsigned: - if not allow_unsigned: - raise - if fw.embedded_onev2: - validate_onev2(fw.embedded_onev2, allow_unsigned) - - -def validate_v2(fw: c.Container, skip_vendor_header: bool = False) -> None: - vendor_fingerprint = header_digest(fw.vendor_header) - fingerprint = digest_v2(fw) - - if not skip_vendor_header: - try: - # if you want to validate a custom vendor header, you can modify - # the global variables to match your keys and m-of-n scheme - cosi.verify( - fw.vendor_header.signature, - vendor_fingerprint, - V2_SIGS_REQUIRED, - V2_BOOTLOADER_KEYS, - fw.vendor_header.sigmask, - ) - except Exception: - raise InvalidSignatureError("Invalid vendor header signature.") - - # XXX expiry is not used now - # now = time.gmtime() - # if time.gmtime(fw.vendor_header.expiry) < now: - # raise ValueError("Vendor header expired.") - - try: - cosi.verify( - fw.image.header.signature, - fingerprint, - fw.vendor_header.sig_m, - fw.vendor_header.pubkeys, - fw.image.header.sigmask, - ) - except Exception: - raise InvalidSignatureError("Invalid firmware signature.") - - # XXX expiry is not used now - # if time.gmtime(fw.image.header.expiry) < now: - # raise ValueError("Firmware header expired.") - validate_code_hashes(fw, FirmwareFormat.TREZOR_T) - - -def digest(version: FirmwareFormat, fw: c.Container) -> bytes: - if version == FirmwareFormat.TREZOR_ONE: - return digest_onev1(fw) - elif version == FirmwareFormat.TREZOR_ONE_V2: - return digest_onev2(fw) - elif version == FirmwareFormat.TREZOR_T: - return digest_v2(fw) - else: - raise ValueError("Unrecognized firmware version") - - -def validate( - version: FirmwareFormat, fw: c.Container, allow_unsigned: bool = False -) -> None: - if version == FirmwareFormat.TREZOR_ONE: - return validate_onev1(fw, allow_unsigned) - elif version == FirmwareFormat.TREZOR_ONE_V2: - return validate_onev2(fw, allow_unsigned) - elif version == FirmwareFormat.TREZOR_T: - return validate_v2(fw) - else: - raise ValueError("Unrecognized firmware version") +def is_onev2(fw: "FirmwareType") -> TypeGuard[LegacyFirmware]: + return isinstance(fw, LegacyFirmware) and fw.embedded_v2 is not None # ====== Client functions ====== # @@ -460,7 +79,7 @@ def validate( def update( client: "TrezorClient", data: bytes, - progress_update: Callable[[int], Any] = lambda _: None, + progress_update: t.Callable[[int], t.Any] = lambda _: None, ): if client.features.bootloader_mode is False: raise RuntimeError("Device must be in bootloader mode") @@ -493,5 +112,5 @@ def update( @expect(messages.FirmwareHash, field="hash", ret_type=bytes) -def get_hash(client: "TrezorClient", challenge: Optional[bytes]): +def get_hash(client: "TrezorClient", challenge: t.Optional[bytes]): return client.call(messages.GetFirmwareHash(challenge=challenge)) diff --git a/python/src/trezorlib/firmware/consts.py b/python/src/trezorlib/firmware/consts.py new file mode 100644 index 000000000..312665c8e --- /dev/null +++ b/python/src/trezorlib/firmware/consts.py @@ -0,0 +1,43 @@ +V1_SIGNATURE_SLOTS = 3 +V1_BOOTLOADER_KEYS = [ + bytes.fromhex(key) + for key in ( + "04d571b7f148c5e4232c3814f777d8faeaf1a84216c78d569b71041ffc768a5b2d810fc3bb134dd026b57e65005275aedef43e155f48fc11a32ec790a93312bd58", + "0463279c0c0866e50c05c799d32bd6bab0188b6de06536d1109d2ed9ce76cb335c490e55aee10cc901215132e853097d5432eda06b792073bd7740c94ce4516cb1", + "0443aedbb6f7e71c563f8ed2ef64ec9981482519e7ef4f4aa98b27854e8c49126d4956d300ab45fdc34cd26bc8710de0a31dbdf6de7435fd0b492be70ac75fde58", + "04877c39fd7c62237e038235e9c075dab261630f78eeb8edb92487159fffedfdf6046c6f8b881fa407c4a4ce6c28de0b19c1f4e29f1fcbc5a58ffd1432a3e0938a", + "047384c51ae81add0a523adbb186c91b906ffb64c2c765802bf26dbd13bdf12c319e80c2213a136c8ee03d7874fd22b70d68e7dee469decfbbb510ee9a460cda45", + ) +] + +V2_BOARDLOADER_KEYS = [ + bytes.fromhex(key) + for key in ( + "0eb9856be9ba7e972c7f34eac1ed9b6fd0efd172ec00faf0c589759da4ddfba0", + "ac8ab40b32c98655798fd5da5e192be27a22306ea05c6d277cdff4a3f4125cd8", + "ce0fcd12543ef5936cf2804982136707863d17295faced72af171d6e6513ff06", + ) +] + +V2_BOARDLOADER_DEV_KEYS = [ + bytes.fromhex(key) + for key in ( + "db995fe25169d141cab9bbba92baa01f9f2e1ece7df4cb2ac05190f37fcc1f9d", + "2152f8d19b791d24453242e15f2eab6cb7cffa7b6a5ed30097960e069881db12", + "22fc297792f0b6ffc0bfcfdb7edb0c0aa14e025a365ec0e342e86e3829cb74b6", + ) +] + +V2_BOOTLOADER_KEYS = [ + bytes.fromhex(key) + for key in ( + "c2c87a49c5a3460977fbb2ec9dfe60f06bd694db8244bd4981fe3b7a26307f3f", + "80d036b08739b846f4cb77593078deb25dc9487aedcf52e30b4fb7cd7024178a", + "b8307a71f552c60a4cbb317ff48b82cdbf6b6bb5f04c920fec7badf017883751", + ) +] + +V2_SIGS_REQUIRED = 2 + +ONEV2_CHUNK_SIZE = 1024 * 64 +V2_CHUNK_SIZE = 1024 * 128 diff --git a/python/src/trezorlib/firmware/core.py b/python/src/trezorlib/firmware/core.py new file mode 100644 index 000000000..628b18721 --- /dev/null +++ b/python/src/trezorlib/firmware/core.py @@ -0,0 +1,187 @@ +import hashlib +import typing as t +from copy import copy +from enum import Enum + +import construct as c +from construct_classes import Struct, subcon + +from .. import cosi +from ..tools import EnumAdapter, TupleAdapter +from . import consts, util +from .vendor import VendorHeader + +__all__ = [ + "HeaderType", + "FirmwareHeader", + "FirmwareImage", + "VendorFirmware", +] + + +class HeaderType(Enum): + FIRMWARE = b"TRZF" + BOOTLOADER = b"TRZB" + + +class FirmwareHeader(Struct): + magic: HeaderType + header_len: int + expiry: int + code_length: int + version: t.Tuple[int, int, int, int] + fix_version: t.Tuple[int, int, int, int] + hashes: t.Sequence[bytes] + + v1_signatures: t.Sequence[bytes] + v1_key_indexes: t.Sequence[int] + + sigmask: int + signature: bytes + + # fmt: off + SUBCON = c.Struct( + "_start_offset" / c.Tell, + "magic" / EnumAdapter(c.Bytes(4), HeaderType), + "header_len" / c.Int32ul, + "expiry" / c.Int32ul, + "code_length" / c.Rebuild( + c.Int32ul, + lambda this: + len(this._.code) if "code" in this._ + else (this.code_length or 0) + ), + "version" / TupleAdapter(c.Int8ul, c.Int8ul, c.Int8ul, c.Int8ul), + "fix_version" / TupleAdapter(c.Int8ul, c.Int8ul, c.Int8ul, c.Int8ul), + "_reserved" / c.Padding(8), + "hashes" / c.Bytes(32)[16], + + "v1_signatures" / c.Bytes(64)[consts.V1_SIGNATURE_SLOTS], + "v1_key_indexes" / c.Int8ul[consts.V1_SIGNATURE_SLOTS], # pylint: disable=E1136 + + "_reserved" / c.Padding(220), + "sigmask" / c.Byte, + "signature" / c.Bytes(64), + + "_end_offset" / c.Tell, + + "_rebuild_header_len" / c.If( + c.this.version[0] > 1, + c.Pointer( + c.this._start_offset + 4, + c.Rebuild(c.Int32ul, c.this._end_offset - c.this._start_offset) + ), + ), + ) + # fmt: on + + +class FirmwareImage(Struct): + """Raw firmware image. + + Consists of firmware header and code block. + This is the expected format of firmware binaries for Trezor One, or bootloader images + for Trezor T.""" + + header: FirmwareHeader = subcon(FirmwareHeader) + _code_offset: int + code: bytes + + SUBCON = c.Struct( + "header" / FirmwareHeader.SUBCON, + "_code_offset" / c.Tell, + "code" / c.Bytes(c.this.header.code_length), + c.Terminated, + ) + + HASH_PARAMS = util.FirmwareHashParameters( + hash_function=hashlib.blake2s, + chunk_size=consts.V2_CHUNK_SIZE, + padding_byte=None, + ) + + def code_hashes(self) -> t.List[bytes]: + """Calculate hashes of chunks of `code`. + + Assume that the first `code_offset` bytes of `code` are taken up by the header. + """ + hashes = [] + # End offset for each chunk. Normally this would be (i+1)*chunk_size for i-th chunk, + # but the first chunk is shorter by code_offset, so all end offsets are shifted. + ends = [ + (i + 1) * self.HASH_PARAMS.chunk_size - self._code_offset for i in range(16) + ] + start = 0 + for end in ends: + chunk = self.code[start:end] + # padding for last non-empty chunk + if ( + self.HASH_PARAMS.padding_byte is not None + and start < len(self.code) + and end > len(self.code) + ): + chunk += self.HASH_PARAMS.padding_byte[0:1] * (end - start - len(chunk)) + + if not chunk: + hashes.append(b"\0" * 32) + else: + hashes.append(self.HASH_PARAMS.hash_function(chunk).digest()) + + start = end + + return hashes + + def validate_code_hashes(self) -> None: + if self.code_hashes() != self.header.hashes: + raise util.FirmwareIntegrityError("Invalid firmware data.") + + def digest(self) -> bytes: + header = copy(self.header) + header.hashes = self.code_hashes() + header.signature = b"\x00" * 64 + header.sigmask = 0 + header.v1_key_indexes = [0] * consts.V1_SIGNATURE_SLOTS + header.v1_signatures = [b"\x00" * 64] * consts.V1_SIGNATURE_SLOTS + return self.HASH_PARAMS.hash_function(header.build()).digest() + + +class VendorFirmware(Struct): + """Firmware image prefixed by a vendor header. + + This is the expected format of firmware binaries for Trezor T.""" + + vendor_header: VendorHeader = subcon(VendorHeader) + firmware: FirmwareImage = subcon(FirmwareImage) + + SUBCON = c.Struct( + "vendor_header" / VendorHeader.SUBCON, + "firmware" / FirmwareImage.SUBCON, + c.Terminated, + ) + + def digest(self) -> bytes: + return self.firmware.digest() + + def verify(self, _public_keys: t.Sequence[bytes] = ()) -> None: + if _public_keys: + raise ValueError("Cannot supply custom keys for vendor firmware.") + + self.firmware.validate_code_hashes() + + self.vendor_header.verify() + digest = self.digest() + try: + cosi.verify( + self.firmware.header.signature, + digest, + self.vendor_header.sig_m, + self.vendor_header.pubkeys, + self.firmware.header.sigmask, + ) + except Exception: + raise util.InvalidSignatureError("Invalid firmware signature.") + + # XXX expiry is not used now + # now = time.gmtime() + # if time.gmtime(fw.vendor_header.expiry) < now: + # raise ValueError("Vendor header expired.") diff --git a/python/src/trezorlib/firmware/legacy.py b/python/src/trezorlib/firmware/legacy.py new file mode 100644 index 000000000..58f394c27 --- /dev/null +++ b/python/src/trezorlib/firmware/legacy.py @@ -0,0 +1,124 @@ +import hashlib +import typing as t +from dataclasses import field + +import construct as c +import ecdsa +from construct_classes import Struct, subcon + +from . import consts, util +from .core import FirmwareImage + +__all__ = [ + "LegacyFirmware", + "LegacyV2Firmware", + "check_sig_v1", +] + + +def check_sig_v1( + digest: bytes, + key_indexes: t.Sequence[int], + signatures: t.Sequence[bytes], + public_keys: t.Sequence[bytes], +) -> None: + """Validate signatures of `digest` using the Trezor One V1 method.""" + distinct_indexes = set(i for i in key_indexes if i != 0) + if not distinct_indexes: + raise util.Unsigned + + if len(distinct_indexes) < len(key_indexes): + raise util.InvalidSignatureError( + f"Not enough distinct signatures (found {len(distinct_indexes)}, need {len(key_indexes)})" + ) + + for i in range(len(key_indexes)): + key_idx = key_indexes[i] - 1 + signature = signatures[i] + + if key_idx >= len(public_keys): + # unknown pubkey + raise util.InvalidSignatureError(f"Unknown key in slot {i}") + + pubkey = public_keys[key_idx][1:] + verify = ecdsa.VerifyingKey.from_string(pubkey, curve=ecdsa.curves.SECP256k1) + try: + verify.verify_digest(signature, digest) + except ecdsa.BadSignatureError as e: + raise util.InvalidSignatureError(f"Invalid signature in slot {i}") from e + + +class LegacyV2Firmware(FirmwareImage): + """Firmware image in the format used by Trezor One 1.8.0 and newer.""" + + HASH_PARAMS = util.FirmwareHashParameters( + hash_function=hashlib.sha256, + chunk_size=consts.ONEV2_CHUNK_SIZE, + padding_byte=b"\xff", + ) + + def verify( + self, public_keys: t.Sequence[bytes] = consts.V1_BOOTLOADER_KEYS + ) -> None: + self.validate_code_hashes() + check_sig_v1( + self.digest(), + self.header.v1_key_indexes, + self.header.v1_signatures, + public_keys, + ) + + def verify_unsigned(self) -> None: + self.validate_code_hashes() + if any(i != 0 for i in self.header.v1_key_indexes): + raise util.InvalidSignatureError("Firmware is not unsigned.") + + +class LegacyFirmware(Struct): + """Legacy firmware image. + Consists of a custom header and code block. + This is the expected format of firmware binaries for Trezor One pre-1.8.0. + + The code block can optionally be interpreted as a new-style firmware image. That is the + expected format of firmware binary for Trezor One version 1.8.0, which can be installed + by both the older and the newer bootloader.""" + + key_indexes: t.List[int] + signatures: t.List[bytes] + code: bytes + flags: t.Dict[str, t.Any] = field(default_factory=dict) + embedded_v2: t.Optional[LegacyV2Firmware] = subcon(LegacyV2Firmware, default=None) + + # fmt: off + SUBCON = c.Struct( + "magic" / c.Const(b"TRZR"), + "code_length" / c.Rebuild(c.Int32ul, c.len_(c.this.code)), + "key_indexes" / c.Int8ul[consts.V1_SIGNATURE_SLOTS], # pylint: disable=E1136 + "flags" / c.BitStruct( + c.Padding(7), + "restore_storage" / c.Flag, + ), + "_reserved" / c.Padding(52), + "signatures" / c.Bytes(64)[consts.V1_SIGNATURE_SLOTS], + "code" / c.Bytes(c.this.code_length), + c.Terminated, + + "embedded_v2" / c.RestreamData(c.this.code, c.Optional(LegacyV2Firmware.SUBCON)), + ) + # fmt: on + + def digest(self) -> bytes: + return hashlib.sha256(self.code).digest() + + def verify( + self, public_keys: t.Sequence[bytes] = consts.V1_BOOTLOADER_KEYS + ) -> None: + check_sig_v1( + self.digest(), + self.key_indexes, + self.signatures, + public_keys, + ) + + if self.embedded_v2: + self.embedded_v2.verify(consts.V1_BOOTLOADER_KEYS) diff --git a/python/src/trezorlib/firmware/util.py b/python/src/trezorlib/firmware/util.py new file mode 100644 index 000000000..79fa80476 --- /dev/null +++ b/python/src/trezorlib/firmware/util.py @@ -0,0 +1,34 @@ +import typing as t +from dataclasses import dataclass + +from typing_extensions import Protocol + + +class FirmwareIntegrityError(Exception): + pass + + +class InvalidSignatureError(FirmwareIntegrityError): + pass + + +class Unsigned(FirmwareIntegrityError): + pass + + +class DigestCalculator(Protocol): + def update(self, __data: bytes) -> None: + ... + + def digest(self) -> bytes: + ... + + +Hasher = t.Callable[[bytes], DigestCalculator] + + +@dataclass +class FirmwareHashParameters: + hash_function: Hasher + chunk_size: int + padding_byte: t.Optional[bytes] diff --git a/python/src/trezorlib/firmware/vendor.py b/python/src/trezorlib/firmware/vendor.py new file mode 100644 index 000000000..585507542 --- /dev/null +++ b/python/src/trezorlib/firmware/vendor.py @@ -0,0 +1,128 @@ +import hashlib +import typing as t +from copy import copy + +import construct as c +from construct_classes import Struct, subcon + +from .. import cosi +from ..toif import ToifStruct +from ..tools import TupleAdapter +from . import consts, util + +__all__ = [ + "VendorTrust", + "VendorHeader", +] + + +def _transform_vendor_trust(data: bytes) -> bytes: + """Byte-swap and bit-invert the VendorTrust field. + + Vendor trust is interpreted as a bitmask in a 16-bit little-endian integer, + with the added twist that 0 means set and 1 means unset. + We feed it to a `BitStruct` that expects a big-endian sequence where bits have + the traditional meaning. We must therefore do a bitwise negation of each byte, + and return them in reverse order. This is the same transformation both ways, + fortunately, so we don't need two separate functions. + """ + return bytes(~b & 0xFF for b in data)[::-1] + + +class VendorTrust(Struct): + show_vendor_string: bool + require_user_click: bool + red_background: bool + delay: int + + _reserved: int = 0 + + SUBCON = c.Transformed( + c.BitStruct( + "_reserved" / c.Default(c.BitsInteger(9), 0), + "show_vendor_string" / c.Flag, + "require_user_click" / c.Flag, + "red_background" / c.Flag, + "delay" / c.BitsInteger(4), + ), + _transform_vendor_trust, + 2, + _transform_vendor_trust, + 2, + ) + + +class VendorHeader(Struct): + header_len: int + expiry: int + version: t.Tuple[int, int] + sig_m: int + # sig_n: int + pubkeys: t.List[bytes] + text: str + image: t.Dict[str, t.Any] + sigmask: int + signature: bytes + + trust: VendorTrust = subcon(VendorTrust) + + # fmt: off + SUBCON = c.Struct( + "_start_offset" / c.Tell, + "magic" / c.Const(b"TRZV"), + "header_len" / c.Int32ul, + "expiry" / c.Int32ul, + "version" / TupleAdapter(c.Int8ul, c.Int8ul), + "sig_m" / c.Int8ul, + "sig_n" / c.Rebuild(c.Int8ul, c.len_(c.this.pubkeys)), + "trust" / VendorTrust.SUBCON, + "_reserved" / c.Padding(14), + "pubkeys" / c.Bytes(32)[c.this.sig_n], + "text" / c.Aligned(4, c.PascalString(c.Int8ul, "utf-8")), + "image" / ToifStruct, + "_end_offset" / c.Tell, + + "_min_header_len" / c.Check(c.this.header_len > (c.this._end_offset - c.this._start_offset) + 65), + "_header_len_aligned" / c.Check(c.this.header_len % 512 == 0), + + c.Padding(c.this.header_len - c.this._end_offset + c.this._start_offset - 65), + "sigmask" / c.Byte, + "signature" / c.Bytes(64), + ) + # fmt: on + + def digest(self) -> bytes: + cpy = copy(self) + cpy.sigmask = 0 + cpy.signature = b"\x00" * 64 + return hashlib.blake2s(cpy.build()).digest() + + def vhash(self) -> bytes: + h = hashlib.blake2s() + sig_n = len(self.pubkeys) + h.update(self.sig_m.to_bytes(1, "little")) + h.update(sig_n.to_bytes(1, "little")) + for i in range(8): + if i < sig_n: + h.update(self.pubkeys[i]) + else: + h.update(b"\x00" * 32) + return h.digest() + + def verify(self, pubkeys: t.Sequence[bytes] = consts.V2_BOOTLOADER_KEYS) -> None: + digest = self.digest() + try: + cosi.verify( + self.signature, + digest, + consts.V2_SIGS_REQUIRED, + pubkeys, + self.sigmask, + ) + except Exception: + raise util.InvalidSignatureError("Invalid vendor header signature.") + + # XXX expiry is not used now + # now = time.gmtime() + # if time.gmtime(fw.vendor_header.expiry) < now: + # raise ValueError("Vendor header expired.") diff --git a/python/src/trezorlib/tools.py b/python/src/trezorlib/tools.py index 5898e47fc..8162032b5 100644 --- a/python/src/trezorlib/tools.py +++ b/python/src/trezorlib/tools.py @@ -389,3 +389,14 @@ class EnumAdapter(construct.Adapter): return self.enum(obj) except ValueError: return obj + + +class TupleAdapter(construct.Adapter): + def __init__(self, *subcons: Any) -> None: + super().__init__(construct.Sequence(*subcons)) + + def _encode(self, obj: Any, ctx: Any, path: Any): + return obj + + def _decode(self, obj: Any, ctx: Any, path: Any): + return tuple(obj) diff --git a/python/tests/test_firmware.py b/python/tests/test_firmware.py new file mode 100644 index 000000000..6df1c5a41 --- /dev/null +++ b/python/tests/test_firmware.py @@ -0,0 +1,153 @@ +from pathlib import Path + +import construct +import pytest +import requests + +from trezorlib import firmware +from trezorlib.firmware import ( + VendorFirmware, + LegacyFirmware, + LegacyV2Firmware, + VendorHeader, +) + +CORE_FW_VERSION = "2.4.2" +CORE_FW_FINGERPRINT = "54ccf155510b5292bd17ed748409d0d135112e24e62eb74184639460beecb213" +LEGACY_FW_VERSION = "1.10.3" +LEGACY_FW_FINGERPRINT = ( + "bf0cc936a9afbf0a4ae7b727a2817fb69fba432d7230a0ff7b79b4a73b845197" +) + +CORE_FW = f"https://data.trezor.io/firmware/2/trezor-{CORE_FW_VERSION}.bin" +LEGACY_FW = f"https://data.trezor.io/firmware/1/trezor-{LEGACY_FW_VERSION}.bin" + +HERE = Path(__file__).parent + +VENDOR_HEADER = ( + HERE.parent.parent + / "core" + / "embed" + / "vendorheader" + / "vendorheader_satoshilabs_signed_prod.bin" +) + + +def _fetch(url: str, version: str) -> bytes: + path = HERE / f"trezor-{version}.bin" + if not path.exists(): + r = requests.get(url) + r.raise_for_status() + path.write_bytes(r.content) + return path.read_bytes() + + +@pytest.fixture() +def legacy_fw() -> bytes: + return _fetch(LEGACY_FW, LEGACY_FW_VERSION) + + +@pytest.fixture() +def core_fw() -> bytes: + return _fetch(CORE_FW, CORE_FW_VERSION) + + +def test_core_basic(core_fw: bytes) -> None: + fw = VendorFirmware.parse(core_fw) + fw.verify() + assert fw.digest().hex() == CORE_FW_FINGERPRINT + version_str = ".".join(str(x) for x in fw.firmware.header.version) + assert version_str.startswith(CORE_FW_VERSION) + assert fw.vendor_header.text == "SatoshiLabs" + assert fw.build() == core_fw + + +def test_vendor_header(core_fw: bytes) -> None: + fw = VendorFirmware.parse(core_fw) + + vh_data = fw.vendor_header.build() + assert vh_data in core_fw + assert vh_data == VENDOR_HEADER.read_bytes() + + vh = VendorHeader.parse(vh_data) + assert vh == fw.vendor_header + vh.verify() + + with pytest.raises(construct.ConstructError): + VendorFirmware.parse(vh_data) + + +def test_core_code_hashes(core_fw: bytes) -> None: + fw = VendorFirmware.parse(core_fw) + fw.firmware.header.hashes = [] + assert fw.digest().hex() == CORE_FW_FINGERPRINT + + +def test_legacy_basic(legacy_fw: bytes) -> None: + fw = LegacyFirmware.parse(legacy_fw) + fw.verify() + assert fw.digest().hex() == LEGACY_FW_FINGERPRINT + assert fw.build() == legacy_fw + + +def test_unsigned(legacy_fw: bytes) -> None: + legacy = LegacyFirmware.parse(legacy_fw) + + legacy.verify() + legacy.key_indexes = [0, 0, 0] + legacy.signatures = [b"", b"", b""] + + with pytest.raises(firmware.Unsigned): + legacy.verify() + + assert legacy.embedded_v2 is not None + legacy.embedded_v2.verify() + + legacy.embedded_v2.header.v1_key_indexes = [0, 0, 0] + legacy.embedded_v2.header.v1_signatures = [b"", b"", b""] + with pytest.raises(firmware.Unsigned): + legacy.embedded_v2.verify() + + +def test_disallow_unsigned(core_fw: bytes) -> None: + core = VendorFirmware.parse(core_fw) + core.firmware.header.sigmask = 0 + core.firmware.header.signature = b"" + with pytest.raises(firmware.InvalidSignatureError): + core.verify() + + +def test_embedded_v2(legacy_fw: bytes) -> None: + legacy = LegacyFirmware.parse(legacy_fw) + assert legacy.embedded_v2 is not None + legacy.embedded_v2.verify() + + embedded_data = legacy.embedded_v2.build() + cutoff_data = legacy_fw[256:] + assert cutoff_data == embedded_data + embedded = LegacyV2Firmware.parse(cutoff_data) + assert embedded == legacy.embedded_v2 + + +def test_integrity_legacy(legacy_fw: bytes) -> None: + legacy = LegacyFirmware.parse(legacy_fw) + legacy.verify() + + modified_data = bytearray(legacy_fw) + modified_data[-1] ^= 0x01 + modified = LegacyFirmware.parse(modified_data) + assert modified.digest() != legacy.digest() + with pytest.raises(firmware.InvalidSignatureError): + modified.verify() + + +def test_integrity_core(core_fw: bytes) -> None: + core = VendorFirmware.parse(core_fw) + core.verify() + + modified_data = bytearray(core_fw) + modified_data[-1] ^= 0x01 + modified = VendorFirmware.parse(modified_data) + assert modified.digest() != core.digest() + with pytest.raises(firmware.FirmwareIntegrityError): + modified.verify() diff --git a/python/tools/firmware-fingerprint.py b/python/tools/firmware-fingerprint.py index 33f5c84de..4bb67a042 100755 --- a/python/tools/firmware-fingerprint.py +++ b/python/tools/firmware-fingerprint.py @@ -22,7 +22,6 @@ from typing import BinaryIO, TextIO import click from trezorlib import firmware -from trezorlib._internal import firmware_headers @click.command() @@ -33,20 +32,11 @@ def firmware_fingerprint(filename: BinaryIO, output: TextIO) -> None: data = filename.read() try: - version, fw = firmware.parse(data) - - # Unsigned production builds for Trezor T do not have valid code hashes. - # Use the internal module which recomputes them first. - if version == firmware.FirmwareFormat.TREZOR_T: - fingerprint = firmware_headers.FirmwareImage(fw).digest() - else: - fingerprint = firmware.digest(version, fw) + click.echo(firmware.parse(data).digest().hex(), file=output) except Exception as e: click.echo(e, err=True) sys.exit(2) - click.echo(fingerprint.hex(), file=output) - if __name__ == "__main__": firmware_fingerprint()