diff --git a/Pipfile b/Pipfile index 0af72c645a..f73d38446a 100644 --- a/Pipfile +++ b/Pipfile @@ -9,6 +9,7 @@ trezor = {editable = true,path = "./python"} scons = "*" protobuf = "==3.6.1" pyblake2 = "*" +Pyro4 = "*" ## test tools pytest = "*" diff --git a/Pipfile.lock b/Pipfile.lock index f2db5a77cc..8a8edaeb5c 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "e8d9a82935300b8716e549422ded189b2ce4408bc31b8d5c91bbe9979bf15a0a" + "sha256": "1ff8a28e128037758ba995c7530207c6caf381d0b062cda5b3a611b5ddd936bd" }, "pipfile-spec": 6, "requires": {}, @@ -407,10 +407,10 @@ }, "packaging": { "hashes": [ - "sha256:28b924174df7a2fa32c1953825ff29c61e2f5e082343165438812f00d3a7fc47", - "sha256:d9551545c6d761f3def1677baf08ab2a3ca17c56879e70fecba2fc4dde4ed108" + "sha256:aec3fdbb8bc9e4bb65f0634b9f551ced63983a529d6a8931817d52fdd0816ddb", + "sha256:fe1d8331dfa7cc0a883b49d75fc76380b2ab2734b220fbb87d774e4fd4b851f8" ], - "version": "==19.2" + "version": "==20.0" }, "pathspec": { "hashes": [ @@ -533,6 +533,14 @@ ], "version": "==2.4.6" }, + "pyro4": { + "hashes": [ + "sha256:2bfe12a22f396474b0e57c898c7e2c561a8f850bf2055d8cf0f7119f0c7a523f", + "sha256:7c4712257ba5bed8bc4ed037bdad5b4683a483a5fd634a2ac3effa5ba787f511" + ], + "index": "pypi", + "version": "==4.77" + }, "pytest": { "hashes": [ "sha256:6b571215b5a790f9b41f19f3531c53a45cf6bb8ef2988bc1ff9afb38270b25fa", @@ -600,6 +608,13 @@ "index": "pypi", "version": "==3.1.2" }, + "serpent": { + "hashes": [ + "sha256:c8e18d7c787612abb811474a2cce310a1e5e737eac9d858a073d4f3c44a4f3b6", + "sha256:f306336ca09aa38e526f3b03cab58eb7e45af09981267233167bcf3bfd6436ab" + ], + "version": "==1.28" + }, "shamir-mnemonic": { "hashes": [ "sha256:1cc08d276e05b13cd32bd3b0c5d1cb6c30254e0086e0f6857ec106d4cceff121", @@ -700,7 +715,8 @@ }, "wcwidth": { "hashes": [ - "sha256:8fd29383f539be45b20bd4df0dc29c20ba48654a41e661925e612311e9f3c603" + "sha256:8fd29383f539be45b20bd4df0dc29c20ba48654a41e661925e612311e9f3c603", + "sha256:f28b3e8a6483e5d49e7f8949ac1a78314e740333ae305b4ba5defd3e74fb37a8" ], "version": "==0.1.8" }, @@ -715,10 +731,10 @@ "develop": { "scan-build": { "hashes": [ - "sha256:29f8a99f61fa5bedd4be4eff00d1dd50d9990ec9853230b9fc826c0c694146fa" + "sha256:296bb899dc4c126b985fca0c15c9ec20cf758b7c7ff830db64c4ca971d886797" ], "index": "pypi", - "version": "==2.0.17" + "version": "==2.0.18" } } } diff --git a/core/Makefile b/core/Makefile index 6a7a98cf4d..7c1ca0e8cd 100644 --- a/core/Makefile +++ b/core/Makefile @@ -214,8 +214,8 @@ gdb_firmware: $(FIRMWARE_BUILD_DIR)/firmware.elf ## start remote gdb session to ## misc commands: binctl: ## print info about binary files - ./tools/binctl $(BOOTLOADER_BUILD_DIR)/bootloader.bin - ./tools/binctl $(FIRMWARE_BUILD_DIR)/firmware.bin + ./tools/headertool.py $(BOOTLOADER_BUILD_DIR)/bootloader.bin + ./tools/headertool.py $(FIRMWARE_BUILD_DIR)/firmware.bin bloaty: ## run bloaty size profiler bloaty -d symbols -n 0 -s file $(FIRMWARE_BUILD_DIR)/firmware.elf | less diff --git a/core/SConscript.bootloader b/core/SConscript.bootloader index 092c351a20..1310a1df63 100644 --- a/core/SConscript.bootloader +++ b/core/SConscript.bootloader @@ -160,8 +160,7 @@ env.Replace( ASPPFLAGS='$CFLAGS $CCFLAGS', ) env.Replace( - BINCTL='tools/binctl', - KEYCTL='tools/keyctl', + HEADERTOOL='tools/headertool.py', ) # @@ -186,6 +185,5 @@ program_bin = env.Command( source=program_elf, action=[ '$OBJCOPY -O binary -j .header -j .flash -j .data $SOURCE $TARGET', - '$BINCTL $TARGET -h', - '$BINCTL $TARGET -s 1:2 `$KEYCTL sign bootloader $TARGET 4141414141414141414141414141414141414141414141414141414141414141 4242424242424242424242424242424242424242424242424242424242424242`' if ARGUMENTS.get('PRODUCTION', '0') == '0' else '', + '$HEADERTOOL $TARGET ' + ('-D' if ARGUMENTS.get('PRODUCTION', '0') == '0' else ''), ], ) diff --git a/core/SConscript.firmware b/core/SConscript.firmware index e83c765d5b..be45c9b210 100644 --- a/core/SConscript.firmware +++ b/core/SConscript.firmware @@ -397,8 +397,7 @@ env.Replace( ASPPFLAGS='$CFLAGS $CCFLAGS', ) env.Replace( - BINCTL='tools/binctl', - KEYCTL='tools/keyctl', + HEADERTOOL='tools/headertool.py', PYTHON='python', MAKEQSTRDATA='$PYTHON vendor/micropython/py/makeqstrdata.py', MAKEVERSIONHDR='$PYTHON vendor/micropython/py/makeversionhdr.py', @@ -613,8 +612,7 @@ if env.get('TREZOR_MODEL') == 'T': '$OBJCOPY -O binary -j .vendorheader -j .header -j .flash -j .data --pad-to 0x08100000 $SOURCE ${TARGET}.p1', '$OBJCOPY -O binary -j .flash2 $SOURCE ${TARGET}.p2', '$CAT ${TARGET}.p1 ${TARGET}.p2 > $TARGET', - '$BINCTL $TARGET -h', - '$BINCTL $TARGET -s 1:2 `$KEYCTL sign firmware $TARGET 4747474747474747474747474747474747474747474747474747474747474747 4848484848484848484848484848484848484848484848484848484848484848`' if ARGUMENTS.get('PRODUCTION', '0') == '0' else '', + '$HEADERTOOL $TARGET ' + ('-D' if ARGUMENTS.get('PRODUCTION', '0') == '0' else ''), '$DD if=$TARGET of=${TARGET}.p1 skip=0 bs=128k count=6', ] else: diff --git a/core/SConscript.prodtest b/core/SConscript.prodtest index 2307c42a25..77695ac4af 100644 --- a/core/SConscript.prodtest +++ b/core/SConscript.prodtest @@ -136,8 +136,7 @@ env.Replace( ASPPFLAGS='$CFLAGS $CCFLAGS', ) env.Replace( - BINCTL='tools/binctl', - KEYCTL='tools/keyctl', + HEADERTOOL='tools/headertool.py', ) # @@ -172,6 +171,5 @@ program_bin = env.Command( source=program_elf, action=[ '$OBJCOPY -O binary -j .vendorheader -j .header -j .flash -j .data $SOURCE $TARGET', - '$BINCTL $TARGET -h', - '$BINCTL $TARGET -s 1:2 `$KEYCTL sign firmware $TARGET 4747474747474747474747474747474747474747474747474747474747474747 4848484848484848484848484848484848484848484848484848484848484848`' if ARGUMENTS.get('PRODUCTION', '0') == '0' else '', + '$HEADERTOOL $TARGET ' + ('-D' if ARGUMENTS.get('PRODUCTION', '0') == '0' else ''), ], ) diff --git a/core/SConscript.reflash b/core/SConscript.reflash index 51c2c0e9b2..0369b9a8d2 100644 --- a/core/SConscript.reflash +++ b/core/SConscript.reflash @@ -130,8 +130,7 @@ env.Replace( ASPPFLAGS='$CFLAGS $CCFLAGS', ) env.Replace( - BINCTL='tools/binctl', - KEYCTL='tools/keyctl', + HEADERTOOL='tools/headertool.py', ) # @@ -166,6 +165,5 @@ program_bin = env.Command( source=program_elf, action=[ '$OBJCOPY -O binary -j .vendorheader -j .header -j .flash -j .data $SOURCE $TARGET', - '$BINCTL $TARGET -h', - '$BINCTL $TARGET -s 1:2 `$KEYCTL sign firmware $TARGET 4747474747474747474747474747474747474747474747474747474747474747 4848484848484848484848484848484848484848484848484848484848484848`' if ARGUMENTS.get('PRODUCTION', '0') == '0' else '', + '$HEADERTOOL $TARGET ' + ('-D' if ARGUMENTS.get('PRODUCTION', '0') == '0' else ''), ], ) diff --git a/core/embed/vendorheader/generate.sh b/core/embed/vendorheader/generate.sh index 0f78b25d73..c566911c5d 100755 --- a/core/embed/vendorheader/generate.sh +++ b/core/embed/vendorheader/generate.sh @@ -1,13 +1,12 @@ -BINCTL=../../tools/binctl -KEYCTL=../../tools/keyctl BUILDVH=../../tools/build_vendorheader +BINCTL=../../tools/headertool.py -# construct the default unsafe vendor header -$BUILDVH e28a8970753332bd72fef413e6b0b2ef1b4aadda7aa2c141f233712a6876b351:d4eec1869fb1b8a4e817516ad5a931557cb56805c3eb16e8f3a803d647df7869:772c8a442b7db06e166cfbc1ccbcbcde6f3eba76a4e98ef3ffc519502237d6ef 2 0.0 xxx...x "UNSAFE, DO NOT USE!" vendor_unsafe.toif vendorheader_unsafe_unsigned.bin +# construct all vendor headers +for fn in *.json; do + name=$(echo $fn | sed 's/vendor_\(.*\)\.json/\1/') + $BUILDVH vendor_${name}.json vendor_${name}.toif vendorheader_${name}_unsigned.bin +done -# sign the default unsafe vendor header using development keys +# sign dev vendor header cp -a vendorheader_unsafe_unsigned.bin vendorheader_unsafe_signed_dev.bin -$BINCTL vendorheader_unsafe_signed_dev.bin -s 1:2 `$KEYCTL sign vendorheader vendorheader_unsafe_signed_dev.bin 4444444444444444444444444444444444444444444444444444444444444444 4545454545454545454545454545454545454545454545454545454545454545` - -# construct SatoshiLabs vendor header -$BUILDVH 47fbdc84d8abef44fe6abde8f87b6ead821b7082ec63b9f7cc33dc53bf6c708d:9af22a52ab47a93091403612b3d6731a2dfef8a33383048ed7556a20e8b03c81:2218c25f8ba70c82eba8ed6a321df209c0a7643d014f33bf9317846f62923830 2 0.0 ....... SatoshiLabs vendor_satoshilabs.toif vendorheader_satoshilabs_unsigned.bin +$BINCTL -D vendorheader_unsafe_signed_dev.bin diff --git a/core/embed/vendorheader/vendor_satoshilabs.json b/core/embed/vendorheader/vendor_satoshilabs.json new file mode 100644 index 0000000000..7c4c6f4567 --- /dev/null +++ b/core/embed/vendorheader/vendor_satoshilabs.json @@ -0,0 +1,20 @@ +{ + "text": "SatoshiLabs", + "expiry": 0, + "version": { + "major": 0, + "minor": 0 + }, + "sig_m": 2, + "trust": { + "show_vendor_string": false, + "require_user_click": false, + "red_background": false, + "delay": 0 + }, + "pubkeys": [ + "47fbdc84d8abef44fe6abde8f87b6ead821b7082ec63b9f7cc33dc53bf6c708d", + "9af22a52ab47a93091403612b3d6731a2dfef8a33383048ed7556a20e8b03c81", + "2218c25f8ba70c82eba8ed6a321df209c0a7643d014f33bf9317846f62923830" + ] +} diff --git a/core/embed/vendorheader/vendor_unsafe.json b/core/embed/vendorheader/vendor_unsafe.json new file mode 100644 index 0000000000..e7af4710e5 --- /dev/null +++ b/core/embed/vendorheader/vendor_unsafe.json @@ -0,0 +1,20 @@ +{ + "text": "UNSAFE, DO NOT USE!", + "expiry": 0, + "version": { + "major": 0, + "minor": 0 + }, + "sig_m": 2, + "trust": { + "show_vendor_string": true, + "require_user_click": true, + "red_background": true, + "delay": 1 + }, + "pubkeys": [ + "e28a8970753332bd72fef413e6b0b2ef1b4aadda7aa2c141f233712a6876b351", + "d4eec1869fb1b8a4e817516ad5a931557cb56805c3eb16e8f3a803d647df7869", + "772c8a442b7db06e166cfbc1ccbcbcde6f3eba76a4e98ef3ffc519502237d6ef" + ] +} diff --git a/core/tools/binctl b/core/tools/binctl deleted file mode 100755 index c86c8bf567..0000000000 --- a/core/tools/binctl +++ /dev/null @@ -1,359 +0,0 @@ -#!/usr/bin/env python3 - -from __future__ import print_function - -import sys -import struct -import binascii - -import pyblake2 - -from trezorlib import cosi - - -def format_sigmask(sigmask): - bits = [str(b + 1) if sigmask & (1 << b) else "." for b in range(8)] - return "0x%02x = [%s]" % (sigmask, " ".join(bits)) - - -def format_vtrust(vtrust): - bits = [str(b) if vtrust & (1 << b) == 0 else "." for b in range(16)] - # see docs/bootloader.md for vtrust constants - desc = "" - wait = (vtrust & 0x000F) ^ 0x000F - if wait > 0: - desc = "WAIT_%d" % wait - if vtrust & 0x0010 == 0: - desc += " RED" - if vtrust & 0x0020 == 0: - desc += " CLICK" - if vtrust & 0x0040 == 0: - desc += " STRING" - return "%d = [%s] = [%s]" % (vtrust, " ".join(bits), desc) - - -# bootloader/firmware headers specification: https://github.com/trezor/trezor-core/blob/master/docs/bootloader.md - -IMAGE_HEADER_SIZE = 1024 -IMAGE_SIG_SIZE = 65 -IMAGE_CHUNK_SIZE = 128 * 1024 -BOOTLOADER_SECTORS_COUNT = 1 -FIRMWARE_SECTORS_COUNT = 6 + 7 - - -class BinImage(object): - def __init__(self, data, magic, max_size): - header = struct.unpack("<4sIIIBBBBBBBB8s512s415sB64s", data[:IMAGE_HEADER_SIZE]) - self.magic, self.hdrlen, self.expiry, self.codelen, self.vmajor, self.vminor, self.vpatch, self.vbuild, self.fix_vmajor, self.fix_vminor, self.fix_vpatch, self.fix_vbuild, self.reserved1, self.hashes, self.reserved2, self.sigmask, self.sig = ( - header - ) - assert self.magic == magic - assert self.hdrlen == IMAGE_HEADER_SIZE - total_len = self.hdrlen + self.codelen - assert total_len % 512 == 0 - assert total_len >= 4 * 1024 - assert total_len <= max_size - assert self.reserved1 == 8 * b"\x00" - assert self.reserved2 == 415 * b"\x00" - self.code = data[self.hdrlen :] - assert len(self.code) == self.codelen - - def print(self): - if self.magic == b"TRZF": - print("Trezor Firmware Image") - total_len = self.vhdrlen + self.hdrlen + self.codelen - elif self.magic == b"TRZB": - print("Trezor Bootloader Image") - total_len = self.hdrlen + self.codelen - else: - print("Trezor Unknown Image") - print(" * magic :", self.magic.decode()) - print(" * hdrlen :", self.hdrlen) - print(" * expiry :", self.expiry) - print(" * codelen :", self.codelen) - print( - " * version : %d.%d.%d.%d" - % (self.vmajor, self.vminor, self.vpatch, self.vbuild) - ) - print( - " * fixver : %d.%d.%d.%d" - % (self.fix_vmajor, self.fix_vminor, self.fix_vpatch, self.fix_vbuild) - ) - print(" * hashes: %s" % ("OK" if self.check_hashes() else "INCORRECT")) - for i in range(16): - print( - " - %02d : %s" - % (i, binascii.hexlify(self.hashes[i * 32 : i * 32 + 32]).decode()) - ) - print(" * sigmask :", format_sigmask(self.sigmask)) - print(" * sig :", binascii.hexlify(self.sig).decode()) - print(" * total : %d bytes" % total_len) - print(" * fngprnt :", self.fingerprint()) - print() - - def compute_hashes(self): - if self.magic == b"TRZF": - hdrlen = self.vhdrlen + self.hdrlen - else: - hdrlen = self.hdrlen - hashes = b"" - for i in range(16): - if i == 0: - d = self.code[: IMAGE_CHUNK_SIZE - hdrlen] - else: - s = IMAGE_CHUNK_SIZE - hdrlen + (i - 1) * IMAGE_CHUNK_SIZE - d = self.code[s : s + IMAGE_CHUNK_SIZE] - if len(d) > 0: - h = pyblake2.blake2s(d).digest() - else: - h = 32 * b"\x00" - hashes += h - return hashes - - def check_hashes(self): - return self.hashes == self.compute_hashes() - - def update_hashes(self): - self.hashes = self.compute_hashes() - - def serialize_header(self, sig=True): - header = struct.pack( - "<4sIIIBBBBBBBB8s512s415s", - self.magic, - self.hdrlen, - self.expiry, - self.codelen, - self.vmajor, - self.vminor, - self.vpatch, - self.vbuild, - self.fix_vmajor, - self.fix_vminor, - self.fix_vpatch, - self.fix_vbuild, - self.reserved1, - self.hashes, - self.reserved2, - ) - if sig: - header += struct.pack(" 0 and self.vsig_m <= self.vsig_n - assert self.vsig_n > 0 and self.vsig_n <= 8 - p = 32 - self.vpub = [] - for _ in range(self.vsig_n): - self.vpub.append(data[p : p + 32]) - p += 32 - self.vstr_len = data[p] - p += 1 - self.vstr = data[p : p + self.vstr_len] - p += self.vstr_len - vstr_pad = -p & 3 - p += vstr_pad - self.vimg_len = len(data) - IMAGE_SIG_SIZE - p - self.vimg = data[p : p + self.vimg_len] - p += self.vimg_len - self.sigmask = data[p] - p += 1 - self.sig = data[p : p + 64] - assert ( - len(data) - == 4 - + 4 - + 4 - + 1 - + 1 - + 1 - + 1 - + 1 - + 15 - + 32 * len(self.vpub) - + 1 - + self.vstr_len - + vstr_pad - + self.vimg_len - + IMAGE_SIG_SIZE - ) - assert len(data) % 512 == 0 - - def print(self): - print("Trezor Vendor Header") - print(" * magic :", self.magic.decode()) - print(" * hdrlen :", self.hdrlen) - print(" * expiry :", self.expiry) - print(" * version : %d.%d" % (self.vmajor, self.vminor)) - print(" * scheme : %d out of %d" % (self.vsig_m, self.vsig_n)) - print(" * trust :", format_vtrust(self.vtrust)) - for i in range(self.vsig_n): - print(" * vpub #%d :" % (i + 1), binascii.hexlify(self.vpub[i]).decode()) - print(" * vstr :", self.vstr.decode()) - print(" * vhash :", binascii.hexlify(self.vhash()).decode()) - print(" * vimg : (%d bytes)" % len(self.vimg)) - print(" * sigmask :", format_sigmask(self.sigmask)) - print(" * sig :", binascii.hexlify(self.sig).decode()) - print(" * fngprnt :", self.fingerprint()) - print() - - def serialize_header(self, sig=True): - header = struct.pack( - "<4sIIBBBBH", - self.magic, - self.hdrlen, - self.expiry, - self.vmajor, - self.vminor, - self.vsig_m, - self.vsig_n, - self.vtrust, - ) - header += 14 * b"\x00" - for i in range(self.vsig_n): - header += self.vpub[i] - header += struct.pack(" 0: - pk = [vheader.vpub[i] for i in range(8) if firmware.sigmask & (1 << i)] - global_pk = cosi.combine_keys(pk) - hdr = ( - subdata[: IMAGE_HEADER_SIZE - IMAGE_SIG_SIZE] - + IMAGE_SIG_SIZE * b"\x00" - ) - digest = pyblake2.blake2s(hdr).digest() - try: - cosi.verify(firmware.sig, digest, global_pk) - print("Firmware signature OK") - except ValueError: - print("Firmware signature INCORRECT") - else: - print("No firmware signature") - return firmware - if magic == b"TRZF": - return FirmwareImage(data, 0) - raise Exception("Unknown file format") - - -def main(): - if len(sys.argv) < 2: - print("Usage: binctl file.bin [-s sigmask signature] [-h]") - return 1 - fn = sys.argv[1] - sign = len(sys.argv) > 2 and sys.argv[2] == "-s" - rehash = len(sys.argv) == 3 and sys.argv[2] == "-h" - b = binopen(fn) - if sign: - sigmask = 0 - if ":" in sys.argv[3]: - for idx in sys.argv[3].split(":"): - sigmask |= 1 << (int(idx) - 1) - else: - sigmask = 1 << (int(sys.argv[3]) - 1) - signature = binascii.unhexlify(sys.argv[4]) - b.sign(sigmask, signature) - b.write(fn) - if rehash: - b.update_hashes() - b.write(fn) - b.print() - - -if __name__ == "__main__": - main() diff --git a/core/tools/build_vendorheader b/core/tools/build_vendorheader index 0658ff78e8..be085c121e 100755 --- a/core/tools/build_vendorheader +++ b/core/tools/build_vendorheader @@ -1,65 +1,23 @@ #!/usr/bin/env python3 -import sys -import struct -import binascii +import json + +import click + +from trezorlib import firmware -# encode vendor name, add length byte and padding to multiple of 4 -def encode_vendor(vname): - vbin = vname.encode() - vbin = struct.pack(" bytes: + """Locally produce a CoSi signature.""" + pubkeys = [cosi.pubkey_from_privkey(sk) for sk in privkeys] + nonces = [cosi.get_nonce(sk, digest, i) for i, sk in enumerate(privkeys)] + + global_pk = cosi.combine_keys(pubkeys) + global_R = cosi.combine_keys(R for r, R in nonces) + + sigs = [ + cosi.sign_with_privkey(digest, sk, global_pk, r, global_R) + for sk, (r, R) in zip(privkeys, nonces) + ] + + signature = cosi.combine_sig(global_R, sigs) + try: + cosi.verify_combined(signature, digest, global_pk) + except Exception as e: + raise click.ClickException(f"Failed to produce valid signature.") from e + + return signature + + +def parse_privkey_args(privkey_data: List[str]) -> Tuple[int, List[bytes]]: + privkeys = [] + sigmask = 0 + for key in privkey_data: + try: + idx, key_hex = key.split(":", maxsplit=1) + privkeys.append(bytes.fromhex(key_hex)) + sigmask |= 1 << (int(idx) - 1) + except ValueError: + click.echo(f"Could not parse key: {key}") + click.echo("Keys must be in the format: :") + raise click.ClickException("Unrecognized key format.") + return sigmask, privkeys + + +def process_remote_signers(fw, addrs: List[str]) -> Tuple[int, List[bytes]]: + if len(addrs) < fw.sigs_required: + raise click.ClickException( + f"Not enough signers (need at least {fw.sigs_required})" + ) + + digest = fw.digest() + name = fw.NAME + + def mkproxy(addr): + return Pyro4.Proxy(f"PYRO:keyctl@{addr}:{PORT}") + + sigmask = 0 + pks, Rs = [], [] + for addr in addrs: + click.echo(f"Connecting to {addr}...") + with mkproxy(addr) as proxy: + pk, R = proxy.get_commit(name, digest) + if pk not in fw.public_keys: + raise click.ClickException( + f"Signer at {addr} commits with unknown public key {pk.hex()}" + ) + idx = fw.public_keys.index(pk) + click.echo(f"Signer at {addr} commits with public key #{idx+1}: {pk.hex()}") + sigmask |= 1 << idx + pks.append(pk) + Rs.append(R) + + # compute global commit + global_pk = cosi.combine_keys(pks) + global_R = cosi.combine_keys(Rs) + + # collect signatures + sigs = [] + for addr in addrs: + click.echo(f"Waiting for {addr} to sign... ", nl=False) + with mkproxy(addr) as proxy: + sig = proxy.get_signature(name, digest, global_R, global_pk) + sigs.append(sig) + click.echo("OK") + + for addr in addrs: + with mkproxy(addr) as proxy: + proxy.finish() + + # compute global signature + return sigmask, cosi.combine_sig(global_R, sigs) + + +# ===================== CLI actions ========================= + + +def do_replace_vendorheader(fw, vh_file) -> None: + if not isinstance(fw, firmware_headers.FirmwareImage): + raise click.ClickException("Invalid image type (must be firmware).") + + vh = firmware.VendorHeader.parse(vh_file.read()) + if vh.header_len != fw.fw.vendor_header.header_len: + raise click.ClickException("New vendor header must have the same size.") + + fw.fw.vendor_header = vh + + +@click.command() +@click.option("-n", "--dry-run", is_flag=True, help="Do not save changes.") +@click.option("-h", "--rehash", is_flag=True, help="Force recalculate hashes.") +@click.option("-v", "--verbose", is_flag=True, help="Show verbose info about headers.") +@click.option( + "-S", + "--sign-private", + "privkey_data", + metavar="INDEX:PRIVKEY_HEX", + multiple=True, + help="Private key to use for signing. Can be repeated.", +) +@click.option( + "-D", "--sign-dev-keys", is_flag=True, help="Sign with development header keys." +) +@click.option( + "-s", + "--signature", + "insert_signature", + nargs=2, + metavar="INDEX:INDEX:INDEX... SIGNATURE_HEX", + help="Insert external signature.", +) +@click.option("-V", "--replace-vendor-header", type=click.File("rb")) +@click.option( + "-d", + "--digest", + "print_digest", + is_flag=True, + help="Only output header digest for signing and exit.", +) +@click.option( + "-r", + "--remote", + metavar="IPADDR", + multiple=True, + help="IP address of remote signer. Can be repeated.", +) +@click.argument("firmware_file", type=click.File("rb+")) +def cli( + firmware_file, + verbose, + rehash, + dry_run, + privkey_data, + sign_dev_keys, + insert_signature, + replace_vendor_header, + print_digest, + remote, +): + """Manage trezor-core firmware headers. + + This tool supports three types of files: raw vendor headers (TRZV), bootloader + images (TRZB), and firmware images which are prefixed with a vendor header + (TRZV+TRZF). + + Run with no options on a file to dump information about that file. + + Run with -d to print the header digest and exit. This works correctly regardless of + whether code hashes have been filled. + + Run with -h to recalculate and fill in code hashes. + + To insert an external signature: + + ./headertool.py firmware.bin -s 1:2:3 ABCDEF<...signature in hex format> + + The string "1:2:3" is a list of 1-based indexes of keys used to generate the signature. + + To sign with local private keys: + + \b + ./headertool.py firmware.bin -S 1:ABCDEF<...hex private key> -S 2:1234<..hex private key> + + Each instance of -S is in the form "index:privkey", where index is the same as + above. Instead of specifying the keys manually, use -D to substitue known + development keys. + + Signature validity is not checked in either of the two cases. + + To sign with remote participants: + + ./headertool.py firmware.bin -r 10.24.13.11 -r 10.24.13.190 ... + + Each participant must be running keyctl-proxy configured on the same file. Signers' + public keys must be in the list of known signers and are matched to indexes + automatically. + """ + firmware_data = firmware_file.read() + + try: + fw = firmware_headers.parse_image(firmware_data) + except Exception as e: + import traceback + + traceback.print_exc() + magic = firmware_data[:4] + raise click.ClickException( + f"Could not parse file (magic bytes: {magic!r})" + ) from e + + digest = fw.digest() + if print_digest: + click.echo(digest.hex()) + return + + if replace_vendor_header: + do_replace_vendorheader(fw, replace_vendor_header) + + if rehash: + fw.rehash() + + if sign_dev_keys: + privkeys = fw.DEV_KEYS + sigmask = fw.DEV_KEY_SIGMASK + else: + sigmask, privkeys = parse_privkey_args(privkey_data) + + signature = None + + if privkeys: + click.echo("Signing with local private keys...", err=True) + signature = sign_with_privkeys(digest, privkeys) + + if insert_signature: + click.echo("Inserting external signature...", err=True) + sigmask_str, signature = insert_signature + signature = bytes.fromhex(signature) + sigmask = 0 + for bit in sigmask_str.split(":"): + sigmask |= 1 << (int(bit) - 1) + + if remote: + if Pyro4 is None: + raise click.ClickException("Please install Pyro4 for remote signing.") + click.echo(fw.format()) + click.echo(f"Signing with {len(remote)} remote participants.") + sigmask, signature = process_remote_signers(fw, remote) + + if signature: + fw.rehash() + fw.insert_signature(signature, sigmask) + + click.echo(fw.format(verbose)) + + updated_data = fw.dump() + if updated_data == firmware_data: + click.echo("No changes made", err=True) + elif dry_run: + click.echo("Not saving changes", err=True) + else: + firmware_file.seek(0) + firmware_file.truncate(0) + firmware_file.write(updated_data) + + +if __name__ == "__main__": + cli() diff --git a/core/tools/keyctl b/core/tools/keyctl deleted file mode 100755 index dfcd907366..0000000000 --- a/core/tools/keyctl +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python3 -import binascii -import struct -import click -import pyblake2 -from trezorlib import cosi - -indexmap = {"bootloader": 0, "vendorheader": 1, "firmware": 2} - - -def header_digest(index, filename): - data = open(filename, "rb").read() - z = bytes(65 * [0x00]) - if index == "bootloader": - header = data[:0x03BF] + z - elif index == "vendorheader": - header = data[:-65] + z - elif index == "firmware": - vhdrlen = struct.unpack(" None: + self.daemon = daemon + self.name = image_type.NAME + self.address_n = parse_path(PATH.format(image_type.BIP32_INDEX)) + self.digest = digest + self.commit = commit + self.signature = None + self.global_params = None - def get_signature(self, index, digest, global_R, global_pk): - digest, global_R, global_pk = ( - serpent.tobytes(digest), - serpent.tobytes(global_R), - serpent.tobytes(global_pk), - ) - path = get_path(index) - signature = None - while signature is None: + def _check_name_digest(self, name, digest): + if name != self.name or digest != self.digest: + click.echo(f"ERROR! Remote wants to sign {name} with digest {digest.hex()}") + click.echo(f"Expected: {self.name} with digest {self.digest.hex()}") + raise ValueError("Unexpected index/digest") + + def get_commit(self, name, digest): + self._check_name_digest(name, digest) + click.echo("Sending commitment!") + return self.commit + + def _make_signature(self, global_R, global_pk): + while True: try: - t = get_trezor() - print( - "\n\n\nSigning hash %s with path %s:" - % (binascii.hexlify(digest).decode(), path) - ) + click.echo("\n\n\nSigning...") signature = cosi.sign( - t, tools.parse_path(path), digest, global_R, global_pk + TREZOR, self.address_n, self.digest, global_R, global_pk ) + return signature.signature except Exception as e: - print(e) + click.echo(e) traceback.print_exc() - print("Trying again ...") - sig = signature.signature - print("Signature sent!") - return sig + click.echo("Trying again ...") + + + def get_signature(self, name, digest, global_R, global_pk): + self._check_name_digest(name, digest) + global_params = global_R, global_pk + if global_params != self.global_params: + self.signature = self._make_signature(global_R, global_pk) + self.global_params = global_params + click.echo("Sending signature!") + return self.signature + + @Pyro4.oneway + def finish(self): + click.echo("Done! \\(^o^)/") + self.daemon.shutdown() + + +@click.command() +@click.option( + "-l", "--listen", "ipaddr", default="0.0.0.0", help="Bind to particular ip address" +) +@click.option("-t", "--header-type", "fw_or_type", type=click.Choice(indexmap.keys())) +@click.option("-d", "--digest") +@click.argument("fw_file", type=click.File("rb"), required=False) +def cli(ipaddr, fw_file, fw_or_type, digest): + """Participate in signing of firmware. + + Specify either fw_file to auto-detect type and digest, or use -t and -d to specify + the type and digest manually. + """ + global TREZOR + + public_keys = None + if fw_file: + if fw_or_type or digest: + raise click.ClickException("Do not specify fw_file together with -t/-d") + + fw_or_type = parse_image(fw_file.read()) + digest = fw_or_type.digest() + public_keys = fw_or_type.public_keys + + click.echo(fw_or_type.format()) + + if not fw_file and (not fw_or_type or not digest): + raise click.ClickException("Please specify either fw_file or -t and -h") + + try: + TREZOR = get_default_client() + TREZOR.ui.always_prompt = True + except Exception as e: + raise click.ClickException("Please connect a Trezor and retry.") from e + + pubkey, R = make_commit(fw_or_type, digest, public_keys) + + daemon = Pyro4.Daemon(host=ipaddr, port=PORT) + proxy = KeyctlProxy(daemon, fw_or_type, digest, (pubkey, R)) + uri = daemon.register(proxy, "keyctl") + click.echo(f"keyctl-proxy running at URI: {uri}") + click.echo("Press Ctrl+C to abort.") + daemon.requestLoop() if __name__ == "__main__": - if len(sys.argv) > 1: - ipaddr = sys.argv[1] - else: - print("Usage: keyctl-proxy ipaddress") - sys.exit(1) - daemon = Pyro4.Daemon(host=ipaddr, port=PORT) - proxy = KeyctlProxy() - uri = daemon.register(proxy, "keyctl") - print('keyctl-proxy running at URI: "%s"' % uri) - daemon.requestLoop() + cli() diff --git a/python/src/trezorlib/_internal/firmware_headers.py b/python/src/trezorlib/_internal/firmware_headers.py new file mode 100644 index 0000000000..69a6165967 --- /dev/null +++ b/python/src/trezorlib/_internal/firmware_headers.py @@ -0,0 +1,373 @@ +import struct +from enum import Enum +from typing import Any, List, Optional + +import click +import construct as c +import pyblake2 + +from trezorlib import cosi, firmware + +SYM_OK = click.style("\u2714", fg="green") +SYM_FAIL = click.style("\u274c", fg="red") + + +class Status(Enum): + VALID = click.style("VALID", fg="green", bold=True) + INVALID = click.style("INVALID", fg="red", bold=True) + MISSING = click.style("MISSING", fg="blue", bold=True) + DEVEL = click.style("DEVEL", fg="red", bold=True) + + def is_ok(self): + return self is Status.VALID or self is Status.DEVEL + + +VHASH_DEVEL = bytes.fromhex( + "c5b4d40cb76911392122c8d1c277937e49c69b2aaf818001ec5c7663fcce258f" +) + + +AnyFirmware = c.Struct( + "vendor_header" / c.Optional(firmware.VendorHeader), + "image" / c.Optional(firmware.FirmwareImage), +) + + +class ImageType(Enum): + VENDOR_HEADER = 0 + BOOTLOADER = 1 + FIRMWARE = 2 + + +def _make_dev_keys(*key_bytes: bytes) -> List[bytes]: + return [k * 32 for k in key_bytes] + + +def compute_vhash(vendor_header): + m = vendor_header.sig_m + n = vendor_header.sig_n + pubkeys = vendor_header.pubkeys + h = pyblake2.blake2s() + h.update(struct.pack(" bool: + return all(b == 0 for b in data) + + +def _check_signature_any( + header: c.Container, m: int, pubkeys: List[bytes], is_devel: bool +) -> Optional[bool]: + if all_zero(header.signature) and header.sigmask == 0: + return Status.MISSING + try: + digest = firmware.header_digest(header) + cosi.verify(header.signature, digest, m, pubkeys, header.sigmask) + return Status.VALID if not is_devel else Status.DEVEL + except Exception: + return Status.INVALID + + +# ====================== formatting functions ==================== + + +class LiteralStr(str): + pass + + +def _format_container( + pb: c.Container, + indent: int = 0, + sep: str = " " * 4, + truncate_after: Optional[int] = 64, + truncate_to: Optional[int] = 32, +) -> str: + def mostly_printable(bytes: bytes) -> bool: + if not bytes: + return True + printable = sum(1 for byte in bytes if 0x20 <= byte <= 0x7E) + return printable / len(bytes) > 0.8 + + def pformat(value: Any, indent: int) -> str: + level = sep * indent + leadin = sep * (indent + 1) + + if isinstance(value, LiteralStr): + return value + + if isinstance(value, list): + # short list of simple values + if not value or isinstance(value, (int, bool, Enum)): + return repr(value) + + # long list, one line per entry + lines = ["[", level + "]"] + lines[1:1] = [leadin + pformat(x, indent + 1) for x in value] + return "\n".join(lines) + + if isinstance(value, dict): + lines = ["{"] + for key, val in value.items(): + if key.startswith("_"): + continue + if val is None or val == []: + continue + lines.append(leadin + key + ": " + pformat(val, indent + 1)) + lines.append(level + "}") + return "\n".join(lines) + + if isinstance(value, (bytes, bytearray)): + length = len(value) + suffix = "" + if truncate_after and length > truncate_after: + suffix = "..." + value = value[: truncate_to or 0] + if mostly_printable(value): + output = repr(value) + else: + output = value.hex() + return "{} bytes {}{}".format(length, output, suffix) + + if isinstance(value, Enum): + return str(value) + + return repr(value) + + return pformat(pb, indent) + + +def _format_version(version: c.Container) -> str: + version_str = ".".join( + str(version[k]) for k in ("major", "minor", "patch") if k in version + ) + if "build" in version: + version_str += " build {}".format(version.build) + return version_str + + +# =========================== functionality implementations =============== + + +class SignableImage: + NAME = "Unrecognized image" + BIP32_INDEX = None + DEV_KEYS = [] + DEV_KEY_SIGMASK = 0b11 + + def __init__(self, fw: c.Container) -> None: + self.fw = fw + self.header = None + self.public_keys = None + self.sigs_required = firmware.V2_SIGS_REQUIRED + + def digest(self) -> bytes: + return firmware.header_digest(self.header) + + def check_signature(self) -> Status: + raise NotImplementedError + + def rehash(self) -> None: + pass + + def insert_signature(self, signature: bytes, sigmask: int) -> None: + self.header.signature = signature + self.header.sigmask = sigmask + + def dump(self) -> bytes: + return AnyFirmware.build(self.fw) + + def format(self, verbose: bool) -> str: + return _format_container(self.fw) + + +class VendorHeader(SignableImage): + NAME = "vendorheader" + BIP32_INDEX = 1 + DEV_KEYS = _make_dev_keys(b"\x44", b"\x45") + + def __init__(self, fw): + super().__init__(fw) + self.header = fw.vendor_header + self.public_keys = firmware.V2_BOOTLOADER_KEYS + + def check_signature(self) -> Status: + return _check_signature_any( + self.header, self.sigs_required, self.public_keys, False + ) + + def _format(self, terse: bool) -> str: + vh = self.fw.vendor_header + if not terse: + vhash = compute_vhash(vh) + output = [ + "Vendor Header " + _format_container(vh), + "Pubkey bundle hash: {}".format(vhash.hex()), + ] + else: + output = [ + "Vendor Header for {vendor} version {version} ({size} bytes)".format( + vendor=click.style(vh.text, bold=True), + version=_format_version(vh.version), + size=vh.header_len, + ), + ] + + fingerprint = firmware.header_digest(vh) + + if not terse: + output.append( + "Fingerprint: {}".format(click.style(fingerprint.hex(), bold=True)) + ) + + sig_status = self.check_signature() + sym = SYM_OK if sig_status.is_ok() else SYM_FAIL + output.append("{} Signature is {}".format(sym, sig_status.value)) + + return "\n".join(output) + + def format(self, verbose: bool = False) -> str: + return self._format(terse=False) + + +class BinImage(SignableImage): + def __init__(self, fw): + super().__init__(fw) + self.header = self.fw.image.header + self.code_hashes = firmware.calculate_code_hashes( + self.fw.image.code, self.fw.image._code_offset + ) + self.digest_header = self.header.copy() + self.digest_header.hashes = self.code_hashes + + def insert_signature(self, signature: bytes, sigmask: int) -> None: + super().insert_signature(signature, sigmask) + self.digest_header.signature = signature + self.digest_header.sigmask = sigmask + + def digest(self) -> bytes: + return firmware.header_digest(self.digest_header) + + def rehash(self): + self.header.hashes = self.code_hashes + + def format(self, verbose: bool = False) -> str: + header_out = self.header.copy() + + if not verbose: + for key in self.header: + if key.startswith("v1"): + del header_out[key] + if "version" in key: + header_out[key] = LiteralStr(_format_version(self.header[key])) + + all_ok = SYM_OK + hash_status = Status.VALID + sig_status = Status.VALID + + hashes_out = [] + for expected, actual in zip(self.header.hashes, self.code_hashes): + status = SYM_OK if expected == actual else SYM_FAIL + hashes_out.append(LiteralStr("{} {}".format(status, expected.hex()))) + + if all(all_zero(h) for h in self.header.hashes): + hash_status = Status.MISSING + elif self.header.hashes != self.code_hashes: + hash_status = Status.INVALID + else: + hash_status = Status.VALID + + header_out["hashes"] = hashes_out + + sig_status = self.check_signature() + all_ok = SYM_OK if hash_status.is_ok() and sig_status.is_ok() else SYM_FAIL + + output = [ + "Firmware Header " + _format_container(header_out), + "Fingerprint: {}".format(click.style(self.digest().hex(), bold=True)), + "{} Signature is {}, hashes are {}".format( + all_ok, sig_status.value, hash_status.value + ), + ] + + return "\n".join(output) + + +class FirmwareImage(BinImage): + NAME = "firmware" + BIP32_INDEX = 2 + DEV_KEYS = _make_dev_keys(b"\x47", b"\x48") + + def __init__(self, fw: c.Container) -> None: + super().__init__(fw) + self.public_keys = fw.vendor_header.pubkeys + self.sigs_required = fw.vendor_header.sig_m + + def check_signature(self) -> Status: + vhash = compute_vhash(self.fw.vendor_header) + return _check_signature_any( + self.digest_header, + self.sigs_required, + self.public_keys, + vhash == VHASH_DEVEL, + ) + + def format(self, verbose: bool = False) -> str: + return ( + VendorHeader(self.fw)._format(terse=not verbose) + + "\n" + + super().format(verbose) + ) + + +class BootloaderImage(BinImage): + NAME = "bootloader" + BIP32_INDEX = 0 + DEV_KEYS = _make_dev_keys(b"\x41", b"\x42") + + def __init__(self, fw): + super().__init__(fw) + self._identify_dev_keys() + + def insert_signature(self, signature: bytes, sigmask: int) -> None: + super().insert_signature(signature, sigmask) + self._identify_dev_keys() + + def _identify_dev_keys(self): + # try checking signature with dev keys first + self.public_keys = firmware.V2_BOARDLOADER_DEV_KEYS + if not self.check_signature().is_ok(): + # validation with dev keys failed, use production keys + self.public_keys = firmware.V2_BOARDLOADER_KEYS + + def check_signature(self) -> Status: + return _check_signature_any( + self.header, + self.sigs_required, + self.public_keys, + self.public_keys == firmware.V2_BOARDLOADER_DEV_KEYS, + ) + + +def parse_image(image: bytes): + fw = AnyFirmware.parse(image) + if fw.vendor_header and not fw.image: + return VendorHeader(fw) + if ( + not fw.vendor_header + and fw.image + and fw.image.header.magic == firmware.HeaderType.BOOTLOADER + ): + return BootloaderImage(fw) + if ( + fw.vendor_header + and fw.image + and fw.image.header.magic == firmware.HeaderType.FIRMWARE + ): + return FirmwareImage(fw) + raise ValueError("Unrecognized image type") diff --git a/python/src/trezorlib/cli/firmware.py b/python/src/trezorlib/cli/firmware.py index 28046763e1..39124a7ddd 100644 --- a/python/src/trezorlib/cli/firmware.py +++ b/python/src/trezorlib/cli/firmware.py @@ -36,18 +36,18 @@ def validate_firmware(version, fw, expected_fingerprint=None): if version == firmware.FirmwareFormat.TREZOR_ONE: if fw.embedded_onev2: click.echo("Trezor One firmware with embedded v2 image (1.8.0 or later)") - _print_version(fw.embedded_onev2.firmware_header.version) + _print_version(fw.embedded_onev2.header.version) else: click.echo("Trezor One firmware image.") elif version == firmware.FirmwareFormat.TREZOR_ONE_V2: click.echo("Trezor One v2 firmware (1.8.0 or later)") - _print_version(fw.firmware_header.version) + _print_version(fw.header.version) elif version == firmware.FirmwareFormat.TREZOR_T: click.echo("Trezor T firmware image.") - vendor = fw.vendor_header.vendor_string + vendor = fw.vendor_header.text vendor_version = "{major}.{minor}".format(**fw.vendor_header.version) click.echo("Vendor header from {}, version {}".format(vendor, vendor_version)) - _print_version(fw.firmware_header.version) + _print_version(fw.image.header.version) try: firmware.validate(version, fw, allow_unsigned=False) @@ -198,17 +198,18 @@ def firmware_update( click.echo("Please switch your device to bootloader mode.") sys.exit(1) + # bootloader for T1 does not export 'model', so we rely on major_version f = client.features - bootloader_onev2 = f.major_version == 1 and f.minor_version >= 8 + bootloader_version = (f.major_version, f.minor_version, f.patch_version) + bootloader_onev2 = f.major_version == 1 and bootloader_version >= (1, 8, 0) if filename: data = open(filename, "rb").read() else: if not url: - bootloader_version = [f.major_version, f.minor_version, f.patch_version] version_list = [int(x) for x in version.split(".")] if version else None url, fp = find_best_firmware_version( - bootloader_version, version_list, beta, bitcoin_only + list(bootloader_version), version_list, beta, bitcoin_only ) if not fingerprint: fingerprint = fp diff --git a/python/src/trezorlib/client.py b/python/src/trezorlib/client.py index 2eaaf2f40b..24b4ccbb48 100644 --- a/python/src/trezorlib/client.py +++ b/python/src/trezorlib/client.py @@ -15,6 +15,7 @@ # If not, see . import logging +import os import sys import warnings from types import SimpleNamespace @@ -73,13 +74,16 @@ def get_default_client(path=None, ui=None, **kwargs): Returns a TrezorClient instance with minimum fuss. - If no path is specified, finds first connected Trezor. Otherwise performs - a prefix-search for the specified device. If no UI is supplied, instantiates - the default CLI UI. + If path is specified, does a prefix-search for the specified device. Otherwise, uses + the value of TREZOR_PATH env variable, or finds first connected Trezor. + If no UI is supplied, instantiates the default CLI UI. """ from .transport import get_transport from .ui import ClickUI + if path is None: + path = os.getenv("TREZOR_PATH") + transport = get_transport(path, prefix_search=True) if ui is None: ui = ClickUI() diff --git a/python/src/trezorlib/cosi.py b/python/src/trezorlib/cosi.py index 057693c9a1..51e59acb33 100644 --- a/python/src/trezorlib/cosi.py +++ b/python/src/trezorlib/cosi.py @@ -67,31 +67,45 @@ def get_nonce( return r, Ed25519PublicPoint(_ed25519.encodepoint(R)) -def verify( +def verify_combined( signature: Ed25519Signature, digest: bytes, pub_key: Ed25519PublicPoint ) -> None: - """Verify Ed25519 signature. Raise exception if the signature is invalid.""" + """Verify Ed25519 signature. Raise exception if the signature is invalid. + + A CoSi combined signature is equivalent to a plain Ed25519 signature with a public + key that is a combination of the cosigners' public keys. This function takes the + combined public key and performs simple Ed25519 verification. + """ # XXX this *might* change to bool function _ed25519.checkvalid(signature, digest, pub_key) -def verify_m_of_n( +def verify( signature: Ed25519Signature, digest: bytes, - m: int, - n: int, - mask: int, + sigs_required: int, keys: List[Ed25519PublicPoint], + mask: int, ) -> None: - if m < 1: - raise ValueError("At least 1 signer must be specified") - selected_keys = [keys[i] for i in range(n) if mask & (1 << i)] - if len(selected_keys) < m: - raise ValueError( - "Not enough signers ({} required, {} found)".format(m, len(selected_keys)) - ) + """Verify a CoSi multi-signature. Raise exception if the signature is invalid. + + This function verifies a M-of-N signature scheme. The arguments are: + - the minimum number M of signatures required + - public keys of all N possible cosigners + - a bitmask specifying which of the N cosigners have produced the signature. + + The verification checks that the mask specifies at least M cosigners, then combines + the selected public keys and verifies the signature against the combined key. + """ + if sigs_required < 1: + raise ValueError("At least one signer must be specified.") + if mask.bit_length() > len(keys): + raise ValueError("Sigmask specifies more public keys than provided.") + selected_keys = [key for i, key in enumerate(keys) if mask & (1 << i)] + if len(selected_keys) < sigs_required: + raise _ed25519.SignatureMismatch("Insufficient number of signatures.") global_pk = combine_keys(selected_keys) - return verify(signature, digest, global_pk) + return verify_combined(signature, digest, global_pk) def pubkey_from_privkey(privkey: Ed25519PrivateKey) -> Ed25519PublicPoint: diff --git a/python/src/trezorlib/firmware.py b/python/src/trezorlib/firmware.py index 36ed6be399..c6947fef75 100644 --- a/python/src/trezorlib/firmware.py +++ b/python/src/trezorlib/firmware.py @@ -16,7 +16,7 @@ import hashlib from enum import Enum -from typing import Callable, List, NewType, Tuple +from typing import Callable, List, Tuple import construct as c import ecdsa @@ -30,21 +30,45 @@ except ImportError: V1_SIGNATURE_SLOTS = 3 -V1_BOOTLOADER_KEYS = { - 1: "04d571b7f148c5e4232c3814f777d8faeaf1a84216c78d569b71041ffc768a5b2d810fc3bb134dd026b57e65005275aedef43e155f48fc11a32ec790a93312bd58", - 2: "0463279c0c0866e50c05c799d32bd6bab0188b6de06536d1109d2ed9ce76cb335c490e55aee10cc901215132e853097d5432eda06b792073bd7740c94ce4516cb1", - 3: "0443aedbb6f7e71c563f8ed2ef64ec9981482519e7ef4f4aa98b27854e8c49126d4956d300ab45fdc34cd26bc8710de0a31dbdf6de7435fd0b492be70ac75fde58", - 4: "04877c39fd7c62237e038235e9c075dab261630f78eeb8edb92487159fffedfdf6046c6f8b881fa407c4a4ce6c28de0b19c1f4e29f1fcbc5a58ffd1432a3e0938a", - 5: "047384c51ae81add0a523adbb186c91b906ffb64c2c765802bf26dbd13bdf12c319e80c2213a136c8ee03d7874fd22b70d68e7dee469decfbbb510ee9a460cda45", -} +V1_BOOTLOADER_KEYS = [ + bytes.fromhex(key) + for key in ( + "04d571b7f148c5e4232c3814f777d8faeaf1a84216c78d569b71041ffc768a5b2d810fc3bb134dd026b57e65005275aedef43e155f48fc11a32ec790a93312bd58", + "0463279c0c0866e50c05c799d32bd6bab0188b6de06536d1109d2ed9ce76cb335c490e55aee10cc901215132e853097d5432eda06b792073bd7740c94ce4516cb1", + "0443aedbb6f7e71c563f8ed2ef64ec9981482519e7ef4f4aa98b27854e8c49126d4956d300ab45fdc34cd26bc8710de0a31dbdf6de7435fd0b492be70ac75fde58", + "04877c39fd7c62237e038235e9c075dab261630f78eeb8edb92487159fffedfdf6046c6f8b881fa407c4a4ce6c28de0b19c1f4e29f1fcbc5a58ffd1432a3e0938a", + "047384c51ae81add0a523adbb186c91b906ffb64c2c765802bf26dbd13bdf12c319e80c2213a136c8ee03d7874fd22b70d68e7dee469decfbbb510ee9a460cda45", + ) +] + +V2_BOARDLOADER_KEYS = [ + bytes.fromhex(key) + for key in ( + "0eb9856be9ba7e972c7f34eac1ed9b6fd0efd172ec00faf0c589759da4ddfba0", + "ac8ab40b32c98655798fd5da5e192be27a22306ea05c6d277cdff4a3f4125cd8", + "ce0fcd12543ef5936cf2804982136707863d17295faced72af171d6e6513ff06", + ) +] + +V2_BOARDLOADER_DEV_KEYS = [ + bytes.fromhex(key) + for key in ( + "db995fe25169d141cab9bbba92baa01f9f2e1ece7df4cb2ac05190f37fcc1f9d", + "2152f8d19b791d24453242e15f2eab6cb7cffa7b6a5ed30097960e069881db12", + "22fc297792f0b6ffc0bfcfdb7edb0c0aa14e025a365ec0e342e86e3829cb74b6", + ) +] V2_BOOTLOADER_KEYS = [ - bytes.fromhex("c2c87a49c5a3460977fbb2ec9dfe60f06bd694db8244bd4981fe3b7a26307f3f"), - bytes.fromhex("80d036b08739b846f4cb77593078deb25dc9487aedcf52e30b4fb7cd7024178a"), - bytes.fromhex("b8307a71f552c60a4cbb317ff48b82cdbf6b6bb5f04c920fec7badf017883751"), + bytes.fromhex(key) + for key in ( + "c2c87a49c5a3460977fbb2ec9dfe60f06bd694db8244bd4981fe3b7a26307f3f", + "80d036b08739b846f4cb77593078deb25dc9487aedcf52e30b4fb7cd7024178a", + "b8307a71f552c60a4cbb317ff48b82cdbf6b6bb5f04c920fec7badf017883751", + ) ] -V2_BOOTLOADER_M = 2 -V2_BOOTLOADER_N = 3 + +V2_SIGS_REQUIRED = 2 ONEV2_CHUNK_SIZE = 1024 * 64 V2_CHUNK_SIZE = 1024 * 128 @@ -80,6 +104,11 @@ class ToifMode(Enum): grayscale = b"g" +class HeaderType(Enum): + FIRMWARE = b"TRZF" + BOOTLOADER = b"TRZB" + + class EnumAdapter(c.Adapter): def __init__(self, subcon, enum): self.enum = enum @@ -106,7 +135,7 @@ Toif = c.Struct( VendorTrust = c.Transformed(c.BitStruct( - "reserved" / c.Default(c.BitsInteger(9), 0), + "_reserved" / c.Default(c.BitsInteger(9), 0), "show_vendor_string" / c.Flag, "require_user_click" / c.Flag, "red_background" / c.Flag, @@ -123,13 +152,13 @@ VendorHeader = c.Struct( "major" / c.Int8ul, "minor" / c.Int8ul, ), - "vendor_sigs_required" / c.Int8ul, - "vendor_sigs_n" / c.Rebuild(c.Int8ul, c.len_(c.this.pubkeys)), - "vendor_trust" / VendorTrust, - "reserved" / c.Padding(14), - "pubkeys" / c.Bytes(32)[c.this.vendor_sigs_n], - "vendor_string" / c.Aligned(4, c.PascalString(c.Int8ul, "utf-8")), - "vendor_image" / Toif, + "sig_m" / c.Int8ul, + "sig_n" / c.Rebuild(c.Int8ul, c.len_(c.this.pubkeys)), + "trust" / VendorTrust, + "_reserved" / c.Padding(14), + "pubkeys" / c.Bytes(32)[c.this.sig_n], + "text" / c.Aligned(4, c.PascalString(c.Int8ul, "utf-8")), + "image" / Toif, "_data_end_offset" / c.Tell, c.Padding(-(c.this._data_end_offset + 65) % 512), @@ -154,7 +183,7 @@ VersionLong = c.Struct( FirmwareHeader = c.Struct( "_start_offset" / c.Tell, - "magic" / c.Const(b"TRZF"), + "magic" / EnumAdapter(c.Bytes(4), HeaderType), "header_len" / c.Int32ul, "expiry" / c.Int32ul, "code_length" / c.Rebuild( @@ -165,13 +194,13 @@ FirmwareHeader = c.Struct( ), "version" / VersionLong, "fix_version" / VersionLong, - "reserved" / c.Padding(8), + "_reserved" / c.Padding(8), "hashes" / c.Bytes(32)[16], "v1_signatures" / c.Bytes(64)[V1_SIGNATURE_SLOTS], "v1_key_indexes" / c.Int8ul[V1_SIGNATURE_SLOTS], # pylint: disable=E1136 - "reserved" / c.Padding(220), + "_reserved" / c.Padding(220), "sigmask" / c.Byte, "signature" / c.Bytes(64), @@ -187,24 +216,37 @@ FirmwareHeader = c.Struct( ) -Firmware = c.Struct( +"""Raw firmware image. + +Consists of firmware header and code block. +This is the expected format of firmware binaries for Trezor One, or bootloader images +for Trezor T.""" +FirmwareImage = c.Struct( + "header" / FirmwareHeader, + "_code_offset" / c.Tell, + "code" / c.Bytes(c.this.header.code_length), + c.Terminated, +) + + +"""Firmware image prefixed by a vendor header. + +This is the expected format of firmware binaries for Trezor T.""" +VendorFirmware = c.Struct( "vendor_header" / VendorHeader, - "firmware_header" / FirmwareHeader, - "_code_offset" / c.Tell, - "code" / c.Bytes(c.this.firmware_header.code_length), + "image" / FirmwareImage, c.Terminated, ) -FirmwareOneV2 = c.Struct( - "firmware_header" / FirmwareHeader, - "_code_offset" / c.Tell, - "code" / c.Bytes(c.this.firmware_header.code_length), - c.Terminated, -) +"""Legacy firmware image. +Consists of a custom header and code block. +This is the expected format of firmware binaries for Trezor One pre-1.8.0. - -FirmwareOne = c.Struct( +The code block can optionally be interpreted as a new-style firmware image. That is the +expected format of firmware binary for Trezor One version 1.8.0, which can be installed +by both the older and the newer bootloader.""" +LegacyFirmware = c.Struct( "magic" / c.Const(b"TRZR"), "code_length" / c.Rebuild(c.Int32ul, c.len_(c.this.code)), "key_indexes" / c.Int8ul[V1_SIGNATURE_SLOTS], # pylint: disable=E1136 @@ -212,12 +254,12 @@ FirmwareOne = c.Struct( c.Padding(7), "restore_storage" / c.Flag, ), - "reserved" / c.Padding(52), + "_reserved" / c.Padding(52), "signatures" / c.Bytes(64)[V1_SIGNATURE_SLOTS], "code" / c.Bytes(c.this.code_length), c.Terminated, - "embedded_onev2" / c.RestreamData(c.this.code, c.Optional(FirmwareOneV2)), + "embedded_onev2" / c.RestreamData(c.this.code, c.Optional(FirmwareImage)), ) # fmt: on @@ -229,20 +271,19 @@ class FirmwareFormat(Enum): TREZOR_ONE_V2 = 3 -FirmwareType = NewType("FirmwareType", c.Container) -ParsedFirmware = Tuple[FirmwareFormat, FirmwareType] +ParsedFirmware = Tuple[FirmwareFormat, c.Container] def parse(data: bytes) -> ParsedFirmware: if data[:4] == b"TRZR": version = FirmwareFormat.TREZOR_ONE - cls = FirmwareOne + cls = LegacyFirmware elif data[:4] == b"TRZV": version = FirmwareFormat.TREZOR_T - cls = Firmware + cls = VendorFirmware elif data[:4] == b"TRZF": version = FirmwareFormat.TREZOR_ONE_V2 - cls = FirmwareOneV2 + cls = FirmwareImage else: raise ValueError("Unrecognized firmware image type") @@ -250,10 +291,10 @@ def parse(data: bytes) -> ParsedFirmware: fw = cls.parse(data) except Exception as e: raise FirmwareIntegrityError("Invalid firmware image") from e - return version, FirmwareType(fw) + return version, fw -def digest_onev1(fw: FirmwareType) -> bytes: +def digest_onev1(fw: c.Container) -> bytes: return hashlib.sha256(fw.code).digest() @@ -272,14 +313,14 @@ def check_sig_v1( ) for i in range(len(key_indexes)): - key_idx = key_indexes[i] + key_idx = key_indexes[i] - 1 signature = signatures[i] - if key_idx not in V1_BOOTLOADER_KEYS: + if key_idx >= len(V1_BOOTLOADER_KEYS): # unknown pubkey raise InvalidSignatureError("Unknown key in slot {}".format(i)) - pubkey = bytes.fromhex(V1_BOOTLOADER_KEYS[key_idx])[1:] + pubkey = V1_BOOTLOADER_KEYS[key_idx][1:] verify = ecdsa.VerifyingKey.from_string(pubkey, curve=ecdsa.curves.SECP256k1) try: verify.verify_digest(signature, digest) @@ -287,72 +328,88 @@ def check_sig_v1( raise InvalidSignatureError("Invalid signature in slot {}".format(i)) from e -def _header_digest( - header: c.Container, header_type: c.Construct, hash_function: Callable = blake2s -) -> bytes: +def header_digest(header: c.Container, hash_function: Callable = blake2s) -> bytes: stripped_header = header.copy() stripped_header.sigmask = 0 stripped_header.signature = b"\0" * 64 stripped_header.v1_key_indexes = [0, 0, 0] stripped_header.v1_signatures = [b"\0" * 64] * 3 + if header.magic == b"TRZV": + header_type = VendorHeader + else: + header_type = FirmwareHeader header_bytes = header_type.build(stripped_header) return hash_function(header_bytes).digest() -def digest_v2(fw: FirmwareType) -> bytes: - return _header_digest(fw.firmware_header, FirmwareHeader, blake2s) +def digest_v2(fw: c.Container) -> bytes: + return header_digest(fw.image.header, blake2s) -def digest_onev2(fw: FirmwareType) -> bytes: - return _header_digest(fw.firmware_header, FirmwareHeader, hashlib.sha256) +def digest_onev2(fw: c.Container) -> bytes: + return header_digest(fw.header, hashlib.sha256) -def validate_code_hashes( - fw: FirmwareType, +def calculate_code_hashes( + code: bytes, + code_offset: int, hash_function: Callable = blake2s, chunk_size: int = V2_CHUNK_SIZE, padding_byte: bytes = None, ) -> None: - for i, expected_hash in enumerate(fw.firmware_header.hashes): - if i == 0: - # Because first chunk is sent along with headers, there is less code in it. - chunk = fw.code[: chunk_size - fw._code_offset] - else: - # Subsequent chunks are shifted by the "missing header" size. - ptr = i * chunk_size - fw._code_offset - chunk = fw.code[ptr : ptr + chunk_size] - - # padding for last chunk - if padding_byte is not None and i > 1 and chunk and len(chunk) < chunk_size: + hashes = [] + # End offset for each chunk. Normally this would be (i+1)*chunk_size for i-th chunk, + # but the first chunk is shorter by code_offset, so all end offsets are shifted. + ends = [(i + 1) * chunk_size - code_offset for i in range(16)] + start = 0 + for end in ends: + chunk = code[start:end] + # padding for last non-empty chunk + if padding_byte is not None and start < len(code) and end > len(code): chunk += padding_byte[0:1] * (chunk_size - len(chunk)) - if not chunk and expected_hash == b"\0" * 32: - continue - chunk_hash = hash_function(chunk).digest() - if chunk_hash != expected_hash: - raise FirmwareIntegrityError("Invalid firmware data.") + if not chunk: + hashes.append(b"\0" * 32) + else: + hashes.append(hash_function(chunk).digest()) + + start = end + + return hashes -def validate_onev2(fw: FirmwareType, allow_unsigned: bool = False) -> None: +def validate_code_hashes(fw: c.Container, version: FirmwareFormat) -> None: + if version == FirmwareFormat.TREZOR_ONE_V2: + image = fw + hash_function = hashlib.sha256 + chunk_size = ONEV2_CHUNK_SIZE + padding_byte = b"\xff" + else: + image = fw.image + hash_function = blake2s + chunk_size = V2_CHUNK_SIZE + padding_byte = None + + expected_hashes = calculate_code_hashes( + image.code, image._code_offset, hash_function, chunk_size, padding_byte + ) + if expected_hashes != image.header.hashes: + raise FirmwareIntegrityError("Invalid firmware data.") + + +def validate_onev2(fw: c.Container, allow_unsigned: bool = False) -> None: try: check_sig_v1( - digest_onev2(fw), - fw.firmware_header.v1_key_indexes, - fw.firmware_header.v1_signatures, + digest_onev2(fw), fw.header.v1_key_indexes, fw.header.v1_signatures, ) except Unsigned: if not allow_unsigned: raise - validate_code_hashes( - fw, - hash_function=hashlib.sha256, - chunk_size=ONEV2_CHUNK_SIZE, - padding_byte=b"\xFF", - ) + validate_code_hashes(fw, FirmwareFormat.TREZOR_ONE_V2) -def validate_onev1(fw: FirmwareType, allow_unsigned: bool = False) -> None: +def validate_onev1(fw: c.Container, allow_unsigned: bool = False) -> None: try: check_sig_v1(digest_onev1(fw), fw.key_indexes, fw.signatures) except Unsigned: @@ -362,21 +419,20 @@ def validate_onev1(fw: FirmwareType, allow_unsigned: bool = False) -> None: validate_onev2(fw.embedded_onev2, allow_unsigned) -def validate_v2(fw: FirmwareType, skip_vendor_header: bool = False) -> None: - vendor_fingerprint = _header_digest(fw.vendor_header, VendorHeader) +def validate_v2(fw: c.Container, skip_vendor_header: bool = False) -> None: + vendor_fingerprint = header_digest(fw.vendor_header) fingerprint = digest_v2(fw) if not skip_vendor_header: try: # if you want to validate a custom vendor header, you can modify # the global variables to match your keys and m-of-n scheme - cosi.verify_m_of_n( + cosi.verify( fw.vendor_header.signature, vendor_fingerprint, - V2_BOOTLOADER_M, - V2_BOOTLOADER_N, - fw.vendor_header.sigmask, + V2_SIGS_REQUIRED, V2_BOOTLOADER_KEYS, + fw.vendor_header.sigmask, ) except Exception: raise InvalidSignatureError("Invalid vendor header signature.") @@ -387,24 +443,23 @@ def validate_v2(fw: FirmwareType, skip_vendor_header: bool = False) -> None: # raise ValueError("Vendor header expired.") try: - cosi.verify_m_of_n( - fw.firmware_header.signature, + cosi.verify( + fw.image.header.signature, fingerprint, - fw.vendor_header.vendor_sigs_required, - fw.vendor_header.vendor_sigs_n, - fw.firmware_header.sigmask, + fw.vendor_header.sig_m, fw.vendor_header.pubkeys, + fw.image.header.sigmask, ) except Exception: raise InvalidSignatureError("Invalid firmware signature.") # XXX expiry is not used now - # if time.gmtime(fw.firmware_header.expiry) < now: + # if time.gmtime(fw.image.header.expiry) < now: # raise ValueError("Firmware header expired.") - validate_code_hashes(fw) + validate_code_hashes(fw, FirmwareFormat.TREZOR_T) -def digest(version: FirmwareFormat, fw: FirmwareType) -> bytes: +def digest(version: FirmwareFormat, fw: c.Container) -> bytes: if version == FirmwareFormat.TREZOR_ONE: return digest_onev1(fw) elif version == FirmwareFormat.TREZOR_ONE_V2: @@ -416,7 +471,7 @@ def digest(version: FirmwareFormat, fw: FirmwareType) -> bytes: def validate( - version: FirmwareFormat, fw: FirmwareType, allow_unsigned: bool = False + version: FirmwareFormat, fw: c.Container, allow_unsigned: bool = False ) -> None: if version == FirmwareFormat.TREZOR_ONE: return validate_onev1(fw, allow_unsigned) diff --git a/python/tests/test_cosi.py b/python/tests/test_cosi.py index 88281f5767..e58b6b1bf6 100644 --- a/python/tests/test_cosi.py +++ b/python/tests/test_cosi.py @@ -104,13 +104,13 @@ def test_single_eddsa_vector(privkey, pubkey, message, signature): my_pubkey = cosi.pubkey_from_privkey(privkey) assert my_pubkey == pubkey try: - cosi.verify(signature, message, pubkey) + cosi.verify_combined(signature, message, pubkey) except ValueError: pytest.fail("Signature does not verify.") fake_signature = signature[:37] + b"\xf0" + signature[38:] with pytest.raises(_ed25519.SignatureMismatch): - cosi.verify(fake_signature, message, pubkey) + cosi.verify_combined(fake_signature, message, pubkey) def test_combine_keys(): @@ -148,7 +148,7 @@ def test_cosi_combination(keyset): global_sig = cosi.combine_sig(global_commit, signatures) try: - cosi.verify(global_sig, message, global_pk) + cosi.verify_combined(global_sig, message, global_pk) except Exception: pytest.fail("Failed to validate global signature") @@ -175,25 +175,27 @@ def test_m_of_n(): try: # this is what we are actually doing - cosi.verify_m_of_n(global_sig, message, 3, 4, sigmask, pubkeys) + cosi.verify(global_sig, message, 3, pubkeys, sigmask) # we can require less signers too - cosi.verify_m_of_n(global_sig, message, 1, 4, sigmask, pubkeys) + cosi.verify(global_sig, message, 1, pubkeys, sigmask) except Exception: pytest.fail("Failed to validate by sigmask") # and now for various ways that should fail with pytest.raises(ValueError) as e: - cosi.verify_m_of_n(global_sig, message, 4, 4, sigmask, pubkeys) - assert "Not enough signers" in e.value.args[0] + cosi.verify(global_sig, message, 3, pubkeys[:2], sigmask) + assert "more public keys than provided" in e.value.args[0] - with pytest.raises(_ed25519.SignatureMismatch): - # when N < number of possible signers, the topmost signers will be ignored - cosi.verify_m_of_n(global_sig, message, 2, 3, sigmask, pubkeys) + with pytest.raises(ValueError) as e: + cosi.verify(global_sig, message, 0, pubkeys, 0) + assert "At least one signer" in e.value.args[0] - with pytest.raises(_ed25519.SignatureMismatch): + with pytest.raises(_ed25519.SignatureMismatch) as e: + # at least 5 signatures required + cosi.verify(global_sig, message, 5, pubkeys, sigmask) + assert "Insufficient number of signatures" in e.value.args[0] + + with pytest.raises(_ed25519.SignatureMismatch) as e: # wrong sigmask - cosi.verify_m_of_n(global_sig, message, 1, 4, 5, pubkeys) - - with pytest.raises(ValueError): - # can't use "0 of N" scheme - cosi.verify_m_of_n(global_sig, message, 0, 4, sigmask, pubkeys) + cosi.verify(global_sig, message, 3, pubkeys, 7) + assert "signature does not pass verification" in e.value.args[0] diff --git a/tests/device_tests/test_cosi.py b/tests/device_tests/test_cosi.py index aadb33e9dd..1f562bb589 100644 --- a/tests/device_tests/test_cosi.py +++ b/tests/device_tests/test_cosi.py @@ -73,7 +73,7 @@ class TestCosi: global_R, [sig0.signature, sig1.signature, sig2.signature] ) - cosi.verify(sig, digest, global_pk) + cosi.verify_combined(sig, digest, global_pk) def test_cosi_compat(self, client): digest = sha256(b"this is not a pipe").digest() @@ -94,4 +94,4 @@ class TestCosi: ) sig = cosi.combine_sig(global_R, [remote_sig.signature, local_sig]) - cosi.verify(sig, digest, global_pk) + cosi.verify_combined(sig, digest, global_pk)