#!/usr/bin/env python3
import sys
import traceback

import click
import Pyro4

from trezorlib import cosi
from trezorlib.client import get_default_client
from trezorlib.tools import parse_path
from trezorlib._internal.firmware_headers import (
    parse_image,
    VendorHeader,
    BootloaderImage,
    FirmwareImage,
)

from typing import Tuple

Pyro4.config.SERIALIZER = "marshal"

# TCP port the Pyro4 daemon listens on.
PORT = 5001

# Maps the values accepted by -t/--header-type to the corresponding image class.
indexmap = {
    "bootloader": BootloaderImage,
    "vendorheader": VendorHeader,
    "firmware": FirmwareImage,
}

# Derivation path template; the placeholder is filled with the image's BIP32_INDEX.
PATH = "10018h/{}h"


def make_commit(name, index, digest):
    """Obtain a CoSi commitment for ``digest`` from the default Trezor client.

    The digest is echoed in four 8-byte groups with alternating colors, together
    with the derivation path built from ``index``, before ``cosi.commit`` is
    called. Any exception is printed and the whole procedure is retried, so this
    function only returns on success.

    Args:
        name: Human-readable image name, used only in the printed prompt.
        index: Value substituted into ``PATH`` to build the derivation path.
        digest: Digest to commit to, as bytes.

    Returns:
        A ``(pubkey, commitment)`` tuple taken from the ``cosi.commit`` result.
    """
    path = PATH.format(index)
    address_n = parse_path(path)

    first_pass = True
    while True:
        try:
            t = get_default_client()
            if first_pass:
                # Clear the session once per call, not on every retry.
                # NOTE(review): presumably this forces a fresh passphrase entry
                # (cli() offers "Retry with a different passphrase?") - confirm.
                t.clear_session()
                first_pass = False

            click.echo(f"\n\n\nCommiting to {click.style(name, bold=True)} hash:")
            for partid in range(4):
                digest_part = digest[partid * 8 : (partid + 1) * 8]
                color = "red" if partid % 2 else "blue"
                digest_str = click.style(digest_part.hex().upper(), fg=color)
                click.echo(digest_str)

            click.echo(f"Using path: {click.style(path, bold=True)}")

            commit = cosi.commit(t, address_n, digest)
            return commit.pubkey, commit.commitment
        except Exception as e:
            # Deliberate best-effort: report the failure and try again.
            # Ctrl+C (KeyboardInterrupt) is not an Exception and still aborts.
            click.echo(e)
            traceback.print_exc()
            click.echo("Trying again ...")


@Pyro4.expose
class KeyctlProxy:
    """Pyro4-exposed object answering a remote signer for one image/digest pair.

    Every request carries a name and digest; both must match the values this
    proxy was created with, otherwise the request is rejected.
    """

    def __init__(
        self, daemon, image_type, digest: bytes, commit: Tuple[bytes, bytes]
    ) -> None:
        """Store the daemon, the expected name/digest and the prepared commitment.

        Args:
            daemon: Pyro4 daemon serving this object; shut down by ``finish``.
            image_type: Object providing ``NAME`` and ``BIP32_INDEX``.
            digest: Digest this proxy is willing to sign.
            commit: ``(pubkey, commitment)`` pair returned by ``get_commit``.
        """
        self.daemon = daemon
        self.name = image_type.NAME
        self.address_n = parse_path(PATH.format(image_type.BIP32_INDEX))
        self.digest = digest
        self.commit = commit

    def _check_name_digest(self, name, digest):
        """Raise ``ValueError`` unless ``name`` and ``digest`` match the expected ones."""
        if name != self.name or digest != self.digest:
            click.echo(f"ERROR! Remote wants to sign {name} with digest {digest.hex()}")
            click.echo(f"Expected: {self.name} with digest {self.digest.hex()}")
            raise ValueError("Unexpected index/digest")

    def get_commit(self, name, digest):
        """Return the stored ``(pubkey, commitment)`` pair after validating the request."""
        self._check_name_digest(name, digest)
        click.echo("Sending commitment!")
        return self.commit

    def get_signature(self, name, digest, global_R, global_pk):
        """Sign ``digest`` via ``cosi.sign`` and return the raw signature.

        The request is validated first. Signing is retried indefinitely: any
        exception is printed and the attempt is repeated.
        """
        self._check_name_digest(name, digest)
        while True:
            try:
                t = get_default_client()
                click.echo("\n\n\nSigning...")
                signature = cosi.sign(t, self.address_n, digest, global_R, global_pk)
                click.echo("Sending signature!")
                return signature.signature
            except Exception as e:
                click.echo(e)
                traceback.print_exc()
                click.echo("Trying again ...")

    @Pyro4.oneway
    def finish(self):
        """Print a completion message and shut the daemon down."""
        click.echo("Done! \\(^o^)/")
        self.daemon.shutdown()


@click.command()
@click.option(
    "-l", "--listen", "ipaddr", default="0.0.0.0", help="Bind to particular ip address"
)
@click.option("-t", "--header-type", "fw_or_type", type=click.Choice(list(indexmap)))
@click.option("-d", "--digest")
@click.argument("fw_file", type=click.File("rb"), required=False)
def cli(ipaddr, fw_file, fw_or_type, digest):
    """Participate in signing of firmware.

    Specify either fw_file to auto-detect type and digest, or use -t and -d to
    specify the type and digest manually.
    """
    public_keys = None
    if fw_file:
        if fw_or_type or digest:
            raise click.ClickException("Do not specify fw_file together with -t/-d")
        fw_or_type = parse_image(fw_file.read())
        digest = fw_or_type.digest()
        public_keys = fw_or_type.public_keys
        click.echo(fw_or_type.format())
    else:
        if not fw_or_type or not digest:
            raise click.ClickException("Please specify either fw_file or -t and -d")
        # -t yields the choice string and -d a hex string; the rest of the script
        # needs an object with NAME/BIP32_INDEX and a bytes digest.
        # NOTE(review): relies on NAME and BIP32_INDEX being available on the
        # image classes themselves, not only on parsed instances - confirm.
        fw_or_type = indexmap[fw_or_type]
        try:
            digest = bytes.fromhex(digest)
        except ValueError as e:
            raise click.ClickException(f"Invalid hex digest: {e}") from e

    while True:
        pubkey, R = make_commit(fw_or_type.NAME, fw_or_type.BIP32_INDEX, digest)
        # The key can only be checked against the image's key list when an image
        # file was parsed; in manual mode public_keys stays None.
        if public_keys is not None and pubkey not in public_keys:
            click.echo(f"\n\nPublic key {pubkey.hex()} is unknown.")
            if click.confirm("Retry with a different passphrase?"):
                continue
        break

    daemon = Pyro4.Daemon(host=ipaddr, port=PORT)
    proxy = KeyctlProxy(daemon, fw_or_type, digest, (pubkey, R))
    uri = daemon.register(proxy, "keyctl")
    click.echo(f"keyctl-proxy running at URI: {uri}")
    click.echo("Press Ctrl+C to abort.")
    daemon.requestLoop()


if __name__ == "__main__":
    cli()