mirror of
https://github.com/trezor/trezor-firmware.git
synced 2025-01-12 00:10:58 +00:00
core/tools: update keyctl-proxy to work with headertool
This commit is contained in:
parent
5b48505b88
commit
c03ac3f8dd
@ -1,81 +1,129 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import binascii
|
|
||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
import click
|
||||||
import Pyro4
|
import Pyro4
|
||||||
from trezorlib import cosi, tools
|
from trezorlib import cosi
|
||||||
|
from trezorlib.client import get_default_client
|
||||||
|
from trezorlib.tools import parse_path
|
||||||
|
from trezorlib._internal.firmware_headers import (
|
||||||
|
parse_image,
|
||||||
|
VendorHeader,
|
||||||
|
BootloaderImage,
|
||||||
|
FirmwareImage,
|
||||||
|
)
|
||||||
|
|
||||||
|
from typing import Tuple
|
||||||
|
|
||||||
Pyro4.config.SERIALIZER = "marshal"
|
Pyro4.config.SERIALIZER = "marshal"
|
||||||
|
|
||||||
PORT = 5001
|
PORT = 5001
|
||||||
indexmap = {"bootloader": 0, "vendorheader": 1, "firmware": 2}
|
indexmap = {
|
||||||
|
"bootloader": BootloaderImage,
|
||||||
|
"vendorheader": VendorHeader,
|
||||||
|
"firmware": FirmwareImage,
|
||||||
|
}
|
||||||
|
|
||||||
|
PATH = "10018h/{}h"
|
||||||
|
|
||||||
|
|
||||||
def get_trezor():
|
def make_commit(index, digest):
|
||||||
from trezorlib.client import TrezorClient
|
path = PATH.format(index)
|
||||||
from trezorlib.transport import get_transport
|
address_n = parse_path(path)
|
||||||
from trezorlib.ui import ClickUI
|
first_pass = True
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
t = get_default_client()
|
||||||
|
if first_pass:
|
||||||
|
t.clear_session()
|
||||||
|
first_pass = False
|
||||||
|
|
||||||
return TrezorClient(get_transport(), ui=ClickUI())
|
click.echo(f"\n\n\nCommiting to hash {digest.hex()} with path {path}")
|
||||||
|
commit = cosi.commit(t, address_n, digest)
|
||||||
|
return commit.pubkey, commit.commitment
|
||||||
def get_path(index):
|
except Exception as e:
|
||||||
return "10018'/%d'" % indexmap[index]
|
print(e)
|
||||||
|
traceback.print_exc()
|
||||||
|
print("Trying again ...")
|
||||||
|
|
||||||
|
|
||||||
@Pyro4.expose
|
@Pyro4.expose
|
||||||
class KeyctlProxy(object):
|
class KeyctlProxy:
|
||||||
def get_commit(self, index, digest):
|
def __init__(self, image_type, digest: bytes, commit: Tuple[bytes, bytes]) -> None:
|
||||||
path = get_path(index)
|
self.name = image_type.NAME
|
||||||
commit = None
|
self.address_n = parse_path(PATH.format(image_type.BIP32_INDEX))
|
||||||
while commit is None:
|
self.digest = digest
|
||||||
try:
|
self.commit = commit
|
||||||
t = get_trezor()
|
|
||||||
print(
|
|
||||||
"\n\n\nCommiting to hash %s with path %s:"
|
|
||||||
% (binascii.hexlify(digest).decode(), path)
|
|
||||||
)
|
|
||||||
commit = cosi.commit(t, tools.parse_path(path), digest)
|
|
||||||
except Exception as e:
|
|
||||||
print(e)
|
|
||||||
traceback.print_exc()
|
|
||||||
print("Trying again ...")
|
|
||||||
pk = commit.pubkey
|
|
||||||
R = commit.commitment
|
|
||||||
print("Commitment sent!")
|
|
||||||
return (pk, R)
|
|
||||||
|
|
||||||
def get_signature(self, index, digest, global_R, global_pk):
|
def _check_name_digest(self, name, digest):
|
||||||
path = get_path(index)
|
if name != self.name or digest != self.digest:
|
||||||
signature = None
|
print(f"ERROR! Remote wants to sign {name} with digest {digest.hex()}")
|
||||||
while signature is None:
|
print(f"Expected: {self.name} with digest {self.digest.hex()}")
|
||||||
|
raise ValueError("Unexpected index/digest")
|
||||||
|
|
||||||
|
def get_commit(self, name, digest):
|
||||||
|
self._check_name_digest(name, digest)
|
||||||
|
print("Sending commitment!")
|
||||||
|
return self.commit
|
||||||
|
|
||||||
|
def get_signature(self, name, digest, global_R, global_pk):
|
||||||
|
self._check_name_digest(name, digest)
|
||||||
|
while True:
|
||||||
try:
|
try:
|
||||||
t = get_trezor()
|
t = get_default_client()
|
||||||
print(
|
print("\n\n\nSigning...")
|
||||||
"\n\n\nSigning hash %s with path %s:"
|
signature = cosi.sign(t, self.address_n, digest, global_R, global_pk)
|
||||||
% (binascii.hexlify(digest).decode(), path)
|
print("Sending signature!")
|
||||||
)
|
return signature.signature
|
||||||
signature = cosi.sign(
|
|
||||||
t, tools.parse_path(path), digest, global_R, global_pk
|
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(e)
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
print("Trying again ...")
|
print("Trying again ...")
|
||||||
sig = signature.signature
|
|
||||||
print("Signature sent!")
|
|
||||||
return sig
|
@click.command()
|
||||||
|
@click.option(
|
||||||
|
"-l", "--listen", "ipaddr", default="0.0.0.0", help="Bind to particular ip address"
|
||||||
|
)
|
||||||
|
@click.option("-t", "--header-type", type=click.Choice(indexmap.keys()))
|
||||||
|
@click.option("-d", "--digest")
|
||||||
|
@click.argument("fw_file", type=click.File("rb"), required=False)
|
||||||
|
def cli(ipaddr, fw_file, header_type, digest):
|
||||||
|
"""Participate in signing of firmware.
|
||||||
|
|
||||||
|
Specify either fw_file to auto-detect type and digest, or use -t and -d to specify
|
||||||
|
the type and digest manually.
|
||||||
|
"""
|
||||||
|
public_keys = None
|
||||||
|
if fw_file:
|
||||||
|
if header_type or digest:
|
||||||
|
raise click.ClickException("Do not specify fw_file together with -t/-d")
|
||||||
|
|
||||||
|
fw = parse_image(fw_file.read())
|
||||||
|
digest = fw.digest()
|
||||||
|
public_keys = fw.public_keys
|
||||||
|
|
||||||
|
click.echo(fw.format())
|
||||||
|
|
||||||
|
if not fw_file and (not header_type or not digest):
|
||||||
|
raise click.ClickException("Please specify either fw_file or -t and -h")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
pubkey, R = make_commit(header_type.BIP32_INDEX, digest)
|
||||||
|
if public_keys is not None and pubkey not in public_keys:
|
||||||
|
click.echo(f"\n\nPublic key {pubkey.hex()} is unknown.")
|
||||||
|
if click.confirm("Retry with a different passphrase?"):
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
|
||||||
|
daemon = Pyro4.Daemon(host=ipaddr, port=PORT)
|
||||||
|
proxy = KeyctlProxy(header_type, digest, (pubkey, R))
|
||||||
|
uri = daemon.register(proxy, "keyctl")
|
||||||
|
click.echo(f"keyctl-proxy running at URI: {uri}")
|
||||||
|
click.echo("Press Ctrl+C to abort.")
|
||||||
|
daemon.requestLoop()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
if len(sys.argv) > 1:
|
cli()
|
||||||
ipaddr = sys.argv[1]
|
|
||||||
else:
|
|
||||||
print("Usage: keyctl-proxy ipaddress")
|
|
||||||
sys.exit(1)
|
|
||||||
daemon = Pyro4.Daemon(host=ipaddr, port=PORT)
|
|
||||||
proxy = KeyctlProxy()
|
|
||||||
uri = daemon.register(proxy, "keyctl")
|
|
||||||
print('keyctl-proxy running at URI: "%s"' % uri)
|
|
||||||
daemon.requestLoop()
|
|
||||||
|
Loading…
Reference in New Issue
Block a user