mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-11-22 23:48:12 +00:00
firmware: recognize, verify and handle firmware v1.8.0 and up
Also reorganize firmware validation code somewhat, so that trezorctl consumes a unified interface.
This commit is contained in:
parent
408c6712c1
commit
cfb19dfb15
126
trezorctl
126
trezorctl
@ -516,53 +516,53 @@ def backup_device(connect):
|
|||||||
#
|
#
|
||||||
|
|
||||||
|
|
||||||
def validate_firmware_v1(fw, expected_fingerprint=None):
|
ALLOWED_FIRMWARE_FORMATS = {
|
||||||
click.echo("Trezor One firmware image.")
|
1: (firmware.FirmwareFormat.TREZOR_ONE, firmware.FirmwareFormat.TREZOR_ONE_V2),
|
||||||
distinct_sig_slots = set(i for i in fw.key_indexes if i != 0)
|
2: (firmware.FirmwareFormat.TREZOR_T),
|
||||||
if not distinct_sig_slots:
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _print_version(version):
|
||||||
|
vstr = "Firmware version {major}.{minor}.{patch} build {build}".format(**version)
|
||||||
|
click.echo(vstr)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_firmware(version, fw, expected_fingerprint=None):
|
||||||
|
if version == firmware.FirmwareFormat.TREZOR_ONE:
|
||||||
|
if fw.embedded_onev2:
|
||||||
|
click.echo("Trezor One firmware with embedded v2 image (1.8.0 or later)")
|
||||||
|
_print_version(fw.embedded_onev2.firmware_header.version)
|
||||||
|
else:
|
||||||
|
click.echo("Trezor One firmware image.")
|
||||||
|
elif version == firmware.FirmwareFormat.TREZOR_ONE_V2:
|
||||||
|
click.echo("Trezor One v2 firmware (1.8.0 or later)")
|
||||||
|
_print_version(fw.firmware_header.version)
|
||||||
|
elif version == firmware.FirmwareFormat.TREZOR_T:
|
||||||
|
click.echo("Trezor T firmware image.")
|
||||||
|
vendor = fw.vendor_header.vendor_string
|
||||||
|
vendor_version = "{major}.{minor}".format(**fw.vendor_header.version)
|
||||||
|
click.echo("Vendor header from {}, version {}".format(vendor, vendor_version))
|
||||||
|
_print_version(fw.firmware_header.version)
|
||||||
|
|
||||||
|
try:
|
||||||
|
firmware.validate(version, fw)
|
||||||
|
click.echo("Signatures are valid.")
|
||||||
|
except firmware.Unsigned:
|
||||||
if not click.confirm("No signatures found. Continue?", default=False):
|
if not click.confirm("No signatures found. Continue?", default=False):
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
elif len(distinct_sig_slots) < 3:
|
try:
|
||||||
click.echo("Badly signed image (need 3 distinct signatures), aborting.")
|
firmware.validate(version, fw, allow_unsigned=True)
|
||||||
sys.exit(1)
|
click.echo("Unsigned firmware looking OK.")
|
||||||
else:
|
except firmware.FirmwareIntegrityError as e:
|
||||||
all_valid = True
|
click.echo(e)
|
||||||
for i in range(len(fw.key_indexes)):
|
click.echo("Firmware validation failed, aborting.")
|
||||||
if not firmware.check_sig_v1(fw, i):
|
|
||||||
click.echo("INVALID signature in slot {}".format(i))
|
|
||||||
all_valid = False
|
|
||||||
|
|
||||||
if all_valid:
|
|
||||||
click.echo("Signatures are valid.")
|
|
||||||
else:
|
|
||||||
click.echo("Invalid signature detected, aborting.")
|
|
||||||
sys.exit(4)
|
sys.exit(4)
|
||||||
|
except firmware.FirmwareIntegrityError as e:
|
||||||
fingerprint = firmware.digest_v1(fw).hex()
|
|
||||||
click.echo("Firmware fingerprint: {}".format(fingerprint))
|
|
||||||
if expected_fingerprint and fingerprint != expected_fingerprint:
|
|
||||||
click.echo("Expected fingerprint: {}".format(expected_fingerprint))
|
|
||||||
click.echo("Fingerprints do not match, aborting.")
|
|
||||||
sys.exit(5)
|
|
||||||
|
|
||||||
|
|
||||||
def validate_firmware_v2(fw, expected_fingerprint=None, skip_vendor_header=False):
|
|
||||||
click.echo("Trezor T firmware image.")
|
|
||||||
vendor = fw.vendor_header.vendor_string
|
|
||||||
vendor_version = "{major}.{minor}".format(**fw.vendor_header.version)
|
|
||||||
version = fw.firmware_header.version
|
|
||||||
click.echo("Vendor header from {}, version {}".format(vendor, vendor_version))
|
|
||||||
click.echo(
|
|
||||||
"Firmware version {major}.{minor}.{patch} build {build}".format(**version)
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
firmware.validate(fw, skip_vendor_header)
|
|
||||||
click.echo("Signatures are valid.")
|
|
||||||
except Exception as e:
|
|
||||||
click.echo(e)
|
click.echo(e)
|
||||||
|
click.echo("Firmware validation failed, aborting.")
|
||||||
sys.exit(4)
|
sys.exit(4)
|
||||||
|
|
||||||
fingerprint = firmware.digest(fw).hex()
|
fingerprint = firmware.digest(version, fw).hex()
|
||||||
click.echo("Firmware fingerprint: {}".format(fingerprint))
|
click.echo("Firmware fingerprint: {}".format(fingerprint))
|
||||||
if expected_fingerprint and fingerprint != expected_fingerprint:
|
if expected_fingerprint and fingerprint != expected_fingerprint:
|
||||||
click.echo("Expected fingerprint: {}".format(expected_fingerprint))
|
click.echo("Expected fingerprint: {}".format(expected_fingerprint))
|
||||||
@ -628,15 +628,18 @@ def find_best_firmware_version(bootloader_version, requested_version=None):
|
|||||||
|
|
||||||
|
|
||||||
@cli.command()
|
@cli.command()
|
||||||
|
# fmt: off
|
||||||
@click.option("-f", "--filename")
|
@click.option("-f", "--filename")
|
||||||
@click.option("-u", "--url")
|
@click.option("-u", "--url")
|
||||||
@click.option("-v", "--version")
|
@click.option("-v", "--version")
|
||||||
@click.option("-s", "--skip-check", is_flag=True)
|
@click.option("-s", "--skip-check", is_flag=True, help="Do not validate firmware integrity")
|
||||||
|
@click.option("--raw", is_flag=True, help="Push raw data to Trezor")
|
||||||
@click.option("--fingerprint", help="Expected firmware fingerprint in hex")
|
@click.option("--fingerprint", help="Expected firmware fingerprint in hex")
|
||||||
@click.option("--skip-vendor-header", help="Skip vendor header validation on Trezor T")
|
@click.option("--skip-vendor-header", help="Skip vendor header validation on Trezor T")
|
||||||
|
# fmt: on
|
||||||
@click.pass_obj
|
@click.pass_obj
|
||||||
def firmware_update(
|
def firmware_update(
|
||||||
connect, filename, url, version, skip_check, fingerprint, skip_vendor_header
|
connect, filename, url, version, skip_check, fingerprint, skip_vendor_header, raw
|
||||||
):
|
):
|
||||||
"""Upload new firmware to device.
|
"""Upload new firmware to device.
|
||||||
|
|
||||||
@ -663,13 +666,13 @@ def firmware_update(
|
|||||||
click.echo("Please switch your device to bootloader mode.")
|
click.echo("Please switch your device to bootloader mode.")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
firmware_version = client.features.major_version
|
f = client.features
|
||||||
|
bootloader_onev2 = f.major_version == 1 and f.minor_version >= 8
|
||||||
|
|
||||||
if filename:
|
if filename:
|
||||||
data = open(filename, "rb").read()
|
data = open(filename, "rb").read()
|
||||||
else:
|
else:
|
||||||
if not url:
|
if not url:
|
||||||
f = client.features
|
|
||||||
bootloader_version = [f.major_version, f.minor_version, f.patch_version]
|
bootloader_version = [f.major_version, f.minor_version, f.patch_version]
|
||||||
version_list = [int(x) for x in version.split(".")] if version else None
|
version_list = [int(x) for x in version.split(".")] if version else None
|
||||||
url, fp = find_best_firmware_version(bootloader_version, version_list)
|
url, fp = find_best_firmware_version(bootloader_version, version_list)
|
||||||
@ -680,26 +683,41 @@ def firmware_update(
|
|||||||
r = requests.get(url)
|
r = requests.get(url)
|
||||||
data = r.content
|
data = r.content
|
||||||
|
|
||||||
if not skip_check:
|
if not raw and not skip_check:
|
||||||
try:
|
try:
|
||||||
version, fw = firmware.parse(data)
|
version, fw = firmware.parse(data)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
click.echo(e)
|
click.echo(e)
|
||||||
sys.exit(2)
|
sys.exit(2)
|
||||||
|
|
||||||
if version == firmware.FirmwareFormat.TREZOR_ONE:
|
validate_firmware(version, fw, fingerprint)
|
||||||
validate_firmware_v1(fw, fingerprint)
|
if (
|
||||||
elif version == firmware.FirmwareFormat.TREZOR_T:
|
bootloader_onev2
|
||||||
validate_firmware_v2(fw, fingerprint)
|
and version == firmware.FirmwareFormat.TREZOR_ONE
|
||||||
else:
|
and not fw.embedded_onev2
|
||||||
click.echo("Unrecognized firmware version.")
|
):
|
||||||
|
click.echo("Firmware is too old for your device. Aborting.")
|
||||||
|
sys.exit(3)
|
||||||
|
elif not bootloader_onev2 and version == firmware.FirmwareFormat.TREZOR_ONE_V2:
|
||||||
|
click.echo("You need to upgrade to bootloader 1.8.0 first.")
|
||||||
|
sys.exit(3)
|
||||||
|
|
||||||
if firmware_version != version.value:
|
if f.major_version not in ALLOWED_FIRMWARE_FORMATS:
|
||||||
|
click.echo("trezorctl doesn't know your device version. Aborting.")
|
||||||
|
sys.exit(3)
|
||||||
|
elif version not in ALLOWED_FIRMWARE_FORMATS[f.major_version]:
|
||||||
click.echo("Firmware does not match your device, aborting.")
|
click.echo("Firmware does not match your device, aborting.")
|
||||||
sys.exit(3)
|
sys.exit(3)
|
||||||
|
|
||||||
|
if not raw:
|
||||||
|
# special handling for embedded-OneV2 format:
|
||||||
|
# for bootloader < 1.8, keep the embedding
|
||||||
|
# for bootloader 1.8.0 and up, strip the old OneV1 header
|
||||||
|
if bootloader_onev2 and data[:4] == b"TRZR" and data[256 : 256 + 4] == b"TRZF":
|
||||||
|
data = data[256:]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if firmware_version == 1:
|
if f.major_version == 1 and f.firmware_present:
|
||||||
# Trezor One does not send ButtonRequest
|
# Trezor One does not send ButtonRequest
|
||||||
click.echo("Please confirm action on your Trezor device")
|
click.echo("Please confirm action on your Trezor device")
|
||||||
return firmware.update(client, data)
|
return firmware.update(client, data)
|
||||||
|
@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
import hashlib
|
import hashlib
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import NewType, Tuple
|
from typing import Callable, List, NewType, Tuple
|
||||||
|
|
||||||
import construct as c
|
import construct as c
|
||||||
import ecdsa
|
import ecdsa
|
||||||
@ -41,6 +41,7 @@ V2_BOOTLOADER_KEYS = [
|
|||||||
V2_BOOTLOADER_M = 2
|
V2_BOOTLOADER_M = 2
|
||||||
V2_BOOTLOADER_N = 3
|
V2_BOOTLOADER_N = 3
|
||||||
|
|
||||||
|
ONEV2_CHUNK_SIZE = 1024 * 64
|
||||||
V2_CHUNK_SIZE = 1024 * 128
|
V2_CHUNK_SIZE = 1024 * 128
|
||||||
|
|
||||||
|
|
||||||
@ -57,6 +58,18 @@ def _transform_vendor_trust(data: bytes) -> bytes:
|
|||||||
return bytes(~b & 0xFF for b in data)[::-1]
|
return bytes(~b & 0xFF for b in data)[::-1]
|
||||||
|
|
||||||
|
|
||||||
|
class FirmwareIntegrityError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidSignatureError(FirmwareIntegrityError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class Unsigned(FirmwareIntegrityError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
# fmt: off
|
# fmt: off
|
||||||
Toif = c.Struct(
|
Toif = c.Struct(
|
||||||
"magic" / c.Const(b"TOI"),
|
"magic" / c.Const(b"TOI"),
|
||||||
@ -117,7 +130,7 @@ VersionLong = c.Struct(
|
|||||||
FirmwareHeader = c.Struct(
|
FirmwareHeader = c.Struct(
|
||||||
"_start_offset" / c.Tell,
|
"_start_offset" / c.Tell,
|
||||||
"magic" / c.Const(b"TRZF"),
|
"magic" / c.Const(b"TRZF"),
|
||||||
"_header_len" / c.Padding(4),
|
"header_len" / c.Int32ul,
|
||||||
"expiry" / c.Int32ul,
|
"expiry" / c.Int32ul,
|
||||||
"code_length" / c.Rebuild(
|
"code_length" / c.Rebuild(
|
||||||
c.Int32ul,
|
c.Int32ul,
|
||||||
@ -130,14 +143,21 @@ FirmwareHeader = c.Struct(
|
|||||||
"reserved" / c.Padding(8),
|
"reserved" / c.Padding(8),
|
||||||
"hashes" / c.Bytes(32)[16],
|
"hashes" / c.Bytes(32)[16],
|
||||||
|
|
||||||
"reserved" / c.Padding(415),
|
"v1_signatures" / c.Bytes(64)[V1_SIGNATURE_SLOTS],
|
||||||
|
"v1_key_indexes" / c.Int8ul[V1_SIGNATURE_SLOTS], # pylint: disable=E1136
|
||||||
|
|
||||||
|
"reserved" / c.Padding(220),
|
||||||
"sigmask" / c.Byte,
|
"sigmask" / c.Byte,
|
||||||
"signature" / c.Bytes(64),
|
"signature" / c.Bytes(64),
|
||||||
|
|
||||||
"_end_offset" / c.Tell,
|
"_end_offset" / c.Tell,
|
||||||
"header_len" / c.Pointer(
|
|
||||||
c.this._start_offset + 4,
|
"_rebuild_header_len" / c.If(
|
||||||
c.Rebuild(c.Int32ul, c.this._end_offset - c.this._start_offset)
|
c.this.version.major > 1,
|
||||||
|
c.Pointer(
|
||||||
|
c.this._start_offset + 4,
|
||||||
|
c.Rebuild(c.Int32ul, c.this._end_offset - c.this._start_offset)
|
||||||
|
),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -151,7 +171,15 @@ Firmware = c.Struct(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
FirmwareV1 = c.Struct(
|
FirmwareOneV2 = c.Struct(
|
||||||
|
"firmware_header" / FirmwareHeader,
|
||||||
|
"_code_offset" / c.Tell,
|
||||||
|
"code" / c.Bytes(c.this.firmware_header.code_length),
|
||||||
|
c.Terminated,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
FirmwareOne = c.Struct(
|
||||||
"magic" / c.Const(b"TRZR"),
|
"magic" / c.Const(b"TRZR"),
|
||||||
"code_length" / c.Rebuild(c.Int32ul, c.len_(c.this.code)),
|
"code_length" / c.Rebuild(c.Int32ul, c.len_(c.this.code)),
|
||||||
"key_indexes" / c.Int8ul[V1_SIGNATURE_SLOTS], # pylint: disable=E1136
|
"key_indexes" / c.Int8ul[V1_SIGNATURE_SLOTS], # pylint: disable=E1136
|
||||||
@ -163,6 +191,8 @@ FirmwareV1 = c.Struct(
|
|||||||
"signatures" / c.Bytes(64)[V1_SIGNATURE_SLOTS],
|
"signatures" / c.Bytes(64)[V1_SIGNATURE_SLOTS],
|
||||||
"code" / c.Bytes(c.this.code_length),
|
"code" / c.Bytes(c.this.code_length),
|
||||||
c.Terminated,
|
c.Terminated,
|
||||||
|
|
||||||
|
"embedded_onev2" / c.RestreamData(c.this.code, c.Optional(FirmwareOneV2)),
|
||||||
)
|
)
|
||||||
|
|
||||||
# fmt: on
|
# fmt: on
|
||||||
@ -171,6 +201,7 @@ FirmwareV1 = c.Struct(
|
|||||||
class FirmwareFormat(Enum):
|
class FirmwareFormat(Enum):
|
||||||
TREZOR_ONE = 1
|
TREZOR_ONE = 1
|
||||||
TREZOR_T = 2
|
TREZOR_T = 2
|
||||||
|
TREZOR_ONE_V2 = 3
|
||||||
|
|
||||||
|
|
||||||
FirmwareType = NewType("FirmwareType", c.Container)
|
FirmwareType = NewType("FirmwareType", c.Container)
|
||||||
@ -180,62 +211,137 @@ ParsedFirmware = Tuple[FirmwareFormat, FirmwareType]
|
|||||||
def parse(data: bytes) -> ParsedFirmware:
|
def parse(data: bytes) -> ParsedFirmware:
|
||||||
if data[:4] == b"TRZR":
|
if data[:4] == b"TRZR":
|
||||||
version = FirmwareFormat.TREZOR_ONE
|
version = FirmwareFormat.TREZOR_ONE
|
||||||
cls = FirmwareV1
|
cls = FirmwareOne
|
||||||
elif data[:4] == b"TRZV":
|
elif data[:4] == b"TRZV":
|
||||||
version = FirmwareFormat.TREZOR_T
|
version = FirmwareFormat.TREZOR_T
|
||||||
cls = Firmware
|
cls = Firmware
|
||||||
|
elif data[:4] == b"TRZF":
|
||||||
|
version = FirmwareFormat.TREZOR_ONE_V2
|
||||||
|
cls = FirmwareOneV2
|
||||||
else:
|
else:
|
||||||
raise ValueError("Unrecognized firmware image type")
|
raise ValueError("Unrecognized firmware image type")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
fw = cls.parse(data)
|
fw = cls.parse(data)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise ValueError("Invalid firmware image") from e
|
raise FirmwareIntegrityError("Invalid firmware image") from e
|
||||||
return version, FirmwareType(fw)
|
return version, FirmwareType(fw)
|
||||||
|
|
||||||
|
|
||||||
def digest_v1(fw: FirmwareType) -> bytes:
|
def digest_onev1(fw: FirmwareType) -> bytes:
|
||||||
return hashlib.sha256(fw.code).digest()
|
return hashlib.sha256(fw.code).digest()
|
||||||
|
|
||||||
|
|
||||||
def check_sig_v1(fw: FirmwareType, idx: int) -> bool:
|
def check_sig_v1(
|
||||||
key_idx = fw.key_indexes[idx]
|
digest: bytes, key_indexes: List[int], signatures: List[bytes]
|
||||||
signature = fw.signatures[idx]
|
) -> None:
|
||||||
|
distinct_key_indexes = set(i for i in key_indexes if i != 0)
|
||||||
|
if not distinct_key_indexes:
|
||||||
|
raise Unsigned
|
||||||
|
|
||||||
if key_idx == 0:
|
if len(distinct_key_indexes) < len(key_indexes):
|
||||||
# no signature = invalid signature
|
raise InvalidSignatureError(
|
||||||
return False
|
"Not enough distinct signatures (found {}, need {})".format(
|
||||||
|
len(distinct_key_indexes), len(key_indexes)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
if key_idx not in V1_BOOTLOADER_KEYS:
|
for i in range(len(key_indexes)):
|
||||||
# unknown pubkey
|
key_idx = key_indexes[i]
|
||||||
return False
|
signature = signatures[i]
|
||||||
|
|
||||||
pubkey = bytes.fromhex(V1_BOOTLOADER_KEYS[key_idx])[1:]
|
if key_idx not in V1_BOOTLOADER_KEYS:
|
||||||
verify = ecdsa.VerifyingKey.from_string(
|
# unknown pubkey
|
||||||
pubkey, curve=ecdsa.curves.SECP256k1, hashfunc=hashlib.sha256
|
raise InvalidSignatureError("Unknown key in slot {}".format(i))
|
||||||
)
|
|
||||||
try:
|
pubkey = bytes.fromhex(V1_BOOTLOADER_KEYS[key_idx])[1:]
|
||||||
verify.verify(signature, fw.code)
|
verify = ecdsa.VerifyingKey.from_string(pubkey, curve=ecdsa.curves.SECP256k1)
|
||||||
return True
|
try:
|
||||||
except ecdsa.BadSignatureError:
|
verify.verify_digest(signature, digest)
|
||||||
return False
|
except ecdsa.BadSignatureError as e:
|
||||||
|
raise InvalidSignatureError("Invalid signature in slot {}".format(i)) from e
|
||||||
|
|
||||||
|
|
||||||
def _header_digest(header: c.Container, header_type: c.Construct) -> bytes:
|
def _header_digest(
|
||||||
|
header: c.Container,
|
||||||
|
header_type: c.Construct,
|
||||||
|
hash_function: Callable = pyblake2.blake2s,
|
||||||
|
) -> bytes:
|
||||||
stripped_header = header.copy()
|
stripped_header = header.copy()
|
||||||
stripped_header.sigmask = 0
|
stripped_header.sigmask = 0
|
||||||
stripped_header.signature = b"\0" * 64
|
stripped_header.signature = b"\0" * 64
|
||||||
|
stripped_header.v1_key_indexes = [0, 0, 0]
|
||||||
|
stripped_header.v1_signatures = [b"\0" * 64] * 3
|
||||||
header_bytes = header_type.build(stripped_header)
|
header_bytes = header_type.build(stripped_header)
|
||||||
return pyblake2.blake2s(header_bytes).digest()
|
return hash_function(header_bytes).digest()
|
||||||
|
|
||||||
|
|
||||||
def digest(fw: FirmwareType) -> bytes:
|
def digest_v2(fw: FirmwareType) -> bytes:
|
||||||
return _header_digest(fw.firmware_header, FirmwareHeader)
|
return _header_digest(fw.firmware_header, FirmwareHeader, pyblake2.blake2s)
|
||||||
|
|
||||||
|
|
||||||
def validate(fw: FirmwareType, skip_vendor_header=False) -> bool:
|
def digest_onev2(fw: FirmwareType) -> bytes:
|
||||||
|
return _header_digest(fw.firmware_header, FirmwareHeader, hashlib.sha256)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_code_hashes(
|
||||||
|
fw: FirmwareType,
|
||||||
|
hash_function: Callable = pyblake2.blake2s,
|
||||||
|
chunk_size: int = V2_CHUNK_SIZE,
|
||||||
|
padding_byte: bytes = None,
|
||||||
|
) -> None:
|
||||||
|
for i, expected_hash in enumerate(fw.firmware_header.hashes):
|
||||||
|
if i == 0:
|
||||||
|
# Because first chunk is sent along with headers, there is less code in it.
|
||||||
|
chunk = fw.code[: chunk_size - fw._code_offset]
|
||||||
|
else:
|
||||||
|
# Subsequent chunks are shifted by the "missing header" size.
|
||||||
|
ptr = i * chunk_size - fw._code_offset
|
||||||
|
chunk = fw.code[ptr : ptr + chunk_size]
|
||||||
|
|
||||||
|
# padding for last chunk
|
||||||
|
if padding_byte is not None and i > 1 and chunk and len(chunk) < chunk_size:
|
||||||
|
chunk += padding_byte[0:1] * (chunk_size - len(chunk))
|
||||||
|
|
||||||
|
if not chunk and expected_hash == b"\0" * 32:
|
||||||
|
continue
|
||||||
|
chunk_hash = hash_function(chunk).digest()
|
||||||
|
if chunk_hash != expected_hash:
|
||||||
|
raise FirmwareIntegrityError("Invalid firmware data.")
|
||||||
|
|
||||||
|
|
||||||
|
def validate_onev2(fw: FirmwareType, allow_unsigned: bool = False) -> None:
|
||||||
|
try:
|
||||||
|
check_sig_v1(
|
||||||
|
digest_onev2(fw),
|
||||||
|
fw.firmware_header.v1_key_indexes,
|
||||||
|
fw.firmware_header.v1_signatures,
|
||||||
|
)
|
||||||
|
except Unsigned:
|
||||||
|
if not allow_unsigned:
|
||||||
|
raise
|
||||||
|
|
||||||
|
validate_code_hashes(
|
||||||
|
fw,
|
||||||
|
hash_function=hashlib.sha256,
|
||||||
|
chunk_size=ONEV2_CHUNK_SIZE,
|
||||||
|
padding_byte=b"\xFF",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_onev1(fw: FirmwareType, allow_unsigned: bool = False) -> None:
|
||||||
|
try:
|
||||||
|
check_sig_v1(digest_onev1(fw), fw.key_indexes, fw.signatures)
|
||||||
|
except Unsigned:
|
||||||
|
if not allow_unsigned:
|
||||||
|
raise
|
||||||
|
if fw.embedded_onev2:
|
||||||
|
validate_onev2(fw.embedded_onev2, allow_unsigned)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_v2(fw: FirmwareType, skip_vendor_header=False) -> None:
|
||||||
vendor_fingerprint = _header_digest(fw.vendor_header, VendorHeader)
|
vendor_fingerprint = _header_digest(fw.vendor_header, VendorHeader)
|
||||||
fingerprint = digest(fw)
|
fingerprint = digest_v2(fw)
|
||||||
|
|
||||||
if not skip_vendor_header:
|
if not skip_vendor_header:
|
||||||
try:
|
try:
|
||||||
@ -250,7 +356,7 @@ def validate(fw: FirmwareType, skip_vendor_header=False) -> bool:
|
|||||||
V2_BOOTLOADER_KEYS,
|
V2_BOOTLOADER_KEYS,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
raise ValueError("Invalid vendor header signature.")
|
raise InvalidSignatureError("Invalid vendor header signature.")
|
||||||
|
|
||||||
# XXX expiry is not used now
|
# XXX expiry is not used now
|
||||||
# now = time.gmtime()
|
# now = time.gmtime()
|
||||||
@ -267,28 +373,39 @@ def validate(fw: FirmwareType, skip_vendor_header=False) -> bool:
|
|||||||
fw.vendor_header.pubkeys,
|
fw.vendor_header.pubkeys,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
raise ValueError("Invalid firmware signature.")
|
raise InvalidSignatureError("Invalid firmware signature.")
|
||||||
|
|
||||||
# XXX expiry is not used now
|
# XXX expiry is not used now
|
||||||
# if time.gmtime(fw.firmware_header.expiry) < now:
|
# if time.gmtime(fw.firmware_header.expiry) < now:
|
||||||
# raise ValueError("Firmware header expired.")
|
# raise ValueError("Firmware header expired.")
|
||||||
|
validate_code_hashes(fw)
|
||||||
|
|
||||||
for i, expected_hash in enumerate(fw.firmware_header.hashes):
|
|
||||||
if i == 0:
|
def digest(version: FirmwareFormat, fw: FirmwareType) -> bytes:
|
||||||
# Because first chunk is sent along with headers, there is less code in it.
|
if version == FirmwareFormat.TREZOR_ONE:
|
||||||
chunk = fw.code[: V2_CHUNK_SIZE - fw._code_offset]
|
if fw.embedded_onev2:
|
||||||
|
return digest_onev2(fw.embedded_onev2)
|
||||||
else:
|
else:
|
||||||
# Subsequent chunks are shifted by the "missing header" size.
|
return digest_onev1(fw)
|
||||||
ptr = i * V2_CHUNK_SIZE - fw._code_offset
|
elif version == FirmwareFormat.TREZOR_ONE_V2:
|
||||||
chunk = fw.code[ptr : ptr + V2_CHUNK_SIZE]
|
return digest_onev2(fw)
|
||||||
|
elif version == FirmwareFormat.TREZOR_T:
|
||||||
|
return digest_v2(fw)
|
||||||
|
else:
|
||||||
|
raise ValueError("Unrecognized firmware version")
|
||||||
|
|
||||||
if not chunk and expected_hash == b"\0" * 32:
|
|
||||||
continue
|
|
||||||
chunk_hash = pyblake2.blake2s(chunk).digest()
|
|
||||||
if chunk_hash != expected_hash:
|
|
||||||
raise ValueError("Invalid firmware data.")
|
|
||||||
|
|
||||||
return True
|
def validate(
|
||||||
|
version: FirmwareFormat, fw: FirmwareType, allow_unsigned: bool = True
|
||||||
|
) -> None:
|
||||||
|
if version == FirmwareFormat.TREZOR_ONE:
|
||||||
|
return validate_onev1(fw, allow_unsigned)
|
||||||
|
elif version == FirmwareFormat.TREZOR_ONE_V2:
|
||||||
|
return validate_onev2(fw, allow_unsigned)
|
||||||
|
elif version == FirmwareFormat.TREZOR_T:
|
||||||
|
return validate_v2(fw)
|
||||||
|
else:
|
||||||
|
raise ValueError("Unrecognized firmware version")
|
||||||
|
|
||||||
|
|
||||||
# ====== Client functions ====== #
|
# ====== Client functions ====== #
|
||||||
|
Loading…
Reference in New Issue
Block a user