1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-01-22 13:21:03 +00:00
trezor-firmware/trezorctl
matejcik e1efd493fd trezorctl: updated firmware update flow
We can now locally verify firmware signatures and hashes. We also
recognize min_firmware_version, so this resolves #308

This also helps with #273, as trezorlib is now mostly usable for signing
firmware images.
2018-10-12 15:58:55 +02:00

1745 lines
51 KiB
Python
Executable File

#!/usr/bin/env python3
# This file is part of the TREZOR project.
#
# Copyright (C) 2012-2017 Marek Palatinus <slush@satoshilabs.com>
# Copyright (C) 2012-2017 Pavol Rusnak <stick@satoshilabs.com>
# Copyright (C) 2016-2017 Jochen Hoenicke <hoenicke@gmail.com>
# Copyright (C) 2017 mruddy
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
import base64
import json
import os
import sys
import click
import requests
from trezorlib import (
btc,
cardano,
coins,
cosi,
debuglink,
device,
ethereum,
exceptions,
firmware,
lisk,
log,
messages as proto,
misc,
nem,
ontology,
protobuf,
ripple,
stellar,
tezos,
tools,
ui,
)
from trezorlib.client import TrezorClient
from trezorlib.transport import enumerate_devices, get_transport
class ChoiceType(click.Choice):
def __init__(self, typemap):
super(ChoiceType, self).__init__(typemap.keys())
self.typemap = typemap
def convert(self, value, param, ctx):
value = super(ChoiceType, self).convert(value, param, ctx)
return self.typemap[value]
CHOICE_RECOVERY_DEVICE_TYPE = ChoiceType(
{
"scrambled": proto.RecoveryDeviceType.ScrambledWords,
"matrix": proto.RecoveryDeviceType.Matrix,
}
)
CHOICE_INPUT_SCRIPT_TYPE = ChoiceType(
{
"address": proto.InputScriptType.SPENDADDRESS,
"segwit": proto.InputScriptType.SPENDWITNESS,
"p2shsegwit": proto.InputScriptType.SPENDP2SHWITNESS,
}
)
CHOICE_OUTPUT_SCRIPT_TYPE = ChoiceType(
{
"address": proto.OutputScriptType.PAYTOADDRESS,
"segwit": proto.OutputScriptType.PAYTOWITNESS,
"p2shsegwit": proto.OutputScriptType.PAYTOP2SHWITNESS,
}
)
class UnderscoreAgnosticGroup(click.Group):
"""Command group that normalizes dashes and underscores.
Click 7.0 silently switched all underscore_commands to dash-commands.
This implementation of `click.Group` responds to underscore_commands by invoking
the respective dash-command.
"""
def get_command(self, ctx, cmd_name):
cmd = super().get_command(ctx, cmd_name)
if cmd is None:
cmd = super().get_command(ctx, cmd_name.replace("_", "-"))
return cmd
def enable_logging():
log.enable_debug_output()
log.OMITTED_MESSAGES.add(proto.Features)
@click.command(cls=UnderscoreAgnosticGroup, context_settings={"max_content_width": 400})
@click.option(
"-p",
"--path",
help="Select device by specific path.",
default=os.environ.get("TREZOR_PATH"),
)
@click.option("-v", "--verbose", is_flag=True, help="Show communication messages.")
@click.option(
"-j", "--json", "is_json", is_flag=True, help="Print result as JSON object"
)
@click.pass_context
def cli(ctx, path, verbose, is_json):
if verbose:
enable_logging()
def get_device():
try:
device = get_transport(path, prefix_search=False)
except Exception:
try:
device = get_transport(path, prefix_search=True)
except Exception:
click.echo("Failed to find a TREZOR device.")
if path is not None:
click.echo("Using path: {}".format(path))
sys.exit(1)
return TrezorClient(transport=device, ui=ui.ClickUI)
ctx.obj = get_device
@cli.resultcallback()
def print_result(res, path, verbose, is_json):
if is_json:
if isinstance(res, protobuf.MessageType):
click.echo(json.dumps({res.__class__.__name__: res.__dict__}))
else:
click.echo(json.dumps(res, sort_keys=True, indent=4))
else:
if isinstance(res, list):
for line in res:
click.echo(line)
elif isinstance(res, dict):
for k, v in res.items():
if isinstance(v, dict):
for kk, vv in v.items():
click.echo("%s.%s: %s" % (k, kk, vv))
else:
click.echo("%s: %s" % (k, v))
elif isinstance(res, protobuf.MessageType):
click.echo(protobuf.format_message(res))
else:
click.echo(res)
#
# Common functions
#
@cli.command(name="list", help="List connected TREZOR devices.")
def ls():
return enumerate_devices()
@cli.command(help="Show version of trezorctl/trezorlib.")
def version():
from trezorlib import __version__ as VERSION
return VERSION
#
# Basic device functions
#
@cli.command(help="Send ping message.")
@click.argument("message")
@click.option("-b", "--button-protection", is_flag=True)
@click.option("-p", "--pin-protection", is_flag=True)
@click.option("-r", "--passphrase-protection", is_flag=True)
@click.pass_obj
def ping(connect, message, button_protection, pin_protection, passphrase_protection):
return connect().ping(
message,
button_protection=button_protection,
pin_protection=pin_protection,
passphrase_protection=passphrase_protection,
)
@cli.command(help="Clear session (remove cached PIN, passphrase, etc.).")
@click.pass_obj
def clear_session(connect):
return connect().clear_session()
@cli.command(help="Get example entropy.")
@click.argument("size", type=int)
@click.pass_obj
def get_entropy(connect, size):
return misc.get_entropy(connect(), size).hex()
@cli.command(help="Retrieve device features and settings.")
@click.pass_obj
def get_features(connect):
return connect().features
#
# Device management functions
#
@cli.command(help="Change new PIN or remove existing.")
@click.option("-r", "--remove", is_flag=True)
@click.pass_obj
def change_pin(connect, remove):
click.echo(ui.PIN_MATRIX_DESCRIPTION)
return device.change_pin(connect(), remove)
@cli.command(help="Enable passphrase.")
@click.pass_obj
def enable_passphrase(connect):
return device.apply_settings(connect(), use_passphrase=True)
@cli.command(help="Disable passphrase.")
@click.pass_obj
def disable_passphrase(connect):
return device.apply_settings(connect(), use_passphrase=False)
@cli.command(help="Set new device label.")
@click.option("-l", "--label")
@click.pass_obj
def set_label(connect, label):
return device.apply_settings(connect(), label=label)
@cli.command(help="Set passphrase source.")
@click.argument("source", type=int)
@click.pass_obj
def set_passphrase_source(connect, source):
return device.apply_settings(connect(), passphrase_source=source)
@cli.command(help="Set auto-lock delay (in seconds).")
@click.argument("delay", type=str)
@click.pass_obj
def set_auto_lock_delay(connect, delay):
value, unit = delay[:-1], delay[-1:]
units = {"s": 1, "m": 60, "h": 3600}
if unit in units:
seconds = float(value) * units[unit]
else:
seconds = float(delay) # assume seconds if no unit is specified
return device.apply_settings(connect(), auto_lock_delay_ms=int(seconds * 1000))
@cli.command(help="Set device flags.")
@click.argument("flags")
@click.pass_obj
def set_flags(connect, flags):
flags = flags.lower()
if flags.startswith("0b"):
flags = int(flags, 2)
elif flags.startswith("0x"):
flags = int(flags, 16)
else:
flags = int(flags)
return device.apply_flags(connect(), flags=flags)
@cli.command(help="Set new homescreen.")
@click.option("-f", "--filename", default=None)
@click.pass_obj
def set_homescreen(connect, filename):
if filename is None:
img = b"\x00"
elif filename.endswith(".toif"):
img = open(filename, "rb").read()
if img[:8] != b"TOIf\x90\x00\x90\x00":
raise tools.CallException(
proto.FailureType.DataError,
"File is not a TOIF file with size of 144x144",
)
else:
from PIL import Image
im = Image.open(filename)
if im.size != (128, 64):
raise tools.CallException(
proto.FailureType.DataError, "Wrong size of the image"
)
im = im.convert("1")
pix = im.load()
img = bytearray(1024)
for j in range(64):
for i in range(128):
if pix[i, j]:
o = i + j * 128
img[o // 8] |= 1 << (7 - o % 8)
img = bytes(img)
return device.apply_settings(connect(), homescreen=img)
@cli.command(help="Set U2F counter.")
@click.argument("counter", type=int)
@click.pass_obj
def set_u2f_counter(connect, counter):
return device.set_u2f_counter(connect(), counter)
@cli.command(help="Reset device to factory defaults and remove all private data.")
@click.option(
"-b",
"--bootloader",
help="Wipe device in bootloader mode. This also erases the firmware.",
is_flag=True,
)
@click.pass_obj
def wipe_device(connect, bootloader):
client = connect()
if bootloader:
if not client.features.bootloader_mode:
click.echo("Please switch your device to bootloader mode.")
sys.exit(1)
else:
click.echo("Wiping user data and firmware!")
else:
if client.features.bootloader_mode:
click.echo(
"Your device is in bootloader mode. This operation would also erase firmware."
)
click.echo(
'Specify "--bootloader" if that is what you want, or disconnect and reconnect device in normal mode.'
)
click.echo("Aborting.")
sys.exit(1)
else:
click.echo("Wiping user data!")
try:
return device.wipe(connect())
except tools.CallException as e:
click.echo("Action failed: {} {}".format(*e.args))
sys.exit(3)
@cli.command(help="Load custom configuration to the device.")
@click.option("-m", "--mnemonic")
@click.option("-e", "--expand", is_flag=True)
@click.option("-x", "--xprv")
@click.option("-p", "--pin", default="")
@click.option("-r", "--passphrase-protection", is_flag=True)
@click.option("-l", "--label", default="")
@click.option("-i", "--ignore-checksum", is_flag=True)
@click.option("-s", "--slip0014", is_flag=True)
@click.pass_obj
def load_device(
connect,
mnemonic,
expand,
xprv,
pin,
passphrase_protection,
label,
ignore_checksum,
slip0014,
):
if not mnemonic and not xprv and not slip0014:
raise tools.CallException(
proto.FailureType.DataError, "Please provide mnemonic or xprv"
)
client = connect()
if mnemonic:
return debuglink.load_device_by_mnemonic(
client,
mnemonic,
pin,
passphrase_protection,
label,
"english",
ignore_checksum,
expand,
)
if xprv:
return debuglink.load_device_by_xprv(
client, xprv, pin, passphrase_protection, label, "english"
)
if slip0014:
return debuglink.load_device_by_mnemonic(
client, " ".join(["all"] * 12), pin, passphrase_protection, "SLIP-0014"
)
@cli.command(help="Start safe recovery workflow.")
@click.option("-w", "--words", type=click.Choice(["12", "18", "24"]), default="24")
@click.option("-e", "--expand", is_flag=True)
@click.option("-p", "--pin-protection", is_flag=True)
@click.option("-r", "--passphrase-protection", is_flag=True)
@click.option("-l", "--label")
@click.option(
"-t", "--type", "rec_type", type=CHOICE_RECOVERY_DEVICE_TYPE, default="scrambled"
)
@click.option("-d", "--dry-run", is_flag=True)
@click.pass_obj
def recovery_device(
connect,
words,
expand,
pin_protection,
passphrase_protection,
label,
rec_type,
dry_run,
):
if rec_type == proto.RecoveryDeviceType.ScrambledWords:
input_callback = ui.mnemonic_words(expand)
else:
input_callback = ui.matrix_words
click.echo(ui.RECOVERY_MATRIX_DESCRIPTION)
return device.recover(
connect(),
int(words),
passphrase_protection,
pin_protection,
label,
"english",
input_callback,
rec_type,
dry_run,
)
@cli.command(help="Perform device setup and generate new seed.")
@click.option("-e", "--entropy", is_flag=True)
@click.option(
"-t", "--strength", type=click.Choice(["128", "192", "256"]), default="256"
)
@click.option("-r", "--passphrase-protection", is_flag=True)
@click.option("-p", "--pin-protection", is_flag=True)
@click.option("-l", "--label")
@click.option("-u", "--u2f-counter", default=0)
@click.option("-s", "--skip-backup", is_flag=True)
@click.option("-n", "--no-backup", is_flag=True)
@click.pass_obj
def reset_device(
connect,
entropy,
strength,
passphrase_protection,
pin_protection,
label,
u2f_counter,
skip_backup,
no_backup,
):
return device.reset(
connect(),
entropy,
int(strength),
passphrase_protection,
pin_protection,
label,
"english",
u2f_counter,
skip_backup,
no_backup,
)
@cli.command(help="Perform device seed backup.")
@click.pass_obj
def backup_device(connect):
return device.backup(connect())
#
# Firmware update
#
def validate_firmware_v1(fw, expected_fingerprint=None):
click.echo("Trezor One firmware image.")
distinct_sig_slots = set(i for i in fw.key_indexes if i != 0)
if not distinct_sig_slots:
if not click.confirm("No signatures found. Continue?", default=False):
sys.exit(1)
elif len(distinct_sig_slots) < 3:
click.echo("Badly signed image (need 3 distinct signatures), aborting.")
sys.exit(1)
else:
all_valid = True
for i in range(len(fw.key_indexes)):
if not firmware.check_sig_v1(fw, i):
click.echo("INVALID signature in slot {}".format(i))
all_valid = False
if all_valid:
click.echo("Signatures are valid.")
else:
click.echo("Invalid signature detected, aborting.")
sys.exit(4)
fingerprint = firmware.digest_v1(fw).hex()
click.echo("Firmware fingerprint: {}".format(fingerprint))
if expected_fingerprint and fingerprint != expected_fingerprint:
click.echo("Expected fingerprint: {}".format(expected_fingerprint))
click.echo("Fingerprints do not match, aborting.")
sys.exit(5)
def validate_firmware_v2(fw, expected_fingerprint=None, skip_vendor_header=False):
click.echo("Trezor T firmware image.")
vendor = fw.vendor_header.vendor_string
vendor_version = "{major}.{minor}".format(**fw.vendor_header.version)
version = fw.firmware_header.version
click.echo("Vendor header from {}, version {}".format(vendor, vendor_version))
click.echo(
"Firmware version {major}.{minor}.{patch} build {build}".format(**version)
)
try:
firmware.validate(fw, skip_vendor_header)
click.echo("Signatures are valid.")
except Exception as e:
click.echo(e)
sys.exit(4)
fingerprint = firmware.digest(fw).hex()
click.echo("Firmware fingerprint: {}".format(fingerprint))
if expected_fingerprint and fingerprint != expected_fingerprint:
click.echo("Expected fingerprint: {}".format(expected_fingerprint))
click.echo("Fingerprints do not match, aborting.")
sys.exit(5)
def find_best_firmware_version(bootloader_version, requested_version=None):
url = "https://wallet.trezor.io/data/firmware/{}/releases.json"
releases = requests.get(url.format(bootloader_version[0])).json()
if not releases:
raise click.ClickException("Failed to get list of releases")
releases.sort(key=lambda r: r["version"], reverse=True)
def version_str(version):
return ".".join(map(str, version))
want_version = requested_version
if want_version is None:
want_version = releases[0]["version"]
click.echo("Best available version: {}".format(version_str(want_version)))
confirm_different_version = False
while True:
want_version_str = version_str(want_version)
try:
release = next(r for r in releases if r["version"] == want_version)
except StopIteration:
click.echo("Version {} not found.".format(want_version_str))
sys.exit(1)
if (
"min_bootloader_version" in release
and release["min_bootloader_version"] > bootloader_version
):
need_version_str = version_str(release["min_firmware_version"])
click.echo(
"Version {} is required before upgrading to {}.".format(
need_version_str, want_version_str
)
)
want_version = release["min_firmware_version"]
confirm_different_version = True
else:
break
if confirm_different_version:
installing_different = "Installing version {} instead.".format(want_version_str)
if requested_version is None:
click.echo(installing_different)
else:
ok = click.confirm(installing_different + " Continue?", default=True)
if not ok:
sys.exit(1)
url = "https://wallet.trezor.io/" + release["url"]
if url.endswith(".hex"):
url = url[:-4]
return url, release["fingerprint"]
@cli.command()
@click.option("-f", "--filename")
@click.option("-u", "--url")
@click.option("-v", "--version")
@click.option("-s", "--skip-check", is_flag=True)
@click.option("--fingerprint", help="Expected firmware fingerprint in hex")
@click.option("--skip-vendor-header", help="Skip vendor header validation on Trezor T")
@click.pass_obj
def firmware_update(
connect, filename, url, version, skip_check, fingerprint, skip_vendor_header
):
"""Upload new firmware to device.
Device must be in bootloader mode.
You can specify a filename or URL from which the firmware can be downloaded.
You can also explicitly specify a firmware version that you want.
Otherwise, trezorctl will attempt to find latest available version
from wallet.trezor.io.
If you provide a fingerprint via the --fingerprint option, it will be checked
against downloaded firmware fingerprint. Otherwise fingerprint is checked
against wallet.trezor.io information, if available.
If you are customizing Model T bootloader and providing your own vendor header,
you can use --skip-vendor-header to ignore vendor header signatures.
"""
if sum(bool(x) for x in (filename, url, version)) > 1:
click.echo("You can use only one of: filename, url, version.")
sys.exit(1)
client = connect()
if not client.features.bootloader_mode:
click.echo("Please switch your device to bootloader mode.")
sys.exit(1)
firmware_version = client.features.major_version
if filename:
data = open(filename, "rb").read()
else:
if not url:
f = client.features
bootloader_version = [f.major_version, f.minor_version, f.patch_version]
version_list = [int(x) for x in version.split(".")] if version else None
url, fp = find_best_firmware_version(bootloader_version, version_list)
if not fingerprint:
fingerprint = fp
click.echo("Downloading from {}".format(url))
r = requests.get(url)
data = r.content
if not skip_check:
try:
version, fw = firmware.parse(data)
except Exception as e:
click.echo(e)
sys.exit(2)
if version == firmware.FirmwareFormat.TREZOR_ONE:
validate_firmware_v1(fw, fingerprint)
elif version == firmware.FirmwareFormat.TREZOR_T:
validate_firmware_v2(fw, fingerprint)
else:
click.echo("Unrecognized firmware version.")
if firmware_version != version.value:
click.echo("Firmware does not match your device, aborting.")
sys.exit(3)
try:
if firmware_version == 1:
# Trezor One does not send ButtonRequest
click.echo("Please confirm action on your Trezor device")
return firmware.update(client, data)
except exceptions.Cancelled:
click.echo("Update aborted on device.")
except exceptions.TrezorException as e:
click.echo("Update failed: {}".format(e))
sys.exit(3)
@cli.command(help="Perform a self-test.")
@click.pass_obj
def self_test(connect):
return debuglink.self_test(connect())
#
# Basic coin functions
#
@cli.command(help="Get address for specified path.")
@click.option("-c", "--coin", default="Bitcoin")
@click.option(
"-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0"
)
@click.option("-t", "--script-type", type=CHOICE_INPUT_SCRIPT_TYPE, default="address")
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def get_address(connect, coin, address, script_type, show_display):
client = connect()
address_n = tools.parse_path(address)
return btc.get_address(
client, coin, address_n, show_display, script_type=script_type
)
@cli.command(help="Get public node of given path.")
@click.option("-c", "--coin", default="Bitcoin")
@click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/0'/0'")
@click.option("-e", "--curve")
@click.option("-t", "--script-type", type=CHOICE_INPUT_SCRIPT_TYPE, default="address")
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def get_public_node(connect, coin, address, curve, script_type, show_display):
client = connect()
address_n = tools.parse_path(address)
result = btc.get_public_node(
client,
address_n,
ecdsa_curve_name=curve,
show_display=show_display,
coin_name=coin,
script_type=script_type,
)
return {
"node": {
"depth": result.node.depth,
"fingerprint": "%08x" % result.node.fingerprint,
"child_num": result.node.child_num,
"chain_code": result.node.chain_code.hex(),
"public_key": result.node.public_key.hex(),
},
"xpub": result.xpub,
}
#
# Signing options
#
@cli.command(help="Sign transaction.")
@click.option("-c", "--coin", default="Bitcoin")
# @click.option('-n', '--address', required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0")
# @click.option('-t', '--script-type', type=CHOICE_INPUT_SCRIPT_TYPE, default='address')
# @click.option('-o', '--output', required=True, help='Transaction output')
# @click.option('-f', '--fee', required=True, help='Transaction fee (sat/B)')
@click.pass_obj
def sign_tx(connect, coin):
client = connect()
if coin in coins.tx_api:
txapi = coins.tx_api[coin]
else:
click.echo('Coin "%s" is not recognized.' % coin, err=True)
click.echo(
"Supported coin types: %s" % ", ".join(coins.tx_api.keys()), err=True
)
sys.exit(1)
client.set_tx_api(txapi)
def default_script_type(address_n):
script_type = "address"
if address_n is None:
pass
elif address_n[0] == tools.H_(49):
script_type = "p2shsegwit"
return script_type
def outpoint(s):
txid, vout = s.split(":")
return bytes.fromhex(txid), int(vout)
inputs = []
while True:
click.echo()
prev = click.prompt(
"Previous output to spend (txid:vout)", type=outpoint, default=""
)
if not prev:
break
prev_hash, prev_index = prev
address_n = click.prompt("BIP-32 path to derive the key", type=tools.parse_path)
amount = click.prompt("Input amount (satoshis)", type=int, default=0)
sequence = click.prompt(
"Sequence Number to use (RBF opt-in enabled by default)",
type=int,
default=0xFFFFFFFD,
)
script_type = click.prompt(
"Input type",
type=CHOICE_INPUT_SCRIPT_TYPE,
default=default_script_type(address_n),
)
script_type = (
script_type
if isinstance(script_type, int)
else CHOICE_INPUT_SCRIPT_TYPE.typemap[script_type]
)
new_input = proto.TxInputType(
address_n=address_n,
prev_hash=prev_hash,
prev_index=prev_index,
amount=amount,
script_type=script_type,
sequence=sequence,
)
if txapi.bip115:
prev_output = txapi.get_tx(prev_hash.hex()).bin_outputs[prev_index]
new_input.prev_block_hash_bip115 = prev_output.block_hash
new_input.prev_block_height_bip115 = prev_output.block_height
inputs.append(new_input)
if txapi.bip115:
current_block_height = txapi.current_height()
block_height = (
current_block_height - 300
) # Zencash recommendation for the better protection
block_hash = txapi.get_block_hash(block_height)
outputs = []
while True:
click.echo()
address = click.prompt("Output address (for non-change output)", default="")
if address:
address_n = None
else:
address = None
address_n = click.prompt(
"BIP-32 path (for change output)", type=tools.parse_path, default=""
)
if not address_n:
break
amount = click.prompt("Amount to spend (satoshis)", type=int)
script_type = click.prompt(
"Output type",
type=CHOICE_OUTPUT_SCRIPT_TYPE,
default=default_script_type(address_n),
)
script_type = (
script_type
if isinstance(script_type, int)
else CHOICE_OUTPUT_SCRIPT_TYPE.typemap[script_type]
)
outputs.append(
proto.TxOutputType(
address_n=address_n,
address=address,
amount=amount,
script_type=script_type,
block_hash_bip115=block_hash[::-1], # Blockhash passed in reverse order
block_height_bip115=block_height,
)
)
tx_version = click.prompt("Transaction version", type=int, default=2)
tx_locktime = click.prompt("Transaction locktime", type=int, default=0)
_, serialized_tx = btc.sign_tx(
client, coin, inputs, outputs, tx_version, tx_locktime
)
client.close()
click.echo()
click.echo("Signed Transaction:")
click.echo(serialized_tx.hex())
click.echo()
click.echo("Use the following form to broadcast it to the network:")
click.echo(txapi.pushtx_url)
#
# Message functions
#
@cli.command(help="Sign message using address of given path.")
@click.option("-c", "--coin", default="Bitcoin")
@click.option(
"-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0"
)
@click.option(
"-t",
"--script-type",
type=click.Choice(["address", "segwit", "p2shsegwit"]),
default="address",
)
@click.argument("message")
@click.pass_obj
def sign_message(connect, coin, address, message, script_type):
client = connect()
address_n = tools.parse_path(address)
typemap = {
"address": proto.InputScriptType.SPENDADDRESS,
"segwit": proto.InputScriptType.SPENDWITNESS,
"p2shsegwit": proto.InputScriptType.SPENDP2SHWITNESS,
}
script_type = typemap[script_type]
res = btc.sign_message(client, coin, address_n, message, script_type)
return {
"message": message,
"address": res.address,
"signature": base64.b64encode(res.signature),
}
@cli.command(help="Verify message.")
@click.option("-c", "--coin", default="Bitcoin")
@click.argument("address")
@click.argument("signature")
@click.argument("message")
@click.pass_obj
def verify_message(connect, coin, address, signature, message):
signature = base64.b64decode(signature)
return btc.verify_message(connect(), coin, address, signature, message)
@cli.command(help="Sign message with Ethereum address.")
@click.option(
"-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/60'/0'/0/0"
)
@click.argument("message")
@click.pass_obj
def ethereum_sign_message(connect, address, message):
client = connect()
address_n = tools.parse_path(address)
ret = ethereum.sign_message(client, address_n, message)
output = {
"message": message,
"address": "0x%s" % ret.address.hex(),
"signature": "0x%s" % ret.signature.hex(),
}
return output
def ethereum_decode_hex(value):
if value.startswith("0x") or value.startswith("0X"):
return bytes.fromhex(value[2:])
else:
return bytes.fromhex(value)
@cli.command(help="Verify message signed with Ethereum address.")
@click.argument("address")
@click.argument("signature")
@click.argument("message")
@click.pass_obj
def ethereum_verify_message(connect, address, signature, message):
address = ethereum_decode_hex(address)
signature = ethereum_decode_hex(signature)
return ethereum.verify_message(connect(), address, signature, message)
@cli.command(help="Encrypt value by given key and path.")
@click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/10016'/0")
@click.argument("key")
@click.argument("value")
@click.pass_obj
def encrypt_keyvalue(connect, address, key, value):
client = connect()
address_n = tools.parse_path(address)
res = misc.encrypt_keyvalue(client, address_n, key, value.encode())
return res.hex()
@cli.command(help="Decrypt value by given key and path.")
@click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/10016'/0")
@click.argument("key")
@click.argument("value")
@click.pass_obj
def decrypt_keyvalue(connect, address, key, value):
client = connect()
address_n = tools.parse_path(address)
return misc.decrypt_keyvalue(client, address_n, key, bytes.fromhex(value))
# @cli.command(help='Encrypt message.')
# @click.option('-c', '--coin', default='Bitcoin')
# @click.option('-d', '--display-only', is_flag=True)
# @click.option('-n', '--address', required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0")
# @click.argument('pubkey')
# @click.argument('message')
# @click.pass_obj
# def encrypt_message(connect, coin, display_only, address, pubkey, message):
# client = connect()
# pubkey = bytes.fromhex(pubkey)
# address_n = tools.parse_path(address)
# res = client.encrypt_message(pubkey, message, display_only, coin, address_n)
# return {
# 'nonce': res.nonce.hex(),
# 'message': res.message.hex(),
# 'hmac': res.hmac.hex(),
# 'payload': base64.b64encode(res.nonce + res.message + res.hmac),
# }
# @cli.command(help='Decrypt message.')
# @click.option('-n', '--address', required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0")
# @click.argument('payload')
# @click.pass_obj
# def decrypt_message(connect, address, payload):
# client = connect()
# address_n = tools.parse_path(address)
# payload = base64.b64decode(payload)
# nonce, message, msg_hmac = payload[:33], payload[33:-8], payload[-8:]
# return client.decrypt_message(address_n, nonce, message, msg_hmac)
#
# Ethereum functions
#
@cli.command(help="Get Ethereum address in hex encoding.")
@click.option(
"-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/60'/0'/0/0"
)
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def ethereum_get_address(connect, address, show_display):
client = connect()
address_n = tools.parse_path(address)
address = ethereum.get_address(client, address_n, show_display)
return "0x%s" % address.hex()
@cli.command(
help='Sign (and optionally publish) Ethereum transaction. Use TO as destination address or set TO to "" for contract creation.'
)
@click.option(
"-a",
"--host",
default="localhost:8545",
help="RPC port of ethereum node for automatic gas/nonce estimation and publishing",
)
@click.option("-c", "--chain-id", type=int, help="EIP-155 chain id (replay protection)")
@click.option(
"-n",
"--address",
required=True,
help="BIP-32 path to source address, e.g., m/44'/60'/0'/0/0",
)
@click.option(
"-v", "--value", default="0", help='Ether amount to transfer, e.g. "100 milliether"'
)
@click.option(
"-g", "--gas-limit", type=int, help="Gas limit - Required for offline signing"
)
@click.option(
"-t",
"--gas-price",
help='Gas price, e.g. "20 nanoether" - Required for offline signing',
)
@click.option(
"-i", "--nonce", type=int, help="Transaction counter - Required for offline signing"
)
@click.option("-d", "--data", default="", help="Data as hex string, e.g. 0x12345678")
@click.option("-p", "--publish", is_flag=True, help="Publish transaction via RPC")
@click.option("-x", "--tx-type", type=int, help="TX type (used only for Wanchain)")
@click.argument("to")
@click.pass_obj
def ethereum_sign_tx(
connect,
host,
chain_id,
address,
value,
gas_limit,
gas_price,
nonce,
data,
publish,
to,
tx_type,
):
from ethjsonrpc import EthJsonRpc
import rlp
# fmt: off
ether_units = {
'wei': 1,
'kwei': 1000,
'babbage': 1000,
'femtoether': 1000,
'mwei': 1000000,
'lovelace': 1000000,
'picoether': 1000000,
'gwei': 1000000000,
'shannon': 1000000000,
'nanoether': 1000000000,
'nano': 1000000000,
'szabo': 1000000000000,
'microether': 1000000000000,
'micro': 1000000000000,
'finney': 1000000000000000,
'milliether': 1000000000000000,
'milli': 1000000000000000,
'ether': 1000000000000000000,
'eth': 1000000000000000000,
}
# fmt: on
if " " in value:
value, unit = value.split(" ", 1)
if unit.lower() not in ether_units:
raise tools.CallException(
proto.Failure.DataError, "Unrecognized ether unit %r" % unit
)
value = int(value) * ether_units[unit.lower()]
else:
value = int(value)
if gas_price is not None:
if " " in gas_price:
gas_price, unit = gas_price.split(" ", 1)
if unit.lower() not in ether_units:
raise tools.CallException(
proto.Failure.DataError, "Unrecognized gas price unit %r" % unit
)
gas_price = int(gas_price) * ether_units[unit.lower()]
else:
gas_price = int(gas_price)
if gas_limit is not None:
gas_limit = int(gas_limit)
to_address = ethereum_decode_hex(to)
client = connect()
address_n = tools.parse_path(address)
address = "0x%s" % ethereum.get_address(client, address_n).hex()
if gas_price is None or gas_limit is None or nonce is None or publish:
host, port = host.split(":")
eth = EthJsonRpc(host, int(port))
if not data:
data = ""
data = ethereum_decode_hex(data)
if gas_price is None:
gas_price = eth.eth_gasPrice()
if gas_limit is None:
gas_limit = eth.eth_estimateGas(
to_address=to,
from_address=address,
value=("0x%x" % value),
data="0x%s" % data.hex(),
)
if nonce is None:
nonce = eth.eth_getTransactionCount(address)
sig = ethereum.sign_tx(
client,
n=address_n,
tx_type=tx_type,
nonce=nonce,
gas_price=gas_price,
gas_limit=gas_limit,
to=to_address,
value=value,
data=data,
chain_id=chain_id,
)
if tx_type is None:
transaction = rlp.encode(
(nonce, gas_price, gas_limit, to_address, value, data) + sig
)
else:
transaction = rlp.encode(
(tx_type, nonce, gas_price, gas_limit, to_address, value, data) + sig
)
tx_hex = "0x%s" % transaction.hex()
if publish:
tx_hash = eth.eth_sendRawTransaction(tx_hex)
return "Transaction published with ID: %s" % tx_hash
else:
return "Signed raw transaction: %s" % tx_hex
#
# ADA functions
#
@cli.command(help="Sign Cardano transaction.")
@click.option(
"-f",
"--file",
type=click.File("r"),
required=True,
help="Transaction in JSON format",
)
@click.option("-N", "--network", type=int, default=1)
@click.pass_obj
def cardano_sign_tx(connect, file, network):
client = connect()
transaction = json.load(file)
inputs = [cardano.create_input(input) for input in transaction["inputs"]]
outputs = [cardano.create_output(output) for output in transaction["outputs"]]
transactions = transaction["transactions"]
signed_transaction = cardano.sign_tx(client, inputs, outputs, transactions, network)
return {
"tx_hash": signed_transaction.tx_hash.hex(),
"tx_body": signed_transaction.tx_body.hex(),
}
@cli.command(help="Get Cardano address.")
@click.option(
"-n", "--address", required=True, help="BIP-32 path to key, e.g. m/44'/1815'/0'/0/0"
)
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def cardano_get_address(connect, address, show_display):
client = connect()
address_n = tools.parse_path(address)
return cardano.get_address(client, address_n, show_display)
@cli.command(help="Get Cardano public key.")
@click.option(
"-n", "--address", required=True, help="BIP-32 path to key, e.g. m/44'/1815'/0'/0/0"
)
@click.pass_obj
def cardano_get_public_key(connect, address):
client = connect()
address_n = tools.parse_path(address)
return cardano.get_public_key(client, address_n)
#
# NEM functions
#
@cli.command(help="Get NEM address for specified path.")
@click.option(
"-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/0'/43'/0/0"
)
@click.option("-N", "--network", type=int, default=0x68)
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def nem_get_address(connect, address, network, show_display):
client = connect()
address_n = tools.parse_path(address)
return nem.get_address(client, address_n, network, show_display)
@cli.command(help="Sign (and optionally broadcast) NEM transaction.")
@click.option("-n", "--address", help="BIP-32 path to signing key")
@click.option(
"-f",
"--file",
type=click.File("r"),
default="-",
help="Transaction in NIS (RequestPrepareAnnounce) format",
)
@click.option("-b", "--broadcast", help="NIS to announce transaction to")
@click.pass_obj
def nem_sign_tx(connect, address, file, broadcast):
client = connect()
address_n = tools.parse_path(address)
transaction = nem.sign_tx(client, address_n, json.load(file))
payload = {"data": transaction.data.hex(), "signature": transaction.signature.hex()}
if broadcast:
return requests.post(
"{}/transaction/announce".format(broadcast), json=payload
).json()
else:
return payload
#
# Lisk functions
#
@cli.command(help="Get Lisk address for specified path.")
@click.option(
"-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/134'/0'/0'"
)
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def lisk_get_address(connect, address, show_display):
client = connect()
address_n = tools.parse_path(address)
return lisk.get_address(client, address_n, show_display)
@cli.command(help="Get Lisk public key for specified path.")
@click.option(
"-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/134'/0'/0'"
)
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def lisk_get_public_key(connect, address, show_display):
client = connect()
address_n = tools.parse_path(address)
res = lisk.get_public_key(client, address_n, show_display)
output = {"public_key": res.public_key.hex()}
return output
@cli.command(help="Sign Lisk transaction.")
@click.option(
"-n",
"--address",
required=True,
help="BIP-32 path to signing key, e.g. m/44'/134'/0'/0'",
)
@click.option(
"-f", "--file", type=click.File("r"), default="-", help="Transaction in JSON format"
)
# @click.option('-b', '--broadcast', help='Broadcast Lisk transaction')
@click.pass_obj
def lisk_sign_tx(connect, address, file):
client = connect()
address_n = tools.parse_path(address)
transaction = lisk.sign_tx(client, address_n, json.load(file))
payload = {"signature": transaction.signature.hex()}
return payload
@cli.command(help="Sign message with Lisk address.")
@click.option(
"-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/134'/0'/0'"
)
@click.argument("message")
@click.pass_obj
def lisk_sign_message(connect, address, message):
client = connect()
address_n = client.expand_path(address)
res = lisk.sign_message(client, address_n, message)
output = {
"message": message,
"public_key": res.public_key.hex(),
"signature": res.signature.hex(),
}
return output
@cli.command(help="Verify message signed with Lisk address.")
@click.argument("pubkey")
@click.argument("signature")
@click.argument("message")
@click.pass_obj
def lisk_verify_message(connect, pubkey, signature, message):
signature = bytes.fromhex(signature)
pubkey = bytes.fromhex(pubkey)
return lisk.verify_message(connect(), pubkey, signature, message)
#
# CoSi functions
#
@cli.command(help="Ask device to commit to CoSi signing.")
@click.option(
"-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0"
)
@click.argument("data")
@click.pass_obj
def cosi_commit(connect, address, data):
client = connect()
address_n = tools.parse_path(address)
return cosi.commit(client, address_n, bytes.fromhex(data))
@cli.command(help="Ask device to sign using CoSi.")
@click.option(
"-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0"
)
@click.argument("data")
@click.argument("global_commitment")
@click.argument("global_pubkey")
@click.pass_obj
def cosi_sign(connect, address, data, global_commitment, global_pubkey):
client = connect()
address_n = tools.parse_path(address)
return cosi.sign(
client,
address_n,
bytes.fromhex(data),
bytes.fromhex(global_commitment),
bytes.fromhex(global_pubkey),
)
#
# Stellar functions
#
@cli.command(help="Get Stellar public address")
@click.option(
"-n",
"--address",
required=False,
help="BIP32 path. Always use hardened paths and the m/44'/148'/ prefix",
default=stellar.DEFAULT_BIP32_PATH,
)
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def stellar_get_address(connect, address, show_display):
client = connect()
address_n = tools.parse_path(address)
return stellar.get_address(client, address_n, show_display)
@cli.command(help="Sign a base64-encoded transaction envelope")
@click.option(
"-n",
"--address",
required=False,
help="BIP32 path. Always use hardened paths and the m/44'/148'/ prefix",
default=stellar.DEFAULT_BIP32_PATH,
)
@click.option(
"-n",
"--network-passphrase",
default=stellar.DEFAULT_NETWORK_PASSPHRASE,
required=False,
help="Network passphrase (blank for public network). Testnet is: 'Test SDF Network ; September 2015'",
)
@click.argument("b64envelope")
@click.pass_obj
def stellar_sign_transaction(connect, b64envelope, address, network_passphrase):
client = connect()
address_n = tools.parse_path(address)
tx, operations = stellar.parse_transaction_bytes(base64.b64decode(b64envelope))
resp = stellar.sign_tx(client, tx, operations, address_n, network_passphrase)
return base64.b64encode(resp.signature)
#
# Ripple functions
#
@cli.command(help="Get Ripple address")
@click.option(
"-n", "--address", required=True, help="BIP-32 path to key, e.g. m/44'/144'/0'/0/0"
)
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def ripple_get_address(connect, address, show_display):
client = connect()
address_n = tools.parse_path(address)
return ripple.get_address(client, address_n, show_display)
@cli.command(help="Sign Ripple transaction")
@click.option(
"-n", "--address", required=True, help="BIP-32 path to key, e.g. m/44'/144'/0'/0/0"
)
@click.option(
"-f", "--file", type=click.File("r"), default="-", help="Transaction in JSON format"
)
@click.pass_obj
def ripple_sign_tx(connect, address, file):
client = connect()
address_n = tools.parse_path(address)
msg = ripple.create_sign_tx_msg(json.load(file))
result = ripple.sign_tx(client, address_n, msg)
click.echo("Signature:")
click.echo(result.signature.hex())
click.echo()
click.echo("Serialized tx including the signature:")
click.echo(result.serialized_tx.hex())
#
# Tezos functions
#
@cli.command(help="Get Tezos address for specified path.")
@click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/1729'/0'")
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def tezos_get_address(connect, address, show_display):
client = connect()
address_n = tools.parse_path(address)
return tezos.get_address(client, address_n, show_display)
@cli.command(help="Get Tezos public key.")
@click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/1729'/0'")
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def tezos_get_public_key(connect, address, show_display):
client = connect()
address_n = tools.parse_path(address)
return tezos.get_public_key(client, address_n, show_display)
@cli.command(help="Sign Tezos transaction.")
@click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/1729'/0'")
@click.option(
"-f",
"--file",
type=click.File("r"),
default="-",
help="Transaction in JSON format (byte fields should be hexlified)",
)
@click.pass_obj
def tezos_sign_tx(connect, address, file):
client = connect()
address_n = tools.parse_path(address)
msg = protobuf.dict_to_proto(proto.TezosSignTx, json.load(file))
return tezos.sign_tx(client, address_n, msg)
#
# Ontology functions
#
@cli.command(help="Get Ontology address for specified path.")
@click.option(
"-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/888'/0'/0/0"
)
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def ontology_get_address(connect, address, show_display):
client = connect()
address_n = tools.parse_path(address)
return ontology.get_address(client, address_n, show_display)
@cli.command(help="Get Ontology public key for specified path.")
@click.option(
"-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/888'/0'/0/0"
)
@click.option("-d", "--show-display", is_flag=True)
@click.pass_obj
def ontology_get_public_key(connect, address, show_display):
client = connect()
address_n = tools.parse_path(address)
result = ontology.get_public_key(client, address_n, show_display)
return result.public_key.hex()
@cli.command(help="Sign Ontology transfer.")
@click.option(
"-n",
"--address",
required=True,
help="BIP-32 path to signing key, e.g. m/44'/888'/0'/0/0",
)
@click.option(
"-tx",
"--transaction",
type=click.File("r"),
default="-",
help="Transaction in JSON format",
)
@click.option(
"-tr",
"--transfer",
type=click.File("r"),
default="-",
help="Transfer in JSON format",
)
@click.pass_obj
def ontology_sign_transfer(connect, address, transaction_f, transfer_f):
client = connect()
address_n = tools.parse_path(address)
transaction = protobuf.dict_to_proto(
proto.OntologyTransaction, json.load(transaction_f)
)
transfer = protobuf.dict_to_proto(proto.OntologyTransfer, json.load(transfer_f))
result = ontology.sign_transfer(client, address_n, transaction, transfer)
output = {"payload": result.payload.hex(), "signature": result.signature.hex()}
return output
@cli.command(help="Sign Ontology withdraw Ong.")
@click.option(
"-n",
"--address",
required=True,
help="BIP-32 path to signing key, e.g. m/44'/888'/0'/0/0",
)
@click.option(
"-tx",
"--transaction",
type=click.File("r"),
default="-",
help="Transaction in JSON format",
)
@click.option(
"-wi",
"--withdraw_ong",
type=click.File("r"),
default="-",
help="Withdrawal in JSON format",
)
@click.pass_obj
def ontology_sign_withdraw_ong(connect, address, transaction_f, withdraw_ong_f):
client = connect()
address_n = tools.parse_path(address)
transaction = protobuf.dict_to_proto(
proto.OntologyTransaction, json.load(transaction_f)
)
withdraw_ong = protobuf.dict_to_proto(
proto.OntologyWithdrawOng, json.load(withdraw_ong_f)
)
result = ontology.sign_withdrawal(client, address_n, transaction, withdraw_ong)
output = {"payload": result.payload.hex(), "signature": result.signature.hex()}
return output
@cli.command(help="Sign Ontology ONT ID Registration.")
@click.option(
"-n",
"--address",
required=True,
help="BIP-32 path to signing key, e.g. m/44'/888'/0'/0/0",
)
@click.option(
"-tx",
"--transaction",
type=click.File("r"),
default="-",
help="Transaction in JSON format",
)
@click.option(
"-re",
"--register",
type=click.File("r"),
default="-",
help="Register in JSON format",
)
@click.argument("transaction")
@click.argument("ont_id_register")
@click.pass_obj
def ontology_sign_ont_id_register(connect, address, transaction_f, ont_id_register_f):
client = connect()
address_n = tools.parse_path(address)
transaction = protobuf.dict_to_proto(
proto.OntologyTransaction, json.load(transaction_f)
)
ont_id_register = protobuf.dict_to_proto(
proto.OntologyOntIdRegister, json.load(ont_id_register_f)
)
result = ontology.sign_register(client, address_n, transaction, ont_id_register)
output = {"payload": result.payload.hex(), "signature": result.signature.hex()}
return output
@cli.command(help="Sign Ontology ONT ID Attributes adding.")
@click.option(
"-n",
"--address",
required=True,
help="BIP-32 path to signing key, e.g. m/44'/888'/0'/0/0",
)
@click.option(
"-tx",
"--transaction",
type=click.File("r"),
default="-",
help="Transaction in JSON format",
)
@click.option(
"-aa",
"--add_attr",
type=click.File("r"),
default="-",
help="Add attributes in JSON format",
)
@click.pass_obj
def ontology_sign_ont_id_add_attributes(
connect, address, transaction_f, ont_id_add_attributes_f
):
client = connect()
address_n = tools.parse_path(address)
transaction = protobuf.dict_to_proto(
proto.OntologyTransaction, json.load(transaction_f)
)
ont_id_add_attributes = protobuf.dict_to_proto(
proto.OntologyOntIdAddAttributes, json.load(ont_id_add_attributes_f)
)
result = ontology.sign_add_attr(
client, address_n, transaction, ont_id_add_attributes
)
output = {"payload": result.payload.hex(), "signature": result.signature.hex()}
return output
#
# Main
#
if __name__ == "__main__":
cli() # pylint: disable=E1120