From c03ac3f8dd5561b41beec28f63d5be7778c34c5e Mon Sep 17 00:00:00 2001 From: matejcik Date: Fri, 3 Jan 2020 14:19:02 +0100 Subject: [PATCH] core/tools: update keyctl-proxy to work with headertool --- core/tools/keyctl-proxy | 158 ++++++++++++++++++++++++++-------------- 1 file changed, 103 insertions(+), 55 deletions(-) diff --git a/core/tools/keyctl-proxy b/core/tools/keyctl-proxy index 124647716..05cf02f36 100755 --- a/core/tools/keyctl-proxy +++ b/core/tools/keyctl-proxy @@ -1,81 +1,129 @@ #!/usr/bin/env python3 -import binascii import sys import traceback +import click import Pyro4 -from trezorlib import cosi, tools +from trezorlib import cosi +from trezorlib.client import get_default_client +from trezorlib.tools import parse_path +from trezorlib._internal.firmware_headers import ( + parse_image, + VendorHeader, + BootloaderImage, + FirmwareImage, +) + +from typing import Tuple Pyro4.config.SERIALIZER = "marshal" PORT = 5001 -indexmap = {"bootloader": 0, "vendorheader": 1, "firmware": 2} - +indexmap = { + "bootloader": BootloaderImage, + "vendorheader": VendorHeader, + "firmware": FirmwareImage, +} -def get_trezor(): - from trezorlib.client import TrezorClient - from trezorlib.transport import get_transport - from trezorlib.ui import ClickUI +PATH = "10018h/{}h" - return TrezorClient(get_transport(), ui=ClickUI()) +def make_commit(index, digest): + path = PATH.format(index) + address_n = parse_path(path) + first_pass = True + while True: + try: + t = get_default_client() + if first_pass: + t.clear_session() + first_pass = False -def get_path(index): - return "10018'/%d'" % indexmap[index] + click.echo(f"\n\n\nCommiting to hash {digest.hex()} with path {path}") + commit = cosi.commit(t, address_n, digest) + return commit.pubkey, commit.commitment + except Exception as e: + print(e) + traceback.print_exc() + print("Trying again ...") @Pyro4.expose -class KeyctlProxy(object): - def get_commit(self, index, digest): - path = get_path(index) - commit = None - while commit is None: - try: - t = get_trezor() - print( - "\n\n\nCommiting to hash %s with path %s:" - % (binascii.hexlify(digest).decode(), path) - ) - commit = cosi.commit(t, tools.parse_path(path), digest) - except Exception as e: - print(e) - traceback.print_exc() - print("Trying again ...") - pk = commit.pubkey - R = commit.commitment - print("Commitment sent!") - return (pk, R) - - def get_signature(self, index, digest, global_R, global_pk): - path = get_path(index) - signature = None - while signature is None: +class KeyctlProxy: + def __init__(self, image_type, digest: bytes, commit: Tuple[bytes, bytes]) -> None: + self.name = image_type.NAME + self.address_n = parse_path(PATH.format(image_type.BIP32_INDEX)) + self.digest = digest + self.commit = commit + + def _check_name_digest(self, name, digest): + if name != self.name or digest != self.digest: + print(f"ERROR! Remote wants to sign {name} with digest {digest.hex()}") + print(f"Expected: {self.name} with digest {self.digest.hex()}") + raise ValueError("Unexpected index/digest") + + def get_commit(self, name, digest): + self._check_name_digest(name, digest) + print("Sending commitment!") + return self.commit + + def get_signature(self, name, digest, global_R, global_pk): + self._check_name_digest(name, digest) + while True: try: - t = get_trezor() - print( - "\n\n\nSigning hash %s with path %s:" - % (binascii.hexlify(digest).decode(), path) - ) - signature = cosi.sign( - t, tools.parse_path(path), digest, global_R, global_pk - ) + t = get_default_client() + print("\n\n\nSigning...") + signature = cosi.sign(t, self.address_n, digest, global_R, global_pk) + print("Sending signature!") + return signature.signature except Exception as e: print(e) traceback.print_exc() print("Trying again ...") - sig = signature.signature - print("Signature sent!") - return sig -if __name__ == "__main__": - if len(sys.argv) > 1: - ipaddr = sys.argv[1] - else: - print("Usage: keyctl-proxy ipaddress") - sys.exit(1) +@click.command() +@click.option( + "-l", "--listen", "ipaddr", default="0.0.0.0", help="Bind to particular ip address" +) +@click.option("-t", "--header-type", type=click.Choice(indexmap.keys())) +@click.option("-d", "--digest") +@click.argument("fw_file", type=click.File("rb"), required=False) +def cli(ipaddr, fw_file, header_type, digest): + """Participate in signing of firmware. + + Specify either fw_file to auto-detect type and digest, or use -t and -d to specify + the type and digest manually. + """ + public_keys = None + if fw_file: + if header_type or digest: + raise click.ClickException("Do not specify fw_file together with -t/-d") + + fw = parse_image(fw_file.read()) + digest = fw.digest() + public_keys = fw.public_keys + + click.echo(fw.format()) + + if not fw_file and (not header_type or not digest): + raise click.ClickException("Please specify either fw_file or -t and -h") + + while True: + pubkey, R = make_commit(header_type.BIP32_INDEX, digest) + if public_keys is not None and pubkey not in public_keys: + click.echo(f"\n\nPublic key {pubkey.hex()} is unknown.") + if click.confirm("Retry with a different passphrase?"): + continue + break + daemon = Pyro4.Daemon(host=ipaddr, port=PORT) - proxy = KeyctlProxy() + proxy = KeyctlProxy(header_type, digest, (pubkey, R)) uri = daemon.register(proxy, "keyctl") - print('keyctl-proxy running at URI: "%s"' % uri) + click.echo(f"keyctl-proxy running at URI: {uri}") + click.echo("Press Ctrl+C to abort.") daemon.requestLoop() + + +if __name__ == "__main__": + cli()