commit
06003d0e01
@ -1,13 +1,12 @@
|
||||
BINCTL=../../tools/binctl
|
||||
KEYCTL=../../tools/keyctl
|
||||
BUILDVH=../../tools/build_vendorheader
|
||||
BINCTL=../../tools/headertool.py
|
||||
|
||||
# construct the default unsafe vendor header
|
||||
$BUILDVH e28a8970753332bd72fef413e6b0b2ef1b4aadda7aa2c141f233712a6876b351:d4eec1869fb1b8a4e817516ad5a931557cb56805c3eb16e8f3a803d647df7869:772c8a442b7db06e166cfbc1ccbcbcde6f3eba76a4e98ef3ffc519502237d6ef 2 0.0 xxx...x "UNSAFE, DO NOT USE!" vendor_unsafe.toif vendorheader_unsafe_unsigned.bin
|
||||
# construct all vendor headers
|
||||
for fn in *.json; do
|
||||
name=$(echo $fn | sed 's/vendor_\(.*\)\.json/\1/')
|
||||
$BUILDVH vendor_${name}.json vendor_${name}.toif vendorheader_${name}_unsigned.bin
|
||||
done
|
||||
|
||||
# sign the default unsafe vendor header using development keys
|
||||
# sign dev vendor header
|
||||
cp -a vendorheader_unsafe_unsigned.bin vendorheader_unsafe_signed_dev.bin
|
||||
$BINCTL vendorheader_unsafe_signed_dev.bin -s 1:2 `$KEYCTL sign vendorheader vendorheader_unsafe_signed_dev.bin 4444444444444444444444444444444444444444444444444444444444444444 4545454545454545454545454545454545454545454545454545454545454545`
|
||||
|
||||
# construct SatoshiLabs vendor header
|
||||
$BUILDVH 47fbdc84d8abef44fe6abde8f87b6ead821b7082ec63b9f7cc33dc53bf6c708d:9af22a52ab47a93091403612b3d6731a2dfef8a33383048ed7556a20e8b03c81:2218c25f8ba70c82eba8ed6a321df209c0a7643d014f33bf9317846f62923830 2 0.0 ....... SatoshiLabs vendor_satoshilabs.toif vendorheader_satoshilabs_unsigned.bin
|
||||
$BINCTL -D vendorheader_unsafe_signed_dev.bin
|
||||
|
@ -0,0 +1,20 @@
|
||||
{
|
||||
"text": "SatoshiLabs",
|
||||
"expiry": 0,
|
||||
"version": {
|
||||
"major": 0,
|
||||
"minor": 0
|
||||
},
|
||||
"sig_m": 2,
|
||||
"trust": {
|
||||
"show_vendor_string": false,
|
||||
"require_user_click": false,
|
||||
"red_background": false,
|
||||
"delay": 0
|
||||
},
|
||||
"pubkeys": [
|
||||
"47fbdc84d8abef44fe6abde8f87b6ead821b7082ec63b9f7cc33dc53bf6c708d",
|
||||
"9af22a52ab47a93091403612b3d6731a2dfef8a33383048ed7556a20e8b03c81",
|
||||
"2218c25f8ba70c82eba8ed6a321df209c0a7643d014f33bf9317846f62923830"
|
||||
]
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
{
|
||||
"text": "UNSAFE, DO NOT USE!",
|
||||
"expiry": 0,
|
||||
"version": {
|
||||
"major": 0,
|
||||
"minor": 0
|
||||
},
|
||||
"sig_m": 2,
|
||||
"trust": {
|
||||
"show_vendor_string": true,
|
||||
"require_user_click": true,
|
||||
"red_background": true,
|
||||
"delay": 1
|
||||
},
|
||||
"pubkeys": [
|
||||
"e28a8970753332bd72fef413e6b0b2ef1b4aadda7aa2c141f233712a6876b351",
|
||||
"d4eec1869fb1b8a4e817516ad5a931557cb56805c3eb16e8f3a803d647df7869",
|
||||
"772c8a442b7db06e166cfbc1ccbcbcde6f3eba76a4e98ef3ffc519502237d6ef"
|
||||
]
|
||||
}
|
@ -1,359 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import sys
|
||||
import struct
|
||||
import binascii
|
||||
|
||||
import pyblake2
|
||||
|
||||
from trezorlib import cosi
|
||||
|
||||
|
||||
def format_sigmask(sigmask):
|
||||
bits = [str(b + 1) if sigmask & (1 << b) else "." for b in range(8)]
|
||||
return "0x%02x = [%s]" % (sigmask, " ".join(bits))
|
||||
|
||||
|
||||
def format_vtrust(vtrust):
|
||||
bits = [str(b) if vtrust & (1 << b) == 0 else "." for b in range(16)]
|
||||
# see docs/bootloader.md for vtrust constants
|
||||
desc = ""
|
||||
wait = (vtrust & 0x000F) ^ 0x000F
|
||||
if wait > 0:
|
||||
desc = "WAIT_%d" % wait
|
||||
if vtrust & 0x0010 == 0:
|
||||
desc += " RED"
|
||||
if vtrust & 0x0020 == 0:
|
||||
desc += " CLICK"
|
||||
if vtrust & 0x0040 == 0:
|
||||
desc += " STRING"
|
||||
return "%d = [%s] = [%s]" % (vtrust, " ".join(bits), desc)
|
||||
|
||||
|
||||
# bootloader/firmware headers specification: https://github.com/trezor/trezor-core/blob/master/docs/bootloader.md
|
||||
|
||||
IMAGE_HEADER_SIZE = 1024
|
||||
IMAGE_SIG_SIZE = 65
|
||||
IMAGE_CHUNK_SIZE = 128 * 1024
|
||||
BOOTLOADER_SECTORS_COUNT = 1
|
||||
FIRMWARE_SECTORS_COUNT = 6 + 7
|
||||
|
||||
|
||||
class BinImage(object):
|
||||
def __init__(self, data, magic, max_size):
|
||||
header = struct.unpack("<4sIIIBBBBBBBB8s512s415sB64s", data[:IMAGE_HEADER_SIZE])
|
||||
self.magic, self.hdrlen, self.expiry, self.codelen, self.vmajor, self.vminor, self.vpatch, self.vbuild, self.fix_vmajor, self.fix_vminor, self.fix_vpatch, self.fix_vbuild, self.reserved1, self.hashes, self.reserved2, self.sigmask, self.sig = (
|
||||
header
|
||||
)
|
||||
assert self.magic == magic
|
||||
assert self.hdrlen == IMAGE_HEADER_SIZE
|
||||
total_len = self.hdrlen + self.codelen
|
||||
assert total_len % 512 == 0
|
||||
assert total_len >= 4 * 1024
|
||||
assert total_len <= max_size
|
||||
assert self.reserved1 == 8 * b"\x00"
|
||||
assert self.reserved2 == 415 * b"\x00"
|
||||
self.code = data[self.hdrlen :]
|
||||
assert len(self.code) == self.codelen
|
||||
|
||||
def print(self):
|
||||
if self.magic == b"TRZF":
|
||||
print("Trezor Firmware Image")
|
||||
total_len = self.vhdrlen + self.hdrlen + self.codelen
|
||||
elif self.magic == b"TRZB":
|
||||
print("Trezor Bootloader Image")
|
||||
total_len = self.hdrlen + self.codelen
|
||||
else:
|
||||
print("Trezor Unknown Image")
|
||||
print(" * magic :", self.magic.decode())
|
||||
print(" * hdrlen :", self.hdrlen)
|
||||
print(" * expiry :", self.expiry)
|
||||
print(" * codelen :", self.codelen)
|
||||
print(
|
||||
" * version : %d.%d.%d.%d"
|
||||
% (self.vmajor, self.vminor, self.vpatch, self.vbuild)
|
||||
)
|
||||
print(
|
||||
" * fixver : %d.%d.%d.%d"
|
||||
% (self.fix_vmajor, self.fix_vminor, self.fix_vpatch, self.fix_vbuild)
|
||||
)
|
||||
print(" * hashes: %s" % ("OK" if self.check_hashes() else "INCORRECT"))
|
||||
for i in range(16):
|
||||
print(
|
||||
" - %02d : %s"
|
||||
% (i, binascii.hexlify(self.hashes[i * 32 : i * 32 + 32]).decode())
|
||||
)
|
||||
print(" * sigmask :", format_sigmask(self.sigmask))
|
||||
print(" * sig :", binascii.hexlify(self.sig).decode())
|
||||
print(" * total : %d bytes" % total_len)
|
||||
print(" * fngprnt :", self.fingerprint())
|
||||
print()
|
||||
|
||||
def compute_hashes(self):
|
||||
if self.magic == b"TRZF":
|
||||
hdrlen = self.vhdrlen + self.hdrlen
|
||||
else:
|
||||
hdrlen = self.hdrlen
|
||||
hashes = b""
|
||||
for i in range(16):
|
||||
if i == 0:
|
||||
d = self.code[: IMAGE_CHUNK_SIZE - hdrlen]
|
||||
else:
|
||||
s = IMAGE_CHUNK_SIZE - hdrlen + (i - 1) * IMAGE_CHUNK_SIZE
|
||||
d = self.code[s : s + IMAGE_CHUNK_SIZE]
|
||||
if len(d) > 0:
|
||||
h = pyblake2.blake2s(d).digest()
|
||||
else:
|
||||
h = 32 * b"\x00"
|
||||
hashes += h
|
||||
return hashes
|
||||
|
||||
def check_hashes(self):
|
||||
return self.hashes == self.compute_hashes()
|
||||
|
||||
def update_hashes(self):
|
||||
self.hashes = self.compute_hashes()
|
||||
|
||||
def serialize_header(self, sig=True):
|
||||
header = struct.pack(
|
||||
"<4sIIIBBBBBBBB8s512s415s",
|
||||
self.magic,
|
||||
self.hdrlen,
|
||||
self.expiry,
|
||||
self.codelen,
|
||||
self.vmajor,
|
||||
self.vminor,
|
||||
self.vpatch,
|
||||
self.vbuild,
|
||||
self.fix_vmajor,
|
||||
self.fix_vminor,
|
||||
self.fix_vpatch,
|
||||
self.fix_vbuild,
|
||||
self.reserved1,
|
||||
self.hashes,
|
||||
self.reserved2,
|
||||
)
|
||||
if sig:
|
||||
header += struct.pack("<B64s", self.sigmask, self.sig)
|
||||
else:
|
||||
header += IMAGE_SIG_SIZE * b"\x00"
|
||||
assert len(header) == self.hdrlen
|
||||
return header
|
||||
|
||||
def fingerprint(self):
|
||||
return pyblake2.blake2s(self.serialize_header(sig=False)).hexdigest()
|
||||
|
||||
def sign(self, sigmask, signature):
|
||||
header = self.serialize_header(sig=False)
|
||||
data = header + self.code
|
||||
assert len(data) == self.hdrlen + self.codelen
|
||||
self.sigmask = sigmask
|
||||
self.sig = signature
|
||||
|
||||
def write(self, filename):
|
||||
with open(filename, "wb") as f:
|
||||
f.write(self.serialize_header())
|
||||
f.write(self.code)
|
||||
|
||||
|
||||
class FirmwareImage(BinImage):
|
||||
def __init__(self, data, vhdrlen):
|
||||
super(FirmwareImage, self).__init__(
|
||||
data[vhdrlen:],
|
||||
magic=b"TRZF",
|
||||
max_size=FIRMWARE_SECTORS_COUNT * IMAGE_CHUNK_SIZE,
|
||||
)
|
||||
self.vhdrlen = vhdrlen
|
||||
self.vheader = data[:vhdrlen]
|
||||
|
||||
def write(self, filename):
|
||||
with open(filename, "wb") as f:
|
||||
f.write(self.vheader)
|
||||
f.write(self.serialize_header())
|
||||
f.write(self.code)
|
||||
|
||||
|
||||
class BootloaderImage(BinImage):
|
||||
def __init__(self, data):
|
||||
super(BootloaderImage, self).__init__(
|
||||
data, magic=b"TRZB", max_size=BOOTLOADER_SECTORS_COUNT * IMAGE_CHUNK_SIZE
|
||||
)
|
||||
|
||||
|
||||
class VendorHeader(object):
|
||||
def __init__(self, data):
|
||||
header = struct.unpack("<4sIIBBBBH", data[:18])
|
||||
self.magic, self.hdrlen, self.expiry, self.vmajor, self.vminor, self.vsig_m, self.vsig_n, self.vtrust = (
|
||||
header
|
||||
)
|
||||
assert self.magic == b"TRZV"
|
||||
data = data[: self.hdrlen] # strip remaining data (firmware)
|
||||
assert self.vsig_m > 0 and self.vsig_m <= self.vsig_n
|
||||
assert self.vsig_n > 0 and self.vsig_n <= 8
|
||||
p = 32
|
||||
self.vpub = []
|
||||
for _ in range(self.vsig_n):
|
||||
self.vpub.append(data[p : p + 32])
|
||||
p += 32
|
||||
self.vstr_len = data[p]
|
||||
p += 1
|
||||
self.vstr = data[p : p + self.vstr_len]
|
||||
p += self.vstr_len
|
||||
vstr_pad = -p & 3
|
||||
p += vstr_pad
|
||||
self.vimg_len = len(data) - IMAGE_SIG_SIZE - p
|
||||
self.vimg = data[p : p + self.vimg_len]
|
||||
p += self.vimg_len
|
||||
self.sigmask = data[p]
|
||||
p += 1
|
||||
self.sig = data[p : p + 64]
|
||||
assert (
|
||||
len(data)
|
||||
== 4
|
||||
+ 4
|
||||
+ 4
|
||||
+ 1
|
||||
+ 1
|
||||
+ 1
|
||||
+ 1
|
||||
+ 1
|
||||
+ 15
|
||||
+ 32 * len(self.vpub)
|
||||
+ 1
|
||||
+ self.vstr_len
|
||||
+ vstr_pad
|
||||
+ self.vimg_len
|
||||
+ IMAGE_SIG_SIZE
|
||||
)
|
||||
assert len(data) % 512 == 0
|
||||
|
||||
def print(self):
|
||||
print("Trezor Vendor Header")
|
||||
print(" * magic :", self.magic.decode())
|
||||
print(" * hdrlen :", self.hdrlen)
|
||||
print(" * expiry :", self.expiry)
|
||||
print(" * version : %d.%d" % (self.vmajor, self.vminor))
|
||||
print(" * scheme : %d out of %d" % (self.vsig_m, self.vsig_n))
|
||||
print(" * trust :", format_vtrust(self.vtrust))
|
||||
for i in range(self.vsig_n):
|
||||
print(" * vpub #%d :" % (i + 1), binascii.hexlify(self.vpub[i]).decode())
|
||||
print(" * vstr :", self.vstr.decode())
|
||||
print(" * vhash :", binascii.hexlify(self.vhash()).decode())
|
||||
print(" * vimg : (%d bytes)" % len(self.vimg))
|
||||
print(" * sigmask :", format_sigmask(self.sigmask))
|
||||
print(" * sig :", binascii.hexlify(self.sig).decode())
|
||||
print(" * fngprnt :", self.fingerprint())
|
||||
print()
|
||||
|
||||
def serialize_header(self, sig=True):
|
||||
header = struct.pack(
|
||||
"<4sIIBBBBH",
|
||||
self.magic,
|
||||
self.hdrlen,
|
||||
self.expiry,
|
||||
self.vmajor,
|
||||
self.vminor,
|
||||
self.vsig_m,
|
||||
self.vsig_n,
|
||||
self.vtrust,
|
||||
)
|
||||
header += 14 * b"\x00"
|
||||
for i in range(self.vsig_n):
|
||||
header += self.vpub[i]
|
||||
header += struct.pack("<B", self.vstr_len) + self.vstr
|
||||
header += (-len(header) & 3) * b"\x00" # vstr_pad
|
||||
header += self.vimg
|
||||
if sig:
|
||||
header += struct.pack("<B64s", self.sigmask, self.sig)
|
||||
else:
|
||||
header += IMAGE_SIG_SIZE * b"\x00"
|
||||
assert len(header) == self.hdrlen
|
||||
return header
|
||||
|
||||
def fingerprint(self):
|
||||
return pyblake2.blake2s(self.serialize_header(sig=False)).hexdigest()
|
||||
|
||||
def vhash(self):
|
||||
h = pyblake2.blake2s()
|
||||
h.update(struct.pack("<BB", self.vsig_m, self.vsig_n))
|
||||
for i in range(8):
|
||||
if i < self.vsig_n:
|
||||
h.update(self.vpub[i])
|
||||
else:
|
||||
h.update(b"\x00" * 32)
|
||||
return h.digest()
|
||||
|
||||
def sign(self, sigmask, signature):
|
||||
header = self.serialize_header(sig=False)
|
||||
assert len(header) == self.hdrlen
|
||||
self.sigmask = sigmask
|
||||
self.sig = signature
|
||||
|
||||
def write(self, filename):
|
||||
with open(filename, "wb") as f:
|
||||
f.write(self.serialize_header())
|
||||
|
||||
|
||||
def binopen(filename):
|
||||
data = open(filename, "rb").read()
|
||||
magic = data[:4]
|
||||
if magic == b"TRZB":
|
||||
return BootloaderImage(data)
|
||||
if magic == b"TRZV":
|
||||
vheader = VendorHeader(data)
|
||||
if len(data) == vheader.hdrlen:
|
||||
return vheader
|
||||
vheader.print()
|
||||
subdata = data[vheader.hdrlen :]
|
||||
if subdata[:4] == b"TRZF":
|
||||
firmware = FirmwareImage(data, vheader.hdrlen)
|
||||
# check signatures against signing keys in the vendor header
|
||||
if firmware.sigmask > 0:
|
||||
pk = [vheader.vpub[i] for i in range(8) if firmware.sigmask & (1 << i)]
|
||||
global_pk = cosi.combine_keys(pk)
|
||||
hdr = (
|
||||
subdata[: IMAGE_HEADER_SIZE - IMAGE_SIG_SIZE]
|
||||
+ IMAGE_SIG_SIZE * b"\x00"
|
||||
)
|
||||
digest = pyblake2.blake2s(hdr).digest()
|
||||
try:
|
||||
cosi.verify(firmware.sig, digest, global_pk)
|
||||
print("Firmware signature OK")
|
||||
except ValueError:
|
||||
print("Firmware signature INCORRECT")
|
||||
else:
|
||||
print("No firmware signature")
|
||||
return firmware
|
||||
if magic == b"TRZF":
|
||||
return FirmwareImage(data, 0)
|
||||
raise Exception("Unknown file format")
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: binctl file.bin [-s sigmask signature] [-h]")
|
||||
return 1
|
||||
fn = sys.argv[1]
|
||||
sign = len(sys.argv) > 2 and sys.argv[2] == "-s"
|
||||
rehash = len(sys.argv) == 3 and sys.argv[2] == "-h"
|
||||
b = binopen(fn)
|
||||
if sign:
|
||||
sigmask = 0
|
||||
if ":" in sys.argv[3]:
|
||||
for idx in sys.argv[3].split(":"):
|
||||
sigmask |= 1 << (int(idx) - 1)
|
||||
else:
|
||||
sigmask = 1 << (int(sys.argv[3]) - 1)
|
||||
signature = binascii.unhexlify(sys.argv[4])
|
||||
b.sign(sigmask, signature)
|
||||
b.write(fn)
|
||||
if rehash:
|
||||
b.update_hashes()
|
||||
b.write(fn)
|
||||
b.print()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@ -1,65 +1,23 @@
|
||||
#!/usr/bin/env python3
|
||||
import sys
|
||||
import struct
|
||||
import binascii
|
||||
import json
|
||||
|
||||
import click
|
||||
|
||||
# encode vendor name, add length byte and padding to multiple of 4
|
||||
def encode_vendor(vname):
|
||||
vbin = vname.encode()
|
||||
vbin = struct.pack("<B", len(vbin)) + vbin
|
||||
vbin += b"\0" * (-len(vbin) & 3)
|
||||
return vbin
|
||||
from trezorlib import firmware
|
||||
|
||||
|
||||
def encode_pubkey(pubkey):
|
||||
if len(pubkey) != 64:
|
||||
raise Exception("Wrong public key length")
|
||||
return binascii.unhexlify(pubkey)
|
||||
@click.command()
|
||||
@click.argument("specfile", type=click.File("r"))
|
||||
@click.argument("image", type=click.File("rb"))
|
||||
@click.argument("outfile", type=click.File("wb"))
|
||||
def build_vendorheader(specfile, image, outfile):
|
||||
spec = json.load(specfile)
|
||||
spec["pubkeys"] = [bytes.fromhex(k) for k in spec["pubkeys"]]
|
||||
spec["image"] = firmware.Toif.parse(image.read())
|
||||
spec["sigmask"] = 0
|
||||
spec["signature"] = b"\x00" * 64
|
||||
outfile.write(firmware.VendorHeader.build(spec))
|
||||
|
||||
|
||||
def decode_vtrust(vtrust):
|
||||
t = 0xFFFF
|
||||
for i, b in enumerate(reversed(vtrust)):
|
||||
if b != ".":
|
||||
t &= ~(1 << i)
|
||||
return t
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 7:
|
||||
print(
|
||||
'Usage build_vendorheader "pubkey1hex:pubkey2hex:..." m version vendortrust vendorname vendorimage.toif vendorheader.bin'
|
||||
)
|
||||
return 1
|
||||
|
||||
keys = [encode_pubkey(x) for x in sys.argv[1].split(":")]
|
||||
m = int(sys.argv[2])
|
||||
(vmajor, vminor) = [int(x) for x in sys.argv[3].split(".")]
|
||||
vtrust = decode_vtrust(sys.argv[4])
|
||||
vname = sys.argv[5]
|
||||
ifn = sys.argv[6]
|
||||
ofn = sys.argv[7]
|
||||
if not ifn.endswith(".toif"):
|
||||
print("Must provide TOIF file")
|
||||
return 2
|
||||
|
||||
expiry = 0
|
||||
vheader = b"TRZV" + struct.pack(
|
||||
"<IIBBBBH", 0, expiry, vmajor, vminor, m, len(keys), vtrust
|
||||
)
|
||||
vheader += 14 * b"\0"
|
||||
for k in keys:
|
||||
vheader += k
|
||||
vheader += encode_vendor(vname) + open(ifn, "rb").read()
|
||||
padding = 65 + (-len(vheader) - 65) & 511
|
||||
vheader += b"\0" * padding
|
||||
|
||||
# put in length
|
||||
vheader = vheader[0:4] + struct.pack("<I", len(vheader)) + vheader[8:]
|
||||
|
||||
with open(ofn, "wb") as f:
|
||||
f.write(vheader)
|
||||
|
||||
|
||||
main()
|
||||
if __name__ == "__main__":
|
||||
build_vendorheader()
|
||||
|
@ -0,0 +1,279 @@
|
||||
#!/usr/bin/env python3
|
||||
import click
|
||||
|
||||
from trezorlib import cosi, firmware
|
||||
from trezorlib._internal import firmware_headers
|
||||
|
||||
from typing import List, Tuple
|
||||
|
||||
|
||||
try:
|
||||
import Pyro4
|
||||
Pyro4.config.SERIALIZER = "marshal"
|
||||
except ImportError:
|
||||
Pyro4 = None
|
||||
|
||||
PORT = 5001
|
||||
|
||||
# =========================== signing =========================
|
||||
|
||||
|
||||
def sign_with_privkeys(digest: bytes, privkeys: List[bytes]) -> bytes:
|
||||
"""Locally produce a CoSi signature."""
|
||||
pubkeys = [cosi.pubkey_from_privkey(sk) for sk in privkeys]
|
||||
nonces = [cosi.get_nonce(sk, digest, i) for i, sk in enumerate(privkeys)]
|
||||
|
||||
global_pk = cosi.combine_keys(pubkeys)
|
||||
global_R = cosi.combine_keys(R for r, R in nonces)
|
||||
|
||||
sigs = [
|
||||
cosi.sign_with_privkey(digest, sk, global_pk, r, global_R)
|
||||
for sk, (r, R) in zip(privkeys, nonces)
|
||||
]
|
||||
|
||||
signature = cosi.combine_sig(global_R, sigs)
|
||||
try:
|
||||
cosi.verify_combined(signature, digest, global_pk)
|
||||
except Exception as e:
|
||||
raise click.ClickException(f"Failed to produce valid signature.") from e
|
||||
|
||||
return signature
|
||||
|
||||
|
||||
def parse_privkey_args(privkey_data: List[str]) -> Tuple[int, List[bytes]]:
|
||||
privkeys = []
|
||||
sigmask = 0
|
||||
for key in privkey_data:
|
||||
try:
|
||||
idx, key_hex = key.split(":", maxsplit=1)
|
||||
privkeys.append(bytes.fromhex(key_hex))
|
||||
sigmask |= 1 << (int(idx) - 1)
|
||||
except ValueError:
|
||||
click.echo(f"Could not parse key: {key}")
|
||||
click.echo("Keys must be in the format: <key index>:<hex-encoded key>")
|
||||
raise click.ClickException("Unrecognized key format.")
|
||||
return sigmask, privkeys
|
||||
|
||||
|
||||
def process_remote_signers(fw, addrs: List[str]) -> Tuple[int, List[bytes]]:
|
||||
if len(addrs) < fw.sigs_required:
|
||||
raise click.ClickException(
|
||||
f"Not enough signers (need at least {fw.sigs_required})"
|
||||
)
|
||||
|
||||
digest = fw.digest()
|
||||
name = fw.NAME
|
||||
|
||||
def mkproxy(addr):
|
||||
return Pyro4.Proxy(f"PYRO:keyctl@{addr}:{PORT}")
|
||||
|
||||
sigmask = 0
|
||||
pks, Rs = [], []
|
||||
for addr in addrs:
|
||||
click.echo(f"Connecting to {addr}...")
|
||||
with mkproxy(addr) as proxy:
|
||||
pk, R = proxy.get_commit(name, digest)
|
||||
if pk not in fw.public_keys:
|
||||
raise click.ClickException(
|
||||
f"Signer at {addr} commits with unknown public key {pk.hex()}"
|
||||
)
|
||||
idx = fw.public_keys.index(pk)
|
||||
click.echo(f"Signer at {addr} commits with public key #{idx+1}: {pk.hex()}")
|
||||
sigmask |= 1 << idx
|
||||
pks.append(pk)
|
||||
Rs.append(R)
|
||||
|
||||
# compute global commit
|
||||
global_pk = cosi.combine_keys(pks)
|
||||
global_R = cosi.combine_keys(Rs)
|
||||
|
||||
# collect signatures
|
||||
sigs = []
|
||||
for addr in addrs:
|
||||
click.echo(f"Waiting for {addr} to sign... ", nl=False)
|
||||
with mkproxy(addr) as proxy:
|
||||
sig = proxy.get_signature(name, digest, global_R, global_pk)
|
||||
sigs.append(sig)
|
||||
click.echo("OK")
|
||||
|
||||
for addr in addrs:
|
||||
with mkproxy(addr) as proxy:
|
||||
proxy.finish()
|
||||
|
||||
# compute global signature
|
||||
return sigmask, cosi.combine_sig(global_R, sigs)
|
||||
|
||||
|
||||
# ===================== CLI actions =========================
|
||||
|
||||
|
||||
def do_replace_vendorheader(fw, vh_file) -> None:
|
||||
if not isinstance(fw, firmware_headers.FirmwareImage):
|
||||
raise click.ClickException("Invalid image type (must be firmware).")
|
||||
|
||||
vh = firmware.VendorHeader.parse(vh_file.read())
|
||||
if vh.header_len != fw.fw.vendor_header.header_len:
|
||||
raise click.ClickException("New vendor header must have the same size.")
|
||||
|
||||
fw.fw.vendor_header = vh
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.option("-n", "--dry-run", is_flag=True, help="Do not save changes.")
|
||||
@click.option("-h", "--rehash", is_flag=True, help="Force recalculate hashes.")
|
||||
@click.option("-v", "--verbose", is_flag=True, help="Show verbose info about headers.")
|
||||
@click.option(
|
||||
"-S",
|
||||
"--sign-private",
|
||||
"privkey_data",
|
||||
metavar="INDEX:PRIVKEY_HEX",
|
||||
multiple=True,
|
||||
help="Private key to use for signing. Can be repeated.",
|
||||
)
|
||||
@click.option(
|
||||
"-D", "--sign-dev-keys", is_flag=True, help="Sign with development header keys."
|
||||
)
|
||||
@click.option(
|
||||
"-s",
|
||||
"--signature",
|
||||
"insert_signature",
|
||||
nargs=2,
|
||||
metavar="INDEX:INDEX:INDEX... SIGNATURE_HEX",
|
||||
help="Insert external signature.",
|
||||
)
|
||||
@click.option("-V", "--replace-vendor-header", type=click.File("rb"))
|
||||
@click.option(
|
||||
"-d",
|
||||
"--digest",
|
||||
"print_digest",
|
||||
is_flag=True,
|
||||
help="Only output header digest for signing and exit.",
|
||||
)
|
||||
@click.option(
|
||||
"-r",
|
||||
"--remote",
|
||||
metavar="IPADDR",
|
||||
multiple=True,
|
||||
help="IP address of remote signer. Can be repeated.",
|
||||
)
|
||||
@click.argument("firmware_file", type=click.File("rb+"))
|
||||
def cli(
|
||||
firmware_file,
|
||||
verbose,
|
||||
rehash,
|
||||
dry_run,
|
||||
privkey_data,
|
||||
sign_dev_keys,
|
||||
insert_signature,
|
||||
replace_vendor_header,
|
||||
print_digest,
|
||||
remote,
|
||||
):
|
||||
"""Manage trezor-core firmware headers.
|
||||
|
||||
This tool supports three types of files: raw vendor headers (TRZV), bootloader
|
||||
images (TRZB), and firmware images which are prefixed with a vendor header
|
||||
(TRZV+TRZF).
|
||||
|
||||
Run with no options on a file to dump information about that file.
|
||||
|
||||
Run with -d to print the header digest and exit. This works correctly regardless of
|
||||
whether code hashes have been filled.
|
||||
|
||||
Run with -h to recalculate and fill in code hashes.
|
||||
|
||||
To insert an external signature:
|
||||
|
||||
./headertool.py firmware.bin -s 1:2:3 ABCDEF<...signature in hex format>
|
||||
|
||||
The string "1:2:3" is a list of 1-based indexes of keys used to generate the signature.
|
||||
|
||||
To sign with local private keys:
|
||||
|
||||
\b
|
||||
./headertool.py firmware.bin -S 1:ABCDEF<...hex private key> -S 2:1234<..hex private key>
|
||||
|
||||
Each instance of -S is in the form "index:privkey", where index is the same as
|
||||
above. Instead of specifying the keys manually, use -D to substitue known
|
||||
development keys.
|
||||
|
||||
Signature validity is not checked in either of the two cases.
|
||||
|
||||
To sign with remote participants:
|
||||
|
||||
./headertool.py firmware.bin -r 10.24.13.11 -r 10.24.13.190 ...
|
||||
|
||||
Each participant must be running keyctl-proxy configured on the same file. Signers'
|
||||
public keys must be in the list of known signers and are matched to indexes
|
||||
automatically.
|
||||
"""
|
||||
firmware_data = firmware_file.read()
|
||||
|
||||
try:
|
||||
fw = firmware_headers.parse_image(firmware_data)
|
||||
except Exception as e:
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
magic = firmware_data[:4]
|
||||
raise click.ClickException(
|
||||
f"Could not parse file (magic bytes: {magic!r})"
|
||||
) from e
|
||||
|
||||
digest = fw.digest()
|
||||
if print_digest:
|
||||
click.echo(digest.hex())
|
||||
return
|
||||
|
||||
if replace_vendor_header:
|
||||
do_replace_vendorheader(fw, replace_vendor_header)
|
||||
|
||||
if rehash:
|
||||
fw.rehash()
|
||||
|
||||
if sign_dev_keys:
|
||||
privkeys = fw.DEV_KEYS
|
||||
sigmask = fw.DEV_KEY_SIGMASK
|
||||
else:
|
||||
sigmask, privkeys = parse_privkey_args(privkey_data)
|
||||
|
||||
signature = None
|
||||
|
||||
if privkeys:
|
||||
click.echo("Signing with local private keys...", err=True)
|
||||
signature = sign_with_privkeys(digest, privkeys)
|
||||
|
||||
if insert_signature:
|
||||
click.echo("Inserting external signature...", err=True)
|
||||
sigmask_str, signature = insert_signature
|
||||
signature = bytes.fromhex(signature)
|
||||
sigmask = 0
|
||||
for bit in sigmask_str.split(":"):
|
||||
sigmask |= 1 << (int(bit) - 1)
|
||||
|
||||
if remote:
|
||||
if Pyro4 is None:
|
||||
raise click.ClickException("Please install Pyro4 for remote signing.")
|
||||
click.echo(fw.format())
|
||||
click.echo(f"Signing with {len(remote)} remote participants.")
|
||||
sigmask, signature = process_remote_signers(fw, remote)
|
||||
|
||||
if signature:
|
||||
fw.rehash()
|
||||
fw.insert_signature(signature, sigmask)
|
||||
|
||||
click.echo(fw.format(verbose))
|
||||
|
||||
updated_data = fw.dump()
|
||||
if updated_data == firmware_data:
|
||||
click.echo("No changes made", err=True)
|
||||
elif dry_run:
|
||||
click.echo("Not saving changes", err=True)
|
||||
else:
|
||||
firmware_file.seek(0)
|
||||
firmware_file.truncate(0)
|
||||
firmware_file.write(updated_data)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
@ -1,63 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import binascii
|
||||
import struct
|
||||
import click
|
||||
import pyblake2
|
||||
from trezorlib import cosi
|
||||
|
||||
indexmap = {"bootloader": 0, "vendorheader": 1, "firmware": 2}
|
||||
|
||||
|
||||
def header_digest(index, filename):
|
||||
data = open(filename, "rb").read()
|
||||
z = bytes(65 * [0x00])
|
||||
if index == "bootloader":
|
||||
header = data[:0x03BF] + z
|
||||
elif index == "vendorheader":
|
||||
header = data[:-65] + z
|
||||
elif index == "firmware":
|
||||
vhdrlen = struct.unpack("<I", data[4:8])[0]
|
||||
header = data[vhdrlen : vhdrlen + 0x03BF] + z
|
||||
else:
|
||||
raise ValueError('Unknown index "%s"' % index)
|
||||
return pyblake2.blake2s(header).digest()
|
||||
|
||||
|
||||
@click.group()
|
||||
def cli():
|
||||
pass
|
||||
|
||||
|
||||
@cli.command(help="")
|
||||
@click.argument("index", type=click.Choice(indexmap.keys()))
|
||||
@click.argument("filename")
|
||||
@click.argument("seckeys", nargs=-1)
|
||||
def sign(index, filename, seckeys):
|
||||
# compute header digest
|
||||
digest = header_digest(index, filename)
|
||||
# collect commits
|
||||
pks, nonces, Rs = [], [], []
|
||||
for ctr, seckey in enumerate(seckeys):
|
||||
sk = binascii.unhexlify(seckey)
|
||||
pk = cosi.pubkey_from_privkey(sk)
|
||||
r, R = cosi.get_nonce(sk, digest, ctr)
|
||||
pks.append(pk)
|
||||
nonces.append(r)
|
||||
Rs.append(R)
|
||||
# compute global commit
|
||||
global_pk = cosi.combine_keys(pks)
|
||||
global_R = cosi.combine_keys(Rs)
|
||||
# collect signatures
|
||||
sigs = []
|
||||
for seckey, nonce in zip(seckeys, nonces):
|
||||
sk = binascii.unhexlify(seckey)
|
||||
sig = cosi.sign_with_privkey(digest, sk, global_pk, nonce, global_R)
|
||||
sigs.append(sig)
|
||||
# compute global signature
|
||||
sig = cosi.combine_sig(global_R, sigs)
|
||||
cosi.verify(sig, digest, global_pk)
|
||||
print(binascii.hexlify(sig).decode())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
@ -1,75 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import binascii
|
||||
import struct
|
||||
import click
|
||||
import pyblake2
|
||||
import Pyro4
|
||||
import serpent
|
||||
from trezorlib import cosi
|
||||
|
||||
PORT = 5001
|
||||
indexmap = {"bootloader": 0, "vendorheader": 1, "firmware": 2}
|
||||
|
||||
|
||||
def header_digest(index, filename):
|
||||
data = open(filename, "rb").read()
|
||||
z = bytes(65 * [0x00])
|
||||
if index == "bootloader":
|
||||
header = data[:0x03BF] + z
|
||||
elif index == "vendorheader":
|
||||
header = data[:-65] + z
|
||||
elif index == "firmware":
|
||||
vhdrlen = struct.unpack("<I", data[4:8])[0]
|
||||
header = data[vhdrlen : vhdrlen + 0x03BF] + z
|
||||
else:
|
||||
raise ValueError('Unknown index "%s"' % index)
|
||||
return pyblake2.blake2s(header).digest()
|
||||
|
||||
|
||||
@click.group()
|
||||
def cli():
|
||||
pass
|
||||
|
||||
|
||||
@cli.command(help="")
|
||||
@click.argument("index", type=click.Choice(indexmap.keys()))
|
||||
@click.argument("filename")
|
||||
@click.argument("participants", nargs=-1)
|
||||
def sign(index, filename, participants):
|
||||
# compute header digest
|
||||
digest = header_digest(index, filename)
|
||||
# create participant proxies
|
||||
if len(participants) < 1:
|
||||
raise ValueError("Not enough participants")
|
||||
print("connecting to %d participants:" % len(participants))
|
||||
proxy = []
|
||||
for p in participants:
|
||||
uri = "PYRO:keyctl@%s:%d" % (p, PORT)
|
||||
proxy.append(Pyro4.Proxy(uri))
|
||||
# collect commits
|
||||
pks, Rs = [], []
|
||||
for i, p in enumerate(proxy):
|
||||
pk, R = p.get_commit(index, digest)
|
||||
pk, R = serpent.tobytes(pk), serpent.tobytes(R)
|
||||
pks.append(pk)
|
||||
Rs.append(R)
|
||||
print("collected commit #%d from %s" % (i, p._pyroUri.host))
|
||||
# compute global commit
|
||||
global_pk = cosi.combine_keys(pks)
|
||||
global_R = cosi.combine_keys(Rs)
|
||||
# collect signatures
|
||||
sigs = []
|
||||
for i, p in enumerate(proxy):
|
||||
sig = p.get_signature(index, digest, global_R, global_pk)
|
||||
sig = serpent.tobytes(sig)
|
||||
sigs.append(sig)
|
||||
print("collected signature #%d from %s" % (i, p._pyroUri.host))
|
||||
# compute global signature
|
||||
sig = cosi.combine_sig(global_R, sigs)
|
||||
cosi.verify(sig, digest, global_pk)
|
||||
print("global signature:")
|
||||
print(binascii.hexlify(sig).decode())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
@ -1,86 +1,167 @@
|
||||
#!/usr/bin/env python3
|
||||
import binascii
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
import click
|
||||
import Pyro4
|
||||
import serpent
|
||||
from trezorlib import cosi, tools
|
||||
from trezorlib import cosi
|
||||
from trezorlib.client import get_default_client
|
||||
from trezorlib.tools import parse_path
|
||||
from trezorlib._internal.firmware_headers import (
|
||||
parse_image,
|
||||
VendorHeader,
|
||||
BootloaderImage,
|
||||
FirmwareImage,
|
||||
)
|
||||
|
||||
from typing import Tuple
|
||||
|
||||
Pyro4.config.SERIALIZER = "marshal"
|
||||
|
||||
PORT = 5001
|
||||
indexmap = {"bootloader": 0, "vendorheader": 1, "firmware": 2}
|
||||
indexmap = {
|
||||
"bootloader": BootloaderImage,
|
||||
"vendorheader": VendorHeader,
|
||||
"firmware": FirmwareImage,
|
||||
}
|
||||
|
||||
PATH = "10018h/{}h"
|
||||
|
||||
TREZOR = None
|
||||
|
||||
|
||||
def get_trezor():
|
||||
from trezorlib.client import TrezorClient
|
||||
from trezorlib.transport import get_transport
|
||||
from trezorlib.ui import ClickUI
|
||||
def make_commit(fw_or_type, digest, public_keys):
|
||||
path = PATH.format(fw_or_type.BIP32_INDEX)
|
||||
address_n = parse_path(path)
|
||||
|
||||
return TrezorClient(get_transport(), ui=ClickUI())
|
||||
# device information - show only first time
|
||||
click.echo(
|
||||
f"\nUsing device {click.style(TREZOR.features.label, bold=True)} "
|
||||
f"at path {TREZOR.transport.get_path()}"
|
||||
)
|
||||
|
||||
while True:
|
||||
# signing information - repeat every time
|
||||
click.echo(f"Commiting to {click.style(fw_or_type.NAME, bold=True)} hash:")
|
||||
for partid in range(4):
|
||||
digest_part = digest[partid * 8 : (partid + 1) * 8]
|
||||
color = "red" if partid % 2 else "cyan"
|
||||
digest_str = click.style(digest_part.hex().upper(), fg=color)
|
||||
click.echo("\t" + digest_str)
|
||||
click.echo(f"Using path: {click.style(path, bold=True)}")
|
||||
|
||||
def get_path(index):
|
||||
return "10018'/%d'" % indexmap[index]
|
||||
try:
|
||||
commit = cosi.commit(TREZOR, address_n, digest)
|
||||
if public_keys is not None and commit.pubkey not in public_keys:
|
||||
click.echo(f"\n\nPublic key {commit.pubkey.hex()} is unknown.")
|
||||
if click.confirm("Retry with a different passphrase?", default=True):
|
||||
TREZOR.init_device()
|
||||
continue
|
||||
|
||||
return commit.pubkey, commit.commitment
|
||||
except Exception as e:
|
||||
click.echo(e)
|
||||
traceback.print_exc()
|
||||
click.echo("Trying again ...\n\n")
|
||||
|
||||
|
||||
@Pyro4.expose
|
||||
class KeyctlProxy(object):
|
||||
def get_commit(self, index, digest):
|
||||
digest = serpent.tobytes(digest)
|
||||
path = get_path(index)
|
||||
commit = None
|
||||
while commit is None:
|
||||
try:
|
||||
t = get_trezor()
|
||||
print(
|
||||
"\n\n\nCommiting to hash %s with path %s:"
|
||||
% (binascii.hexlify(digest).decode(), path)
|
||||
)
|
||||
commit = cosi.commit(t, tools.parse_path(path), digest)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
traceback.print_exc()
|
||||
print("Trying again ...")
|
||||
pk = commit.pubkey
|
||||
R = commit.commitment
|
||||
print("Commitment sent!")
|
||||
return (pk, R)
|
||||
|
||||
def get_signature(self, index, digest, global_R, global_pk):
|
||||
digest, global_R, global_pk = (
|
||||
serpent.tobytes(digest),
|
||||
serpent.tobytes(global_R),
|
||||
serpent.tobytes(global_pk),
|
||||
)
|
||||
path = get_path(index)
|
||||
signature = None
|
||||
while signature is None:
|
||||
class KeyctlProxy:
|
||||
def __init__(
|
||||
self, daemon, image_type, digest: bytes, commit: Tuple[bytes, bytes]
|
||||
) -> None:
|
||||
self.daemon = daemon
|
||||
self.name = image_type.NAME
|
||||
self.address_n = parse_path(PATH.format(image_type.BIP32_INDEX))
|
||||
self.digest = digest
|
||||
self.commit = commit
|
||||
self.signature = None
|
||||
self.global_params = None
|
||||
|
||||
def _check_name_digest(self, name, digest):
|
||||
if name != self.name or digest != self.digest:
|
||||
click.echo(f"ERROR! Remote wants to sign {name} with digest {digest.hex()}")
|
||||
click.echo(f"Expected: {self.name} with digest {self.digest.hex()}")
|
||||
raise ValueError("Unexpected index/digest")
|
||||
|
||||
def get_commit(self, name, digest):
|
||||
self._check_name_digest(name, digest)
|
||||
click.echo("Sending commitment!")
|
||||
return self.commit
|
||||
|
||||
def _make_signature(self, global_R, global_pk):
|
||||
while True:
|
||||
try:
|
||||
t = get_trezor()
|
||||
print(
|
||||
"\n\n\nSigning hash %s with path %s:"
|
||||
% (binascii.hexlify(digest).decode(), path)
|
||||
)
|
||||
click.echo("\n\n\nSigning...")
|
||||
signature = cosi.sign(
|
||||
t, tools.parse_path(path), digest, global_R, global_pk
|
||||
TREZOR, self.address_n, self.digest, global_R, global_pk
|
||||
)
|
||||
return signature.signature
|
||||
except Exception as e:
|
||||
print(e)
|
||||
click.echo(e)
|
||||
traceback.print_exc()
|
||||
print("Trying again ...")
|
||||
sig = signature.signature
|
||||
print("Signature sent!")
|
||||
return sig
|
||||
click.echo("Trying again ...")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) > 1:
|
||||
ipaddr = sys.argv[1]
|
||||
else:
|
||||
print("Usage: keyctl-proxy ipaddress")
|
||||
sys.exit(1)
|
||||
def get_signature(self, name, digest, global_R, global_pk):
|
||||
self._check_name_digest(name, digest)
|
||||
global_params = global_R, global_pk
|
||||
if global_params != self.global_params:
|
||||
self.signature = self._make_signature(global_R, global_pk)
|
||||
self.global_params = global_params
|
||||
click.echo("Sending signature!")
|
||||
return self.signature
|
||||
|
||||
@Pyro4.oneway
|
||||
def finish(self):
|
||||
click.echo("Done! \\(^o^)/")
|
||||
self.daemon.shutdown()
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.option(
|
||||
"-l", "--listen", "ipaddr", default="0.0.0.0", help="Bind to particular ip address"
|
||||
)
|
||||
@click.option("-t", "--header-type", "fw_or_type", type=click.Choice(indexmap.keys()))
|
||||
@click.option("-d", "--digest")
|
||||
@click.argument("fw_file", type=click.File("rb"), required=False)
|
||||
def cli(ipaddr, fw_file, fw_or_type, digest):
|
||||
"""Participate in signing of firmware.
|
||||
|
||||
Specify either fw_file to auto-detect type and digest, or use -t and -d to specify
|
||||
the type and digest manually.
|
||||
"""
|
||||
global TREZOR
|
||||
|
||||
public_keys = None
|
||||
if fw_file:
|
||||
if fw_or_type or digest:
|
||||
raise click.ClickException("Do not specify fw_file together with -t/-d")
|
||||
|
||||
fw_or_type = parse_image(fw_file.read())
|
||||
digest = fw_or_type.digest()
|
||||
public_keys = fw_or_type.public_keys
|
||||
|
||||
click.echo(fw_or_type.format())
|
||||
|
||||
if not fw_file and (not fw_or_type or not digest):
|
||||
raise click.ClickException("Please specify either fw_file or -t and -h")
|
||||
|
||||
try:
|
||||
TREZOR = get_default_client()
|
||||
TREZOR.ui.always_prompt = True
|
||||
except Exception as e:
|
||||
raise click.ClickException("Please connect a Trezor and retry.") from e
|
||||
|
||||
pubkey, R = make_commit(fw_or_type, digest, public_keys)
|
||||
|
||||
daemon = Pyro4.Daemon(host=ipaddr, port=PORT)
|
||||
proxy = KeyctlProxy()
|
||||
proxy = KeyctlProxy(daemon, fw_or_type, digest, (pubkey, R))
|
||||
uri = daemon.register(proxy, "keyctl")
|
||||
print('keyctl-proxy running at URI: "%s"' % uri)
|
||||
click.echo(f"keyctl-proxy running at URI: {uri}")
|
||||
click.echo("Press Ctrl+C to abort.")
|
||||
daemon.requestLoop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
||||
|
@ -0,0 +1,373 @@
|
||||
import struct
|
||||
from enum import Enum
|
||||
from typing import Any, List, Optional
|
||||
|
||||
import click
|
||||
import construct as c
|
||||
import pyblake2
|
||||
|
||||
from trezorlib import cosi, firmware
|
||||
|
||||
SYM_OK = click.style("\u2714", fg="green")
|
||||
SYM_FAIL = click.style("\u274c", fg="red")
|
||||
|
||||
|
||||
class Status(Enum):
|
||||
VALID = click.style("VALID", fg="green", bold=True)
|
||||
INVALID = click.style("INVALID", fg="red", bold=True)
|
||||
MISSING = click.style("MISSING", fg="blue", bold=True)
|
||||
DEVEL = click.style("DEVEL", fg="red", bold=True)
|
||||
|
||||
def is_ok(self):
|
||||
return self is Status.VALID or self is Status.DEVEL
|
||||
|
||||
|
||||
VHASH_DEVEL = bytes.fromhex(
|
||||
"c5b4d40cb76911392122c8d1c277937e49c69b2aaf818001ec5c7663fcce258f"
|
||||
)
|
||||
|
||||
|
||||
AnyFirmware = c.Struct(
|
||||
"vendor_header" / c.Optional(firmware.VendorHeader),
|
||||
"image" / c.Optional(firmware.FirmwareImage),
|
||||
)
|
||||
|
||||
|
||||
class ImageType(Enum):
|
||||
VENDOR_HEADER = 0
|
||||
BOOTLOADER = 1
|
||||
FIRMWARE = 2
|
||||
|
||||
|
||||
def _make_dev_keys(*key_bytes: bytes) -> List[bytes]:
|
||||
return [k * 32 for k in key_bytes]
|
||||
|
||||
|
||||
def compute_vhash(vendor_header):
|
||||
m = vendor_header.sig_m
|
||||
n = vendor_header.sig_n
|
||||
pubkeys = vendor_header.pubkeys
|
||||
h = pyblake2.blake2s()
|
||||
h.update(struct.pack("<BB", m, n))
|
||||
for i in range(8):
|
||||
if i < n:
|
||||
h.update(pubkeys[i])
|
||||
else:
|
||||
h.update(b"\x00" * 32)
|
||||
return h.digest()
|
||||
|
||||
|
||||
def all_zero(data: bytes) -> bool:
|
||||
return all(b == 0 for b in data)
|
||||
|
||||
|
||||
def _check_signature_any(
|
||||
header: c.Container, m: int, pubkeys: List[bytes], is_devel: bool
|
||||
) -> Optional[bool]:
|
||||
if all_zero(header.signature) and header.sigmask == 0:
|
||||
return Status.MISSING
|
||||
try:
|
||||
digest = firmware.header_digest(header)
|
||||
cosi.verify(header.signature, digest, m, pubkeys, header.sigmask)
|
||||
return Status.VALID if not is_devel else Status.DEVEL
|
||||
except Exception:
|
||||
return Status.INVALID
|
||||
|
||||
|
||||
# ====================== formatting functions ====================
|
||||
|
||||
|
||||
class LiteralStr(str):
|
||||
pass
|
||||
|
||||
|
||||
def _format_container(
|
||||
pb: c.Container,
|
||||
indent: int = 0,
|
||||
sep: str = " " * 4,
|
||||
truncate_after: Optional[int] = 64,
|
||||
truncate_to: Optional[int] = 32,
|
||||
) -> str:
|
||||
def mostly_printable(bytes: bytes) -> bool:
|
||||
if not bytes:
|
||||
return True
|
||||
printable = sum(1 for byte in bytes if 0x20 <= byte <= 0x7E)
|
||||
return printable / len(bytes) > 0.8
|
||||
|
||||
def pformat(value: Any, indent: int) -> str:
|
||||
level = sep * indent
|
||||
leadin = sep * (indent + 1)
|
||||
|
||||
if isinstance(value, LiteralStr):
|
||||
return value
|
||||
|
||||
if isinstance(value, list):
|
||||
# short list of simple values
|
||||
if not value or isinstance(value, (int, bool, Enum)):
|
||||
return repr(value)
|
||||
|
||||
# long list, one line per entry
|
||||
lines = ["[", level + "]"]
|
||||
lines[1:1] = [leadin + pformat(x, indent + 1) for x in value]
|
||||
return "\n".join(lines)
|
||||
|
||||
if isinstance(value, dict):
|
||||
lines = ["{"]
|
||||
for key, val in value.items():
|
||||
if key.startswith("_"):
|
||||
continue
|
||||
if val is None or val == []:
|
||||
continue
|
||||
lines.append(leadin + key + ": " + pformat(val, indent + 1))
|
||||
lines.append(level + "}")
|
||||
return "\n".join(lines)
|
||||
|
||||
if isinstance(value, (bytes, bytearray)):
|
||||
length = len(value)
|
||||
suffix = ""
|
||||
if truncate_after and length > truncate_after:
|
||||
suffix = "..."
|
||||
value = value[: truncate_to or 0]
|
||||
if mostly_printable(value):
|
||||
output = repr(value)
|
||||
else:
|
||||
output = value.hex()
|
||||
return "{} bytes {}{}".format(length, output, suffix)
|
||||
|
||||
if isinstance(value, Enum):
|
||||
return str(value)
|
||||
|
||||
return repr(value)
|
||||
|
||||
return pformat(pb, indent)
|
||||
|
||||
|
||||
def _format_version(version: c.Container) -> str:
|
||||
version_str = ".".join(
|
||||
str(version[k]) for k in ("major", "minor", "patch") if k in version
|
||||
)
|
||||
if "build" in version:
|
||||
version_str += " build {}".format(version.build)
|
||||
return version_str
|
||||
|
||||
|
||||
# =========================== functionality implementations ===============
|
||||
|
||||
|
||||
class SignableImage:
|
||||
NAME = "Unrecognized image"
|
||||
BIP32_INDEX = None
|
||||
DEV_KEYS = []
|
||||
DEV_KEY_SIGMASK = 0b11
|
||||
|
||||
def __init__(self, fw: c.Container) -> None:
|
||||
self.fw = fw
|
||||
self.header = None
|
||||
self.public_keys = None
|
||||
self.sigs_required = firmware.V2_SIGS_REQUIRED
|
||||
|
||||
def digest(self) -> bytes:
|
||||
return firmware.header_digest(self.header)
|
||||
|
||||
def check_signature(self) -> Status:
|
||||
raise NotImplementedError
|
||||
|
||||
def rehash(self) -> None:
|
||||
pass
|
||||
|
||||
def insert_signature(self, signature: bytes, sigmask: int) -> None:
|
||||
self.header.signature = signature
|
||||
self.header.sigmask = sigmask
|
||||
|
||||
def dump(self) -> bytes:
|
||||
return AnyFirmware.build(self.fw)
|
||||
|
||||
def format(self, verbose: bool) -> str:
|
||||
return _format_container(self.fw)
|
||||
|
||||
|
||||
class VendorHeader(SignableImage):
|
||||
NAME = "vendorheader"
|
||||
BIP32_INDEX = 1
|
||||
DEV_KEYS = _make_dev_keys(b"\x44", b"\x45")
|
||||
|
||||
def __init__(self, fw):
|
||||
super().__init__(fw)
|
||||
self.header = fw.vendor_header
|
||||
self.public_keys = firmware.V2_BOOTLOADER_KEYS
|
||||
|
||||
def check_signature(self) -> Status:
|
||||
return _check_signature_any(
|
||||
self.header, self.sigs_required, self.public_keys, False
|
||||
)
|
||||
|
||||
def _format(self, terse: bool) -> str:
|
||||
vh = self.fw.vendor_header
|
||||
if not terse:
|
||||
vhash = compute_vhash(vh)
|
||||
output = [
|
||||
"Vendor Header " + _format_container(vh),
|
||||
"Pubkey bundle hash: {}".format(vhash.hex()),
|
||||
]
|
||||
else:
|
||||
output = [
|
||||
"Vendor Header for {vendor} version {version} ({size} bytes)".format(
|
||||
vendor=click.style(vh.text, bold=True),
|
||||
version=_format_version(vh.version),
|
||||
size=vh.header_len,
|
||||
),
|
||||
]
|
||||
|
||||
fingerprint = firmware.header_digest(vh)
|
||||
|
||||
if not terse:
|
||||
output.append(
|
||||
"Fingerprint: {}".format(click.style(fingerprint.hex(), bold=True))
|
||||
)
|
||||
|
||||
sig_status = self.check_signature()
|
||||
sym = SYM_OK if sig_status.is_ok() else SYM_FAIL
|
||||
output.append("{} Signature is {}".format(sym, sig_status.value))
|
||||
|
||||
return "\n".join(output)
|
||||
|
||||
def format(self, verbose: bool = False) -> str:
|
||||
return self._format(terse=False)
|
||||
|
||||
|
||||
class BinImage(SignableImage):
|
||||
def __init__(self, fw):
|
||||
super().__init__(fw)
|
||||
self.header = self.fw.image.header
|
||||
self.code_hashes = firmware.calculate_code_hashes(
|
||||
self.fw.image.code, self.fw.image._code_offset
|
||||
)
|
||||
self.digest_header = self.header.copy()
|
||||
self.digest_header.hashes = self.code_hashes
|
||||
|
||||
def insert_signature(self, signature: bytes, sigmask: int) -> None:
|
||||
super().insert_signature(signature, sigmask)
|
||||
self.digest_header.signature = signature
|
||||
self.digest_header.sigmask = sigmask
|
||||
|
||||
def digest(self) -> bytes:
|
||||
return firmware.header_digest(self.digest_header)
|
||||
|
||||
def rehash(self):
|
||||
self.header.hashes = self.code_hashes
|
||||
|
||||
def format(self, verbose: bool = False) -> str:
|
||||
header_out = self.header.copy()
|
||||
|
||||
if not verbose:
|
||||
for key in self.header:
|
||||
if key.startswith("v1"):
|
||||
del header_out[key]
|
||||
if "version" in key:
|
||||
header_out[key] = LiteralStr(_format_version(self.header[key]))
|
||||
|
||||
all_ok = SYM_OK
|
||||
hash_status = Status.VALID
|
||||
sig_status = Status.VALID
|
||||
|
||||
hashes_out = []
|
||||
for expected, actual in zip(self.header.hashes, self.code_hashes):
|
||||
status = SYM_OK if expected == actual else SYM_FAIL
|
||||
hashes_out.append(LiteralStr("{} {}".format(status, expected.hex())))
|
||||
|
||||
if all(all_zero(h) for h in self.header.hashes):
|
||||
hash_status = Status.MISSING
|
||||
elif self.header.hashes != self.code_hashes:
|
||||
hash_status = Status.INVALID
|
||||
else:
|
||||
hash_status = Status.VALID
|
||||
|
||||
header_out["hashes"] = hashes_out
|
||||
|
||||
sig_status = self.check_signature()
|
||||
all_ok = SYM_OK if hash_status.is_ok() and sig_status.is_ok() else SYM_FAIL
|
||||
|
||||
output = [
|
||||
"Firmware Header " + _format_container(header_out),
|
||||
"Fingerprint: {}".format(click.style(self.digest().hex(), bold=True)),
|
||||
"{} Signature is {}, hashes are {}".format(
|
||||
all_ok, sig_status.value, hash_status.value
|
||||
),
|
||||
]
|
||||
|
||||
return "\n".join(output)
|
||||
|
||||
|
||||
class FirmwareImage(BinImage):
|
||||
NAME = "firmware"
|
||||
BIP32_INDEX = 2
|
||||
DEV_KEYS = _make_dev_keys(b"\x47", b"\x48")
|
||||
|
||||
def __init__(self, fw: c.Container) -> None:
|
||||
super().__init__(fw)
|
||||
self.public_keys = fw.vendor_header.pubkeys
|
||||
self.sigs_required = fw.vendor_header.sig_m
|
||||
|
||||
def check_signature(self) -> Status:
|
||||
vhash = compute_vhash(self.fw.vendor_header)
|
||||
return _check_signature_any(
|
||||
self.digest_header,
|
||||
self.sigs_required,
|
||||
self.public_keys,
|
||||
vhash == VHASH_DEVEL,
|
||||
)
|
||||
|
||||
def format(self, verbose: bool = False) -> str:
|
||||
return (
|
||||
VendorHeader(self.fw)._format(terse=not verbose)
|
||||
+ "\n"
|
||||
+ super().format(verbose)
|
||||
)
|
||||
|
||||
|
||||
class BootloaderImage(BinImage):
|
||||
NAME = "bootloader"
|
||||
BIP32_INDEX = 0
|
||||
DEV_KEYS = _make_dev_keys(b"\x41", b"\x42")
|
||||
|
||||
def __init__(self, fw):
|
||||
super().__init__(fw)
|
||||
self._identify_dev_keys()
|
||||
|
||||
def insert_signature(self, signature: bytes, sigmask: int) -> None:
|
||||
super().insert_signature(signature, sigmask)
|
||||
self._identify_dev_keys()
|
||||
|
||||
def _identify_dev_keys(self):
|
||||
# try checking signature with dev keys first
|
||||
self.public_keys = firmware.V2_BOARDLOADER_DEV_KEYS
|
||||
if not self.check_signature().is_ok():
|
||||
# validation with dev keys failed, use production keys
|
||||
self.public_keys = firmware.V2_BOARDLOADER_KEYS
|
||||
|
||||
def check_signature(self) -> Status:
|
||||
return _check_signature_any(
|
||||
self.header,
|
||||
self.sigs_required,
|
||||
self.public_keys,
|
||||
self.public_keys == firmware.V2_BOARDLOADER_DEV_KEYS,
|
||||
)
|
||||
|
||||
|
||||
def parse_image(image: bytes):
|
||||
fw = AnyFirmware.parse(image)
|
||||
if fw.vendor_header and not fw.image:
|
||||
return VendorHeader(fw)
|
||||
if (
|
||||
not fw.vendor_header
|
||||
and fw.image
|
||||
and fw.image.header.magic == firmware.HeaderType.BOOTLOADER
|
||||
):
|
||||
return BootloaderImage(fw)
|
||||
if (
|
||||
fw.vendor_header
|
||||
and fw.image
|
||||
and fw.image.header.magic == firmware.HeaderType.FIRMWARE
|
||||
):
|
||||
return FirmwareImage(fw)
|
||||
raise ValueError("Unrecognized image type")
|
Loading…
Reference in new issue