#!/usr/bin/env python3 # This file is part of the TREZOR project. # # Copyright (C) 2012-2017 Marek Palatinus # Copyright (C) 2012-2017 Pavol Rusnak # Copyright (C) 2016-2017 Jochen Hoenicke # Copyright (C) 2017 mruddy # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see . import base64 import json import os import sys import click import requests from trezorlib import ( btc, cardano, coins, cosi, debuglink, device, ethereum, exceptions, firmware, lisk, log, messages as proto, misc, monero, nem, ontology, protobuf, ripple, stellar, tezos, tools, ui, ) from trezorlib.client import TrezorClient from trezorlib.transport import enumerate_devices, get_transport class ChoiceType(click.Choice): def __init__(self, typemap): super(ChoiceType, self).__init__(typemap.keys()) self.typemap = typemap def convert(self, value, param, ctx): value = super(ChoiceType, self).convert(value, param, ctx) return self.typemap[value] CHOICE_RECOVERY_DEVICE_TYPE = ChoiceType( { "scrambled": proto.RecoveryDeviceType.ScrambledWords, "matrix": proto.RecoveryDeviceType.Matrix, } ) CHOICE_INPUT_SCRIPT_TYPE = ChoiceType( { "address": proto.InputScriptType.SPENDADDRESS, "segwit": proto.InputScriptType.SPENDWITNESS, "p2shsegwit": proto.InputScriptType.SPENDP2SHWITNESS, } ) CHOICE_OUTPUT_SCRIPT_TYPE = ChoiceType( { "address": proto.OutputScriptType.PAYTOADDRESS, "segwit": proto.OutputScriptType.PAYTOWITNESS, "p2shsegwit": proto.OutputScriptType.PAYTOP2SHWITNESS, } ) class UnderscoreAgnosticGroup(click.Group): """Command group that normalizes dashes and underscores. Click 7.0 silently switched all underscore_commands to dash-commands. This implementation of `click.Group` responds to underscore_commands by invoking the respective dash-command. """ def get_command(self, ctx, cmd_name): cmd = super().get_command(ctx, cmd_name) if cmd is None: cmd = super().get_command(ctx, cmd_name.replace("_", "-")) return cmd def enable_logging(): log.enable_debug_output() log.OMITTED_MESSAGES.add(proto.Features) @click.command(cls=UnderscoreAgnosticGroup, context_settings={"max_content_width": 400}) @click.option( "-p", "--path", help="Select device by specific path.", default=os.environ.get("TREZOR_PATH"), ) @click.option("-v", "--verbose", is_flag=True, help="Show communication messages.") @click.option( "-j", "--json", "is_json", is_flag=True, help="Print result as JSON object" ) @click.pass_context def cli(ctx, path, verbose, is_json): if verbose: enable_logging() def get_device(): try: device = get_transport(path, prefix_search=False) except Exception: try: device = get_transport(path, prefix_search=True) except Exception: click.echo("Failed to find a TREZOR device.") if path is not None: click.echo("Using path: {}".format(path)) sys.exit(1) return TrezorClient(transport=device, ui=ui.ClickUI) ctx.obj = get_device @cli.resultcallback() def print_result(res, path, verbose, is_json): if is_json: if isinstance(res, protobuf.MessageType): click.echo(json.dumps({res.__class__.__name__: res.__dict__})) else: click.echo(json.dumps(res, sort_keys=True, indent=4)) else: if isinstance(res, list): for line in res: click.echo(line) elif isinstance(res, dict): for k, v in res.items(): if isinstance(v, dict): for kk, vv in v.items(): click.echo("%s.%s: %s" % (k, kk, vv)) else: click.echo("%s: %s" % (k, v)) elif isinstance(res, protobuf.MessageType): click.echo(protobuf.format_message(res)) else: click.echo(res) # # Common functions # @cli.command(name="list", help="List connected TREZOR devices.") def ls(): return enumerate_devices() @cli.command(help="Show version of trezorctl/trezorlib.") def version(): from trezorlib import __version__ as VERSION return VERSION # # Basic device functions # @cli.command(help="Send ping message.") @click.argument("message") @click.option("-b", "--button-protection", is_flag=True) @click.option("-p", "--pin-protection", is_flag=True) @click.option("-r", "--passphrase-protection", is_flag=True) @click.pass_obj def ping(connect, message, button_protection, pin_protection, passphrase_protection): return connect().ping( message, button_protection=button_protection, pin_protection=pin_protection, passphrase_protection=passphrase_protection, ) @cli.command(help="Clear session (remove cached PIN, passphrase, etc.).") @click.pass_obj def clear_session(connect): return connect().clear_session() @cli.command(help="Get example entropy.") @click.argument("size", type=int) @click.pass_obj def get_entropy(connect, size): return misc.get_entropy(connect(), size).hex() @cli.command(help="Retrieve device features and settings.") @click.pass_obj def get_features(connect): return connect().features # # Device management functions # @cli.command(help="Change new PIN or remove existing.") @click.option("-r", "--remove", is_flag=True) @click.pass_obj def change_pin(connect, remove): click.echo(ui.PIN_MATRIX_DESCRIPTION) return device.change_pin(connect(), remove) @cli.command(help="Enable passphrase.") @click.pass_obj def enable_passphrase(connect): return device.apply_settings(connect(), use_passphrase=True) @cli.command(help="Disable passphrase.") @click.pass_obj def disable_passphrase(connect): return device.apply_settings(connect(), use_passphrase=False) @cli.command(help="Set new device label.") @click.option("-l", "--label") @click.pass_obj def set_label(connect, label): return device.apply_settings(connect(), label=label) @cli.command(help="Set passphrase source.") @click.argument("source", type=int) @click.pass_obj def set_passphrase_source(connect, source): return device.apply_settings(connect(), passphrase_source=source) @cli.command(help="Set auto-lock delay (in seconds).") @click.argument("delay", type=str) @click.pass_obj def set_auto_lock_delay(connect, delay): value, unit = delay[:-1], delay[-1:] units = {"s": 1, "m": 60, "h": 3600} if unit in units: seconds = float(value) * units[unit] else: seconds = float(delay) # assume seconds if no unit is specified return device.apply_settings(connect(), auto_lock_delay_ms=int(seconds * 1000)) @cli.command(help="Set device flags.") @click.argument("flags") @click.pass_obj def set_flags(connect, flags): flags = flags.lower() if flags.startswith("0b"): flags = int(flags, 2) elif flags.startswith("0x"): flags = int(flags, 16) else: flags = int(flags) return device.apply_flags(connect(), flags=flags) @cli.command(help="Set new homescreen.") @click.option("-f", "--filename", default=None) @click.pass_obj def set_homescreen(connect, filename): if filename is None: img = b"\x00" elif filename.endswith(".toif"): img = open(filename, "rb").read() if img[:8] != b"TOIf\x90\x00\x90\x00": raise tools.CallException( proto.FailureType.DataError, "File is not a TOIF file with size of 144x144", ) else: from PIL import Image im = Image.open(filename) if im.size != (128, 64): raise tools.CallException( proto.FailureType.DataError, "Wrong size of the image" ) im = im.convert("1") pix = im.load() img = bytearray(1024) for j in range(64): for i in range(128): if pix[i, j]: o = i + j * 128 img[o // 8] |= 1 << (7 - o % 8) img = bytes(img) return device.apply_settings(connect(), homescreen=img) @cli.command(help="Set U2F counter.") @click.argument("counter", type=int) @click.pass_obj def set_u2f_counter(connect, counter): return device.set_u2f_counter(connect(), counter) @cli.command(help="Reset device to factory defaults and remove all private data.") @click.option( "-b", "--bootloader", help="Wipe device in bootloader mode. This also erases the firmware.", is_flag=True, ) @click.pass_obj def wipe_device(connect, bootloader): client = connect() if bootloader: if not client.features.bootloader_mode: click.echo("Please switch your device to bootloader mode.") sys.exit(1) else: click.echo("Wiping user data and firmware!") else: if client.features.bootloader_mode: click.echo( "Your device is in bootloader mode. This operation would also erase firmware." ) click.echo( 'Specify "--bootloader" if that is what you want, or disconnect and reconnect device in normal mode.' ) click.echo("Aborting.") sys.exit(1) else: click.echo("Wiping user data!") try: return device.wipe(connect()) except tools.CallException as e: click.echo("Action failed: {} {}".format(*e.args)) sys.exit(3) @cli.command(help="Load custom configuration to the device.") @click.option("-m", "--mnemonic") @click.option("-e", "--expand", is_flag=True) @click.option("-x", "--xprv") @click.option("-p", "--pin", default="") @click.option("-r", "--passphrase-protection", is_flag=True) @click.option("-l", "--label", default="") @click.option("-i", "--ignore-checksum", is_flag=True) @click.option("-s", "--slip0014", is_flag=True) @click.pass_obj def load_device( connect, mnemonic, expand, xprv, pin, passphrase_protection, label, ignore_checksum, slip0014, ): if not mnemonic and not xprv and not slip0014: raise tools.CallException( proto.FailureType.DataError, "Please provide mnemonic or xprv" ) client = connect() if mnemonic: return debuglink.load_device_by_mnemonic( client, mnemonic, pin, passphrase_protection, label, "english", ignore_checksum, expand, ) if xprv: return debuglink.load_device_by_xprv( client, xprv, pin, passphrase_protection, label, "english" ) if slip0014: return debuglink.load_device_by_mnemonic( client, " ".join(["all"] * 12), pin, passphrase_protection, "SLIP-0014" ) @cli.command(help="Start safe recovery workflow.") @click.option("-w", "--words", type=click.Choice(["12", "18", "24"]), default="24") @click.option("-e", "--expand", is_flag=True) @click.option("-p", "--pin-protection", is_flag=True) @click.option("-r", "--passphrase-protection", is_flag=True) @click.option("-l", "--label") @click.option( "-t", "--type", "rec_type", type=CHOICE_RECOVERY_DEVICE_TYPE, default="scrambled" ) @click.option("-d", "--dry-run", is_flag=True) @click.pass_obj def recovery_device( connect, words, expand, pin_protection, passphrase_protection, label, rec_type, dry_run, ): if rec_type == proto.RecoveryDeviceType.ScrambledWords: input_callback = ui.mnemonic_words(expand) else: input_callback = ui.matrix_words click.echo(ui.RECOVERY_MATRIX_DESCRIPTION) return device.recover( connect(), int(words), passphrase_protection, pin_protection, label, "english", input_callback, rec_type, dry_run, ) @cli.command(help="Perform device setup and generate new seed.") @click.option("-e", "--entropy", is_flag=True) @click.option( "-t", "--strength", type=click.Choice(["128", "192", "256"]), default="256" ) @click.option("-r", "--passphrase-protection", is_flag=True) @click.option("-p", "--pin-protection", is_flag=True) @click.option("-l", "--label") @click.option("-u", "--u2f-counter", default=0) @click.option("-s", "--skip-backup", is_flag=True) @click.option("-n", "--no-backup", is_flag=True) @click.pass_obj def reset_device( connect, entropy, strength, passphrase_protection, pin_protection, label, u2f_counter, skip_backup, no_backup, ): return device.reset( connect(), entropy, int(strength), passphrase_protection, pin_protection, label, "english", u2f_counter, skip_backup, no_backup, ) @cli.command(help="Perform device seed backup.") @click.pass_obj def backup_device(connect): return device.backup(connect()) # # Firmware update # def validate_firmware_v1(fw, expected_fingerprint=None): click.echo("Trezor One firmware image.") distinct_sig_slots = set(i for i in fw.key_indexes if i != 0) if not distinct_sig_slots: if not click.confirm("No signatures found. Continue?", default=False): sys.exit(1) elif len(distinct_sig_slots) < 3: click.echo("Badly signed image (need 3 distinct signatures), aborting.") sys.exit(1) else: all_valid = True for i in range(len(fw.key_indexes)): if not firmware.check_sig_v1(fw, i): click.echo("INVALID signature in slot {}".format(i)) all_valid = False if all_valid: click.echo("Signatures are valid.") else: click.echo("Invalid signature detected, aborting.") sys.exit(4) fingerprint = firmware.digest_v1(fw).hex() click.echo("Firmware fingerprint: {}".format(fingerprint)) if expected_fingerprint and fingerprint != expected_fingerprint: click.echo("Expected fingerprint: {}".format(expected_fingerprint)) click.echo("Fingerprints do not match, aborting.") sys.exit(5) def validate_firmware_v2(fw, expected_fingerprint=None, skip_vendor_header=False): click.echo("Trezor T firmware image.") vendor = fw.vendor_header.vendor_string vendor_version = "{major}.{minor}".format(**fw.vendor_header.version) version = fw.firmware_header.version click.echo("Vendor header from {}, version {}".format(vendor, vendor_version)) click.echo( "Firmware version {major}.{minor}.{patch} build {build}".format(**version) ) try: firmware.validate(fw, skip_vendor_header) click.echo("Signatures are valid.") except Exception as e: click.echo(e) sys.exit(4) fingerprint = firmware.digest(fw).hex() click.echo("Firmware fingerprint: {}".format(fingerprint)) if expected_fingerprint and fingerprint != expected_fingerprint: click.echo("Expected fingerprint: {}".format(expected_fingerprint)) click.echo("Fingerprints do not match, aborting.") sys.exit(5) def find_best_firmware_version(bootloader_version, requested_version=None): url = "https://wallet.trezor.io/data/firmware/{}/releases.json" releases = requests.get(url.format(bootloader_version[0])).json() if not releases: raise click.ClickException("Failed to get list of releases") releases.sort(key=lambda r: r["version"], reverse=True) def version_str(version): return ".".join(map(str, version)) want_version = requested_version if want_version is None: want_version = releases[0]["version"] click.echo("Best available version: {}".format(version_str(want_version))) confirm_different_version = False while True: want_version_str = version_str(want_version) try: release = next(r for r in releases if r["version"] == want_version) except StopIteration: click.echo("Version {} not found.".format(want_version_str)) sys.exit(1) if ( "min_bootloader_version" in release and release["min_bootloader_version"] > bootloader_version ): need_version_str = version_str(release["min_firmware_version"]) click.echo( "Version {} is required before upgrading to {}.".format( need_version_str, want_version_str ) ) want_version = release["min_firmware_version"] confirm_different_version = True else: break if confirm_different_version: installing_different = "Installing version {} instead.".format(want_version_str) if requested_version is None: click.echo(installing_different) else: ok = click.confirm(installing_different + " Continue?", default=True) if not ok: sys.exit(1) url = "https://wallet.trezor.io/" + release["url"] if url.endswith(".hex"): url = url[:-4] return url, release["fingerprint"] @cli.command() @click.option("-f", "--filename") @click.option("-u", "--url") @click.option("-v", "--version") @click.option("-s", "--skip-check", is_flag=True) @click.option("--fingerprint", help="Expected firmware fingerprint in hex") @click.option("--skip-vendor-header", help="Skip vendor header validation on Trezor T") @click.pass_obj def firmware_update( connect, filename, url, version, skip_check, fingerprint, skip_vendor_header ): """Upload new firmware to device. Device must be in bootloader mode. You can specify a filename or URL from which the firmware can be downloaded. You can also explicitly specify a firmware version that you want. Otherwise, trezorctl will attempt to find latest available version from wallet.trezor.io. If you provide a fingerprint via the --fingerprint option, it will be checked against downloaded firmware fingerprint. Otherwise fingerprint is checked against wallet.trezor.io information, if available. If you are customizing Model T bootloader and providing your own vendor header, you can use --skip-vendor-header to ignore vendor header signatures. """ if sum(bool(x) for x in (filename, url, version)) > 1: click.echo("You can use only one of: filename, url, version.") sys.exit(1) client = connect() if not client.features.bootloader_mode: click.echo("Please switch your device to bootloader mode.") sys.exit(1) firmware_version = client.features.major_version if filename: data = open(filename, "rb").read() else: if not url: f = client.features bootloader_version = [f.major_version, f.minor_version, f.patch_version] version_list = [int(x) for x in version.split(".")] if version else None url, fp = find_best_firmware_version(bootloader_version, version_list) if not fingerprint: fingerprint = fp click.echo("Downloading from {}".format(url)) r = requests.get(url) data = r.content if not skip_check: try: version, fw = firmware.parse(data) except Exception as e: click.echo(e) sys.exit(2) if version == firmware.FirmwareFormat.TREZOR_ONE: validate_firmware_v1(fw, fingerprint) elif version == firmware.FirmwareFormat.TREZOR_T: validate_firmware_v2(fw, fingerprint) else: click.echo("Unrecognized firmware version.") if firmware_version != version.value: click.echo("Firmware does not match your device, aborting.") sys.exit(3) try: if firmware_version == 1: # Trezor One does not send ButtonRequest click.echo("Please confirm action on your Trezor device") return firmware.update(client, data) except exceptions.Cancelled: click.echo("Update aborted on device.") except exceptions.TrezorException as e: click.echo("Update failed: {}".format(e)) sys.exit(3) @cli.command(help="Perform a self-test.") @click.pass_obj def self_test(connect): return debuglink.self_test(connect()) # # Basic coin functions # @cli.command(help="Get address for specified path.") @click.option("-c", "--coin", default="Bitcoin") @click.option( "-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0" ) @click.option("-t", "--script-type", type=CHOICE_INPUT_SCRIPT_TYPE, default="address") @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def get_address(connect, coin, address, script_type, show_display): client = connect() address_n = tools.parse_path(address) return btc.get_address( client, coin, address_n, show_display, script_type=script_type ) @cli.command(help="Get public node of given path.") @click.option("-c", "--coin", default="Bitcoin") @click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/0'/0'") @click.option("-e", "--curve") @click.option("-t", "--script-type", type=CHOICE_INPUT_SCRIPT_TYPE, default="address") @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def get_public_node(connect, coin, address, curve, script_type, show_display): client = connect() address_n = tools.parse_path(address) result = btc.get_public_node( client, address_n, ecdsa_curve_name=curve, show_display=show_display, coin_name=coin, script_type=script_type, ) return { "node": { "depth": result.node.depth, "fingerprint": "%08x" % result.node.fingerprint, "child_num": result.node.child_num, "chain_code": result.node.chain_code.hex(), "public_key": result.node.public_key.hex(), }, "xpub": result.xpub, } # # Signing options # @cli.command(help="Sign transaction.") @click.option("-c", "--coin", default="Bitcoin") # @click.option('-n', '--address', required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0") # @click.option('-t', '--script-type', type=CHOICE_INPUT_SCRIPT_TYPE, default='address') # @click.option('-o', '--output', required=True, help='Transaction output') # @click.option('-f', '--fee', required=True, help='Transaction fee (sat/B)') @click.pass_obj def sign_tx(connect, coin): client = connect() if coin in coins.tx_api: txapi = coins.tx_api[coin] else: click.echo('Coin "%s" is not recognized.' % coin, err=True) click.echo( "Supported coin types: %s" % ", ".join(coins.tx_api.keys()), err=True ) sys.exit(1) client.set_tx_api(txapi) def default_script_type(address_n): script_type = "address" if address_n is None: pass elif address_n[0] == tools.H_(49): script_type = "p2shsegwit" return script_type def outpoint(s): txid, vout = s.split(":") return bytes.fromhex(txid), int(vout) inputs = [] while True: click.echo() prev = click.prompt( "Previous output to spend (txid:vout)", type=outpoint, default="" ) if not prev: break prev_hash, prev_index = prev address_n = click.prompt("BIP-32 path to derive the key", type=tools.parse_path) amount = click.prompt("Input amount (satoshis)", type=int, default=0) sequence = click.prompt( "Sequence Number to use (RBF opt-in enabled by default)", type=int, default=0xFFFFFFFD, ) script_type = click.prompt( "Input type", type=CHOICE_INPUT_SCRIPT_TYPE, default=default_script_type(address_n), ) script_type = ( script_type if isinstance(script_type, int) else CHOICE_INPUT_SCRIPT_TYPE.typemap[script_type] ) new_input = proto.TxInputType( address_n=address_n, prev_hash=prev_hash, prev_index=prev_index, amount=amount, script_type=script_type, sequence=sequence, ) if txapi.bip115: prev_output = txapi.get_tx(prev_hash.hex()).bin_outputs[prev_index] new_input.prev_block_hash_bip115 = prev_output.block_hash new_input.prev_block_height_bip115 = prev_output.block_height inputs.append(new_input) if txapi.bip115: current_block_height = txapi.current_height() # Zencash recommendation for the better protection block_height = current_block_height - 300 block_hash = txapi.get_block_hash(block_height) # Blockhash passed in reverse order block_hash = block_hash[::-1] else: block_height = None block_hash = None outputs = [] while True: click.echo() address = click.prompt("Output address (for non-change output)", default="") if address: address_n = None else: address = None address_n = click.prompt( "BIP-32 path (for change output)", type=tools.parse_path, default="" ) if not address_n: break amount = click.prompt("Amount to spend (satoshis)", type=int) script_type = click.prompt( "Output type", type=CHOICE_OUTPUT_SCRIPT_TYPE, default=default_script_type(address_n), ) script_type = ( script_type if isinstance(script_type, int) else CHOICE_OUTPUT_SCRIPT_TYPE.typemap[script_type] ) outputs.append( proto.TxOutputType( address_n=address_n, address=address, amount=amount, script_type=script_type, block_hash_bip115=block_hash, block_height_bip115=block_height, ) ) tx_version = click.prompt("Transaction version", type=int, default=2) tx_locktime = click.prompt("Transaction locktime", type=int, default=0) tx_timestamp = click.prompt( "Transaction timestamp (Capricoin)", type=int, default=None ) _, serialized_tx = btc.sign_tx( client, coin, inputs, outputs, tx_version, tx_locktime, timestamp=tx_timestamp ) client.close() click.echo() click.echo("Signed Transaction:") click.echo(serialized_tx.hex()) click.echo() click.echo("Use the following form to broadcast it to the network:") click.echo(txapi.pushtx_url) # # Message functions # @cli.command(help="Sign message using address of given path.") @click.option("-c", "--coin", default="Bitcoin") @click.option( "-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0" ) @click.option( "-t", "--script-type", type=click.Choice(["address", "segwit", "p2shsegwit"]), default="address", ) @click.argument("message") @click.pass_obj def sign_message(connect, coin, address, message, script_type): client = connect() address_n = tools.parse_path(address) typemap = { "address": proto.InputScriptType.SPENDADDRESS, "segwit": proto.InputScriptType.SPENDWITNESS, "p2shsegwit": proto.InputScriptType.SPENDP2SHWITNESS, } script_type = typemap[script_type] res = btc.sign_message(client, coin, address_n, message, script_type) return { "message": message, "address": res.address, "signature": base64.b64encode(res.signature), } @cli.command(help="Verify message.") @click.option("-c", "--coin", default="Bitcoin") @click.argument("address") @click.argument("signature") @click.argument("message") @click.pass_obj def verify_message(connect, coin, address, signature, message): signature = base64.b64decode(signature) return btc.verify_message(connect(), coin, address, signature, message) @cli.command(help="Sign message with Ethereum address.") @click.option( "-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/60'/0'/0/0" ) @click.argument("message") @click.pass_obj def ethereum_sign_message(connect, address, message): client = connect() address_n = tools.parse_path(address) ret = ethereum.sign_message(client, address_n, message) output = { "message": message, "address": "0x%s" % ret.address.hex(), "signature": "0x%s" % ret.signature.hex(), } return output def ethereum_decode_hex(value): if value.startswith("0x") or value.startswith("0X"): return bytes.fromhex(value[2:]) else: return bytes.fromhex(value) @cli.command(help="Verify message signed with Ethereum address.") @click.argument("address") @click.argument("signature") @click.argument("message") @click.pass_obj def ethereum_verify_message(connect, address, signature, message): address = ethereum_decode_hex(address) signature = ethereum_decode_hex(signature) return ethereum.verify_message(connect(), address, signature, message) @cli.command(help="Encrypt value by given key and path.") @click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/10016'/0") @click.argument("key") @click.argument("value") @click.pass_obj def encrypt_keyvalue(connect, address, key, value): client = connect() address_n = tools.parse_path(address) res = misc.encrypt_keyvalue(client, address_n, key, value.encode()) return res.hex() @cli.command(help="Decrypt value by given key and path.") @click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/10016'/0") @click.argument("key") @click.argument("value") @click.pass_obj def decrypt_keyvalue(connect, address, key, value): client = connect() address_n = tools.parse_path(address) return misc.decrypt_keyvalue(client, address_n, key, bytes.fromhex(value)) # @cli.command(help='Encrypt message.') # @click.option('-c', '--coin', default='Bitcoin') # @click.option('-d', '--display-only', is_flag=True) # @click.option('-n', '--address', required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0") # @click.argument('pubkey') # @click.argument('message') # @click.pass_obj # def encrypt_message(connect, coin, display_only, address, pubkey, message): # client = connect() # pubkey = bytes.fromhex(pubkey) # address_n = tools.parse_path(address) # res = client.encrypt_message(pubkey, message, display_only, coin, address_n) # return { # 'nonce': res.nonce.hex(), # 'message': res.message.hex(), # 'hmac': res.hmac.hex(), # 'payload': base64.b64encode(res.nonce + res.message + res.hmac), # } # @cli.command(help='Decrypt message.') # @click.option('-n', '--address', required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0") # @click.argument('payload') # @click.pass_obj # def decrypt_message(connect, address, payload): # client = connect() # address_n = tools.parse_path(address) # payload = base64.b64decode(payload) # nonce, message, msg_hmac = payload[:33], payload[33:-8], payload[-8:] # return client.decrypt_message(address_n, nonce, message, msg_hmac) # # Ethereum functions # @cli.command(help="Get Ethereum address in hex encoding.") @click.option( "-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/60'/0'/0/0" ) @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def ethereum_get_address(connect, address, show_display): client = connect() address_n = tools.parse_path(address) address = ethereum.get_address(client, address_n, show_display) return "0x%s" % address.hex() @cli.command( help='Sign (and optionally publish) Ethereum transaction. Use TO as destination address or set TO to "" for contract creation.' ) @click.option( "-a", "--host", default="localhost:8545", help="RPC port of ethereum node for automatic gas/nonce estimation and publishing", ) @click.option("-c", "--chain-id", type=int, help="EIP-155 chain id (replay protection)") @click.option( "-n", "--address", required=True, help="BIP-32 path to source address, e.g., m/44'/60'/0'/0/0", ) @click.option( "-v", "--value", default="0", help='Ether amount to transfer, e.g. "100 milliether"' ) @click.option( "-g", "--gas-limit", type=int, help="Gas limit - Required for offline signing" ) @click.option( "-t", "--gas-price", help='Gas price, e.g. "20 nanoether" - Required for offline signing', ) @click.option( "-i", "--nonce", type=int, help="Transaction counter - Required for offline signing" ) @click.option("-d", "--data", default="", help="Data as hex string, e.g. 0x12345678") @click.option("-p", "--publish", is_flag=True, help="Publish transaction via RPC") @click.option("-x", "--tx-type", type=int, help="TX type (used only for Wanchain)") @click.argument("to") @click.pass_obj def ethereum_sign_tx( connect, host, chain_id, address, value, gas_limit, gas_price, nonce, data, publish, to, tx_type, ): from ethjsonrpc import EthJsonRpc import rlp # fmt: off ether_units = { 'wei': 1, 'kwei': 1000, 'babbage': 1000, 'femtoether': 1000, 'mwei': 1000000, 'lovelace': 1000000, 'picoether': 1000000, 'gwei': 1000000000, 'shannon': 1000000000, 'nanoether': 1000000000, 'nano': 1000000000, 'szabo': 1000000000000, 'microether': 1000000000000, 'micro': 1000000000000, 'finney': 1000000000000000, 'milliether': 1000000000000000, 'milli': 1000000000000000, 'ether': 1000000000000000000, 'eth': 1000000000000000000, } # fmt: on if " " in value: value, unit = value.split(" ", 1) if unit.lower() not in ether_units: raise tools.CallException( proto.Failure.DataError, "Unrecognized ether unit %r" % unit ) value = int(value) * ether_units[unit.lower()] else: value = int(value) if gas_price is not None: if " " in gas_price: gas_price, unit = gas_price.split(" ", 1) if unit.lower() not in ether_units: raise tools.CallException( proto.Failure.DataError, "Unrecognized gas price unit %r" % unit ) gas_price = int(gas_price) * ether_units[unit.lower()] else: gas_price = int(gas_price) if gas_limit is not None: gas_limit = int(gas_limit) to_address = ethereum_decode_hex(to) client = connect() address_n = tools.parse_path(address) address = "0x%s" % ethereum.get_address(client, address_n).hex() if gas_price is None or gas_limit is None or nonce is None or publish: host, port = host.split(":") eth = EthJsonRpc(host, int(port)) if not data: data = "" data = ethereum_decode_hex(data) if gas_price is None: gas_price = eth.eth_gasPrice() if gas_limit is None: gas_limit = eth.eth_estimateGas( to_address=to, from_address=address, value=("0x%x" % value), data="0x%s" % data.hex(), ) if nonce is None: nonce = eth.eth_getTransactionCount(address) sig = ethereum.sign_tx( client, n=address_n, tx_type=tx_type, nonce=nonce, gas_price=gas_price, gas_limit=gas_limit, to=to_address, value=value, data=data, chain_id=chain_id, ) if tx_type is None: transaction = rlp.encode( (nonce, gas_price, gas_limit, to_address, value, data) + sig ) else: transaction = rlp.encode( (tx_type, nonce, gas_price, gas_limit, to_address, value, data) + sig ) tx_hex = "0x%s" % transaction.hex() if publish: tx_hash = eth.eth_sendRawTransaction(tx_hex) return "Transaction published with ID: %s" % tx_hash else: return "Signed raw transaction: %s" % tx_hex # # ADA functions # @cli.command(help="Sign Cardano transaction.") @click.option( "-f", "--file", type=click.File("r"), required=True, help="Transaction in JSON format", ) @click.option("-N", "--network", type=int, default=1) @click.pass_obj def cardano_sign_tx(connect, file, network): client = connect() transaction = json.load(file) inputs = [cardano.create_input(input) for input in transaction["inputs"]] outputs = [cardano.create_output(output) for output in transaction["outputs"]] transactions = transaction["transactions"] signed_transaction = cardano.sign_tx(client, inputs, outputs, transactions, network) return { "tx_hash": signed_transaction.tx_hash.hex(), "tx_body": signed_transaction.tx_body.hex(), } @cli.command(help="Get Cardano address.") @click.option( "-n", "--address", required=True, help="BIP-32 path to key, e.g. m/44'/1815'/0'/0/0" ) @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def cardano_get_address(connect, address, show_display): client = connect() address_n = tools.parse_path(address) return cardano.get_address(client, address_n, show_display) @cli.command(help="Get Cardano public key.") @click.option( "-n", "--address", required=True, help="BIP-32 path to key, e.g. m/44'/1815'/0'/0/0" ) @click.pass_obj def cardano_get_public_key(connect, address): client = connect() address_n = tools.parse_path(address) return cardano.get_public_key(client, address_n) # # NEM functions # @cli.command(help="Get NEM address for specified path.") @click.option( "-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/0'/43'/0/0" ) @click.option("-N", "--network", type=int, default=0x68) @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def nem_get_address(connect, address, network, show_display): client = connect() address_n = tools.parse_path(address) return nem.get_address(client, address_n, network, show_display) @cli.command(help="Sign (and optionally broadcast) NEM transaction.") @click.option("-n", "--address", help="BIP-32 path to signing key") @click.option( "-f", "--file", type=click.File("r"), default="-", help="Transaction in NIS (RequestPrepareAnnounce) format", ) @click.option("-b", "--broadcast", help="NIS to announce transaction to") @click.pass_obj def nem_sign_tx(connect, address, file, broadcast): client = connect() address_n = tools.parse_path(address) transaction = nem.sign_tx(client, address_n, json.load(file)) payload = {"data": transaction.data.hex(), "signature": transaction.signature.hex()} if broadcast: return requests.post( "{}/transaction/announce".format(broadcast), json=payload ).json() else: return payload # # Lisk functions # @cli.command(help="Get Lisk address for specified path.") @click.option( "-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/134'/0'/0'" ) @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def lisk_get_address(connect, address, show_display): client = connect() address_n = tools.parse_path(address) return lisk.get_address(client, address_n, show_display) @cli.command(help="Get Lisk public key for specified path.") @click.option( "-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/134'/0'/0'" ) @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def lisk_get_public_key(connect, address, show_display): client = connect() address_n = tools.parse_path(address) res = lisk.get_public_key(client, address_n, show_display) output = {"public_key": res.public_key.hex()} return output @cli.command(help="Sign Lisk transaction.") @click.option( "-n", "--address", required=True, help="BIP-32 path to signing key, e.g. m/44'/134'/0'/0'", ) @click.option( "-f", "--file", type=click.File("r"), default="-", help="Transaction in JSON format" ) # @click.option('-b', '--broadcast', help='Broadcast Lisk transaction') @click.pass_obj def lisk_sign_tx(connect, address, file): client = connect() address_n = tools.parse_path(address) transaction = lisk.sign_tx(client, address_n, json.load(file)) payload = {"signature": transaction.signature.hex()} return payload @cli.command(help="Sign message with Lisk address.") @click.option( "-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/134'/0'/0'" ) @click.argument("message") @click.pass_obj def lisk_sign_message(connect, address, message): client = connect() address_n = client.expand_path(address) res = lisk.sign_message(client, address_n, message) output = { "message": message, "public_key": res.public_key.hex(), "signature": res.signature.hex(), } return output @cli.command(help="Verify message signed with Lisk address.") @click.argument("pubkey") @click.argument("signature") @click.argument("message") @click.pass_obj def lisk_verify_message(connect, pubkey, signature, message): signature = bytes.fromhex(signature) pubkey = bytes.fromhex(pubkey) return lisk.verify_message(connect(), pubkey, signature, message) # # Monero functions # @cli.command(help="Get Monero address for specified path.") @click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/128'/0'") @click.option("-d", "--show-display", is_flag=True) @click.option( "-t", "--network-type", type=click.Choice(["0", "1", "2", "3"]), default="0" ) @click.pass_obj def monero_get_address(connect, address, show_display, network_type): client = connect() address_n = tools.parse_path(address) network_type = int(network_type) return monero.get_address(client, address_n, show_display, network_type) @cli.command(help="Get Monero watch key for specified path.") @click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/128'/0'") @click.option( "-t", "--network-type", type=click.Choice(["0", "1", "2", "3"]), default="0" ) @click.pass_obj def monero_get_watch_key(connect, address, network_type): client = connect() address_n = tools.parse_path(address) network_type = int(network_type) res = monero.get_watch_key(client, address_n, network_type) output = {"address": res.address.decode(), "watch_key": res.watch_key.hex()} return output # # CoSi functions # @cli.command(help="Ask device to commit to CoSi signing.") @click.option( "-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0" ) @click.argument("data") @click.pass_obj def cosi_commit(connect, address, data): client = connect() address_n = tools.parse_path(address) return cosi.commit(client, address_n, bytes.fromhex(data)) @cli.command(help="Ask device to sign using CoSi.") @click.option( "-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/0'/0'/0/0" ) @click.argument("data") @click.argument("global_commitment") @click.argument("global_pubkey") @click.pass_obj def cosi_sign(connect, address, data, global_commitment, global_pubkey): client = connect() address_n = tools.parse_path(address) return cosi.sign( client, address_n, bytes.fromhex(data), bytes.fromhex(global_commitment), bytes.fromhex(global_pubkey), ) # # Stellar functions # @cli.command(help="Get Stellar public address") @click.option( "-n", "--address", required=False, help="BIP32 path. Always use hardened paths and the m/44'/148'/ prefix", default=stellar.DEFAULT_BIP32_PATH, ) @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def stellar_get_address(connect, address, show_display): client = connect() address_n = tools.parse_path(address) return stellar.get_address(client, address_n, show_display) @cli.command(help="Sign a base64-encoded transaction envelope") @click.option( "-n", "--address", required=False, help="BIP32 path. Always use hardened paths and the m/44'/148'/ prefix", default=stellar.DEFAULT_BIP32_PATH, ) @click.option( "-n", "--network-passphrase", default=stellar.DEFAULT_NETWORK_PASSPHRASE, required=False, help="Network passphrase (blank for public network). Testnet is: 'Test SDF Network ; September 2015'", ) @click.argument("b64envelope") @click.pass_obj def stellar_sign_transaction(connect, b64envelope, address, network_passphrase): client = connect() address_n = tools.parse_path(address) tx, operations = stellar.parse_transaction_bytes(base64.b64decode(b64envelope)) resp = stellar.sign_tx(client, tx, operations, address_n, network_passphrase) return base64.b64encode(resp.signature) # # Ripple functions # @cli.command(help="Get Ripple address") @click.option( "-n", "--address", required=True, help="BIP-32 path to key, e.g. m/44'/144'/0'/0/0" ) @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def ripple_get_address(connect, address, show_display): client = connect() address_n = tools.parse_path(address) return ripple.get_address(client, address_n, show_display) @cli.command(help="Sign Ripple transaction") @click.option( "-n", "--address", required=True, help="BIP-32 path to key, e.g. m/44'/144'/0'/0/0" ) @click.option( "-f", "--file", type=click.File("r"), default="-", help="Transaction in JSON format" ) @click.pass_obj def ripple_sign_tx(connect, address, file): client = connect() address_n = tools.parse_path(address) msg = ripple.create_sign_tx_msg(json.load(file)) result = ripple.sign_tx(client, address_n, msg) click.echo("Signature:") click.echo(result.signature.hex()) click.echo() click.echo("Serialized tx including the signature:") click.echo(result.serialized_tx.hex()) # # Tezos functions # @cli.command(help="Get Tezos address for specified path.") @click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/1729'/0'") @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def tezos_get_address(connect, address, show_display): client = connect() address_n = tools.parse_path(address) return tezos.get_address(client, address_n, show_display) @cli.command(help="Get Tezos public key.") @click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/1729'/0'") @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def tezos_get_public_key(connect, address, show_display): client = connect() address_n = tools.parse_path(address) return tezos.get_public_key(client, address_n, show_display) @cli.command(help="Sign Tezos transaction.") @click.option("-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/1729'/0'") @click.option( "-f", "--file", type=click.File("r"), default="-", help="Transaction in JSON format (byte fields should be hexlified)", ) @click.pass_obj def tezos_sign_tx(connect, address, file): client = connect() address_n = tools.parse_path(address) msg = protobuf.dict_to_proto(proto.TezosSignTx, json.load(file)) return tezos.sign_tx(client, address_n, msg) # # Ontology functions # @cli.command(help="Get Ontology address for specified path.") @click.option( "-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/888'/0'/0/0" ) @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def ontology_get_address(connect, address, show_display): client = connect() address_n = tools.parse_path(address) return ontology.get_address(client, address_n, show_display) @cli.command(help="Get Ontology public key for specified path.") @click.option( "-n", "--address", required=True, help="BIP-32 path, e.g. m/44'/888'/0'/0/0" ) @click.option("-d", "--show-display", is_flag=True) @click.pass_obj def ontology_get_public_key(connect, address, show_display): client = connect() address_n = tools.parse_path(address) result = ontology.get_public_key(client, address_n, show_display) return result.public_key.hex() @cli.command(help="Sign Ontology transfer.") @click.option( "-n", "--address", required=True, help="BIP-32 path to signing key, e.g. m/44'/888'/0'/0/0", ) @click.option( "-tx", "--transaction", type=click.File("r"), default="-", help="Transaction in JSON format", ) @click.option( "-tr", "--transfer", type=click.File("r"), default="-", help="Transfer in JSON format", ) @click.pass_obj def ontology_sign_transfer(connect, address, transaction_f, transfer_f): client = connect() address_n = tools.parse_path(address) transaction = protobuf.dict_to_proto( proto.OntologyTransaction, json.load(transaction_f) ) transfer = protobuf.dict_to_proto(proto.OntologyTransfer, json.load(transfer_f)) result = ontology.sign_transfer(client, address_n, transaction, transfer) output = {"payload": result.payload.hex(), "signature": result.signature.hex()} return output @cli.command(help="Sign Ontology withdraw Ong.") @click.option( "-n", "--address", required=True, help="BIP-32 path to signing key, e.g. m/44'/888'/0'/0/0", ) @click.option( "-tx", "--transaction", type=click.File("r"), default="-", help="Transaction in JSON format", ) @click.option( "-wi", "--withdraw_ong", type=click.File("r"), default="-", help="Withdrawal in JSON format", ) @click.pass_obj def ontology_sign_withdraw_ong(connect, address, transaction_f, withdraw_ong_f): client = connect() address_n = tools.parse_path(address) transaction = protobuf.dict_to_proto( proto.OntologyTransaction, json.load(transaction_f) ) withdraw_ong = protobuf.dict_to_proto( proto.OntologyWithdrawOng, json.load(withdraw_ong_f) ) result = ontology.sign_withdrawal(client, address_n, transaction, withdraw_ong) output = {"payload": result.payload.hex(), "signature": result.signature.hex()} return output @cli.command(help="Sign Ontology ONT ID Registration.") @click.option( "-n", "--address", required=True, help="BIP-32 path to signing key, e.g. m/44'/888'/0'/0/0", ) @click.option( "-tx", "--transaction", type=click.File("r"), default="-", help="Transaction in JSON format", ) @click.option( "-re", "--register", type=click.File("r"), default="-", help="Register in JSON format", ) @click.argument("transaction") @click.argument("ont_id_register") @click.pass_obj def ontology_sign_ont_id_register(connect, address, transaction_f, ont_id_register_f): client = connect() address_n = tools.parse_path(address) transaction = protobuf.dict_to_proto( proto.OntologyTransaction, json.load(transaction_f) ) ont_id_register = protobuf.dict_to_proto( proto.OntologyOntIdRegister, json.load(ont_id_register_f) ) result = ontology.sign_register(client, address_n, transaction, ont_id_register) output = {"payload": result.payload.hex(), "signature": result.signature.hex()} return output @cli.command(help="Sign Ontology ONT ID Attributes adding.") @click.option( "-n", "--address", required=True, help="BIP-32 path to signing key, e.g. m/44'/888'/0'/0/0", ) @click.option( "-tx", "--transaction", type=click.File("r"), default="-", help="Transaction in JSON format", ) @click.option( "-aa", "--add_attr", type=click.File("r"), default="-", help="Add attributes in JSON format", ) @click.pass_obj def ontology_sign_ont_id_add_attributes( connect, address, transaction_f, ont_id_add_attributes_f ): client = connect() address_n = tools.parse_path(address) transaction = protobuf.dict_to_proto( proto.OntologyTransaction, json.load(transaction_f) ) ont_id_add_attributes = protobuf.dict_to_proto( proto.OntologyOntIdAddAttributes, json.load(ont_id_add_attributes_f) ) result = ontology.sign_add_attr( client, address_n, transaction, ont_id_add_attributes ) output = {"payload": result.payload.hex(), "signature": result.signature.hex()} return output # # Main # if __name__ == "__main__": cli() # pylint: disable=E1120