From b335d30b8d9be4d03f27651e116e473a4a1ccfbe Mon Sep 17 00:00:00 2001 From: Pavol Rusnak Date: Mon, 3 Jul 2017 17:45:56 +0200 Subject: [PATCH] use click in trezorctl --- .flake8 | 5 + .travis.yml | 1 + requirements.txt | 1 + setup.py | 1 + trezorctl | 1110 ++++++++++++++++++------------------ trezorlib/transport_udp.py | 2 + 6 files changed, 573 insertions(+), 547 deletions(-) diff --git a/.flake8 b/.flake8 index f15837b452..c1e3840895 100644 --- a/.flake8 +++ b/.flake8 @@ -1,4 +1,7 @@ [flake8] +filename = + *.py, + trezorctl exclude = .tox/, build/, @@ -11,6 +14,8 @@ ignore = F841, # F401: module imported but unused F401, + # E241: multiple spaces after ':' + E241, # E402: module level import not at top of file E402, # E501: line too long diff --git a/.travis.yml b/.travis.yml index 2bf8eea3e6..73adda9ae2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -30,6 +30,7 @@ install: script: - python setup.py install - flake8 + - flake8 trezorctl - tox notifications: diff --git a/requirements.txt b/requirements.txt index a6ebc9568f..52c12b86fb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ protobuf>=3.1.0 mnemonic>=0.17 hidapi>=0.7.99.post20 requests>=2.4.0 +click>=6.2 diff --git a/setup.py b/setup.py index 7f7018254a..5456ec292f 100755 --- a/setup.py +++ b/setup.py @@ -7,6 +7,7 @@ install_requires = [ 'mnemonic>=0.17', 'setuptools>=19.0', 'requests>=2.4.0', + 'click>=6.2' ] import sys diff --git a/trezorctl b/trezorctl index beb08a0f44..80da17839c 100755 --- a/trezorctl +++ b/trezorctl @@ -19,68 +19,15 @@ # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see . -from __future__ import print_function -import sys import binascii -import argparse import json import base64 -from io import BytesIO +import click from trezorlib.client import TrezorClient, TrezorClientVerbose, CallException import trezorlib.types_pb2 as types -ether_units = { - "wei": 1, - "kwei": 1000, - "babbage": 1000, - "femtoether": 1000, - "mwei": 1000000, - "lovelace": 1000000, - "picoether": 1000000, - "gwei": 1000000000, - "shannon": 1000000000, - "nanoether": 1000000000, - "nano": 1000000000, - "szabo": 1000000000000, - "microether": 1000000000000, - "micro": 1000000000000, - "finney": 1000000000000000, - "milliether": 1000000000000000, - "milli": 1000000000000000, - "ether": 1000000000000000000, - "eth": 1000000000000000000, -} - - -def init_parser(commands): - parser = argparse.ArgumentParser(description='Commandline tool for TREZOR devices.') - parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Prints communication to device') - parser.add_argument('-t', '--transport', dest='transport', choices=['usb', 'udp', 'pipe', 'bridge'], default='usb', help="Transport used for talking with the device") - parser.add_argument('-p', '--path', dest='path', default='', help="Path used by the transport") - parser.add_argument('-j', '--json', dest='json', action='store_true', help="Prints result as json object") - - cmdparser = parser.add_subparsers(title='Available commands') - - for cmd in commands._list_commands(): - func = object.__getattribute__(commands, cmd) - - try: - arguments = func.arguments - except AttributeError: - arguments = ((('params',), {'nargs': '*'}),) - - item = cmdparser.add_parser(cmd, help=func.help) - for arg in arguments: - item.add_argument(*arg[0], **arg[1]) - - item.set_defaults(func=func) - item.set_defaults(cmd=cmd) - - return parser - - def get_transport_class_by_name(name): if name == 'usb': @@ -108,507 +55,576 @@ def get_transport(transport_name, path): return dev -class Commands(object): - def __init__(self, client): - self.client = client +def output(res): + if output.json: + click.echo(json.dumps(res, sort_keys=True, indent=4)) + else: + click.echo(res) - @classmethod - def _list_commands(cls): - return [x for x in dir(cls) if not x.startswith('_')] - def list(self, args): - # Fake method for advertising 'list' command - pass +@click.group() +@click.option('-t', '--transport', type=click.Choice(['usb', 'udp', 'pipe', 'bridge']), default='usb', help='Select transport used for communication.') +@click.option('-p', '--path', help='Select device by transport-specific path.') +@click.option('-v', '--verbose', is_flag=True, help='Show communication messages.') +@click.option('-j', '--json', 'is_json', is_flag=True, help='Print result as JSON object') +@click.pass_context +def cli(ctx, transport, path, verbose, is_json): + output.json = is_json + if ctx.invoked_subcommand == 'list': + ctx.obj = transport + else: + t = get_transport(transport, path) + if verbose: + ctx.obj = TrezorClientVerbose(t) + else: + ctx.obj = TrezorClient(t) - def get_address(self, args): - address_n = self.client.expand_path(args.n) - typemap = { 'address': types.SPENDADDRESS, - 'segwit': types.SPENDWITNESS, - 'p2shsegwit': types.SPENDP2SHWITNESS } - script_type = typemap[args.script_type]; - return self.client.get_address(args.coin, address_n, args.show_display, script_type=script_type) - def ethereum_get_address(self, args): - address_n = self.client.expand_path(args.n) - address = self.client.ethereum_get_address(address_n, args.show_display) - return "0x%s" % (binascii.hexlify(address),) +# +# Common functions +# - def ethereum_sign_tx(self, args): - from ethjsonrpc import EthJsonRpc - from ethjsonrpc.utils import hex_to_dec - import rlp - value = args.value - if ' ' in value: - value, unit = value.split(' ', 1) +@cli.command(name='list', help='List connected TREZOR devices.') +@click.pass_obj +def ls(transport_name): + transport_class = get_transport_class_by_name(transport_name) + devices = transport_class.enumerate() + if output.json: + click.echo(json.dumps(devices, sort_keys=True, indent=4)) + else: + if transport_name == 'usb': + for dev in devices: + if dev[1] is not None: + click.echo('%s - debuglink enabled' % dev[0]) + else: + click.echo(dev[0]) + else: + for dev in devices: + click.echo(dev) + + +# +# Basic device functions +# + + +@cli.command(help='Send ping message.') +@click.argument('message') +@click.option('-b', '--button-protection', is_flag=True) +@click.option('-p', '--pin-protection', is_flag=True) +@click.option('-r', '--passphrase-protection', is_flag=True) +@click.pass_obj +def ping(client, message, button_protection, pin_protection, passphrase_protection): + ret = client.ping(message, button_protection=button_protection, pin_protection=pin_protection, passphrase_protection=passphrase_protection) + output(ret) + + +@cli.command(help='Clear session (remove cached PIN, passphrase, etc.).') +@click.pass_obj +def clear_session(client): + ret = client.clear_session() + output(ret) + + +@cli.command(help='Get example entropy.') +@click.argument('size', type=int) +@click.pass_obj +def get_entropy(client, size): + ret = binascii.hexlify(client.get_entropy(size)) + output(ret) + + +@cli.command(help='Retrieve device features and settings.') +@click.pass_obj +def get_features(client): + return client.features + + +@cli.command(help='List all supported coin types by the device.') +@click.pass_obj +def list_coins(client): + ret = [coin.coin_name for coin in client.features.coins] + output(ret) + + +# +# Device management functions +# + + +@cli.command(help='Change new PIN or remove existing.') +@click.option('-r', '--remove', is_flag=True) +@click.pass_obj +def change_pin(client, remove): + ret = client.change_pin(remove) + output(ret) + + +@cli.command(help='Enable passphrase.') +@click.pass_obj +def enable_passphrase(client): + ret = client.apply_settings(use_passphrase=True) + output(ret) + + +@cli.command(help='Disable passphrase.') +@click.pass_obj +def disable_passphrase(client): + ret = client.apply_settings(use_passphrase=False) + output(ret) + + +@cli.command(help='Set new device label.') +@click.option('-l', '--label') +@click.pass_obj +def set_label(client, label): + ret = client.apply_settings(label=label) + output(ret) + + +@cli.command(help='Set new homescreen.') +@click.option('-f', '--filename', default=None) +@click.pass_obj +def set_homescreen(client, filename): + if filename is not None: + from PIL import Image + im = Image.open(filename) + if im.size != (128, 64): + raise CallException(types.Failure_DataError, 'Wrong size of the image') + im = im.convert('1') + pix = im.load() + img = '' + for j in range(64): + for i in range(128): + img += '1' if pix[i, j] else '0' + img = ''.join(chr(int(img[i:i + 8], 2)) for i in range(0, len(img), 8)) + else: + img = '\x00' + ret = client.apply_settings(homescreen=img) + output(ret) + + +@cli.command(help='Set U2F counter.') +@click.argument('counter', type=int) +@click.pass_obj +def set_u2f_counter(client, counter): + ret = client.set_u2f_counter(counter) + output(ret) + + +@cli.command(help='Reset device to factory defaults and remove all private data.') +@click.pass_obj +def wipe_device(client): + ret = client.wipe_device() + output(ret) + + +@cli.command(help='Load custom configuration to the device.') +@click.option('-m', '--mnemonic') +@click.option('-e', '--expand', is_flag=True) +@click.option('-x', '--xprv') +@click.option('-p', '--pin', default='') +@click.option('-r', '--passphrase-protection', is_flag=True) +@click.option('-l', '--label', default='') +@click.option('-s', '--skip-checksum', is_flag=True) +@click.pass_obj +def load_device(client, mnemonic, expand, xprv, pin, passphrase_protection, label, skip_checksum): + if not mnemonic and not xprv: + raise CallException(types.Failure_DataError, 'Please provide mnemonic or xprv') + + if mnemonic: + ret = client.load_device_by_mnemonic( + mnemonic, + pin, + passphrase_protection, + label, + 'english', + skip_checksum, + expand + ) + if xprv: + ret = client.load_device_by_xprv( + xprv, + pin, + passphrase_protection, + label, + 'english' + ) + output(ret) + + +@cli.command(help='Start safe recovery workflow.') +@click.option('-w', '--words', type=click.Choice([12, 18, 24]), default=24) +@click.option('-e', '--expand', is_flag=True) +@click.option('-p', '--pin-protection', is_flag=True) +@click.option('-r', '--passphrase-protection', is_flag=True) +@click.option('-l', '--label') +@click.option('-t', '--type', 'rec_type', type=click.Choice(['scrambled', 'matrix']), default='scrambled') +@click.option('-d', '--dry-run', is_flag=True) +@click.pass_obj +def recovery_device(client, words, expand, pin_protection, passphrase_protection, label, rec_type, dry_run): + typemap = { + 'scrambled': types.RecoveryDeviceType_ScrambledWords, + 'matrix': types.RecoveryDeviceType_Matrix + } + ret = client.recovery_device( + words, + passphrase_protection, + pin_protection, + label, + 'english', + typemap[rec_type], + expand, + dry_run + ) + output(ret) + + +@cli.command(help='Perform device setup and generate new seed.') +@click.option('-t', '--strength', type=click.Choice([128, 192, 256]), default=256) +@click.option('-p', '--pin-protection', is_flag=True) +@click.option('-r', '--passphrase-protection', is_flag=True) +@click.option('-l', '--label') +@click.option('-u', '--u2f-counter', default=0) +@click.option('-s', '--skip-backup', is_flag=True) +@click.pass_obj +def reset_device(client, strength, pin_protection, passphrase_protection, label, u2f_counter, skip_backup): + ret = client.reset_device( + True, + strength, + passphrase_protection, + pin_protection, + label, + 'english', + u2f_counter, + skip_backup + ) + output(ret) + + +@cli.command(help='Perform device seed backup.') +@click.pass_obj +def backup_device(client): + ret = client.backup_device() + output(ret) + + +# +# Firmware update +# + + +@cli.command(help='Upload new firmware to device (must be in bootloader mode).') +@click.option('-f', '--filename') +@click.option('-u', '--url') +@click.option('-v', '--version') +@click.option('-s', '--skip-check', is_flag=True) +@click.pass_obj +def firmware_update(client, filename, url, version, skip_check): + if filename: + fp = open(filename, 'rb').read() + elif url: + import requests + click.echo('Downloading from', url) + r = requests.get(url) + fp = r.content + else: + import requests + r = requests.get('https://wallet.trezor.io/data/firmware/releases.json') + releases = r.json() + + def version_func(r): + return r['version'] + + def version_string(r): + return '.'.join(map(str, version_func(r))) + + if version: + release = next((r for r in releases if version_string(r) == version)) + else: + release = max(releases, key=version_func) + click.echo('Fetching version: %s' % version_string(release)) + click.echo('Firmware fingerprint: %s' % release['fingerprint']) + url = 'https://wallet.trezor.io/' + release['url'] + click.echo('Downloading from %s' % url) + r = requests.get(url) + fp = r.content + + if not skip_check: + if fp[:8] == b'54525a52' or fp[:8] == b'54525a56': + fp = binascii.unhexlify(fp) + if fp[:4] != b'TRZR' and fp[:4] != b'TRZV': + raise CallException(types.Failure_FirmwareError, 'TREZOR firmware header expected') + + click.echo('Please confirm action on device...') + + from io import BytesIO + ret = client.firmware_update(fp=BytesIO(fp)) + output(ret) + + +@cli.command(help='Perform a self-test.') +@click.pass_obj +def self_test(client): + ret = client.self_test() + output(ret) + + +# +# Basic coin functions +# + + +@cli.command(help='Get address for specified path.') +@click.option('-c', '--coin', default='Bitcoin') +@click.option('-n', '--address') +@click.option('-t', '--script-type', type=click.Choice(['address', 'segwit', 'p2shsegwit']), default='address') +@click.option('-d', '--show-display', is_flag=True) +@click.pass_obj +def get_address(client, coin, address, script_type, show_display): + address_n = client.expand_path(address) + typemap = { + 'address': types.SPENDADDRESS, + 'segwit': types.SPENDWITNESS, + 'p2shsegwit': types.SPENDP2SHWITNESS, + } + script_type = typemap[script_type] + ret = client.get_address(coin, address_n, show_display, script_type=script_type) + output(ret) + + +@cli.command(help='Get public node of given path.') +@click.option('-c', '--coin', default='Bitcoin') +@click.option('-n', '-address') +@click.option('-e', '--curve') +@click.option('-d', '--show-display', is_flag=True) +@click.pass_obj +def get_public_node(client, coin, address, curve, show_display): + address_n = client.expand_path(address) + ret = client.get_public_node(address_n, ecdsa_curve_name=curve, show_display=show_display, coin_name=coin) + output(ret) + + +# +# Message functions +# + + +@cli.command(help='Sign message using address of given path.') +@click.option('-c', '--coin', default='Bitcoin') +@click.option('-n', '-address') +@click.argument('message') +@click.pass_obj +def sign_message(client, coin, address, message): + address_n = client.expand_path(address) + res = client.sign_message(coin, address_n, message) + ret = { + 'message': message, + 'address': res.address, + 'signature': base64.b64encode(res.signature) + } + output(ret) + + +@cli.command(help='Verify message.') +@click.option('-c', '--coin', default='Bitcoin') +@click.argument('address') +@click.argument('signature') +@click.argument('message') +@click.pass_obj +def verify_message(client, coin, address, signature, message): + signature = base64.b64decode(signature) + ret = client.verify_message(coin, address, signature, message) + output(ret) + + +@cli.command(help='Encrypt value by given key and path.') +@click.option('-n', '-address') +@click.argument('key') +@click.argument('value') +@click.pass_obj +def encrypt_keyvalue(client, address, key, value): + address_n = client.expand_path(address) + res = client.encrypt_keyvalue(address_n, key, value) + ret = binascii.hexlify(res) + output(ret) + + +@cli.command(help='Decrypt value by given key and path.') +@click.option('-n', '-address') +@click.argument('key') +@click.argument('value') +@click.pass_obj +def decrypt_keyvalue(client, address, key, value): + address_n = client.expand_path(address) + ret = client.decrypt_keyvalue(address_n, key, value.decode('hex')) + output(ret) + + +@cli.command(help='Encrypt message.') +@click.option('-c', '--coin', default='Bitcoin') +@click.option('-d', '--display-only', is_flag=True) +@click.option('-n', '-address') +@click.argument('pubkey') +@click.argument('message') +@click.pass_obj +def encrypt_message(client, coin, display_only, address, pubkey, message): + pubkey = binascii.unhexlify(pubkey) + address_n = client.expand_path(address) + res = client.encrypt_message(pubkey, message, display_only, coin, address_n) + ret = { + 'nonce': binascii.hexlify(res.nonce), + 'message': binascii.hexlify(res.message), + 'hmac': binascii.hexlify(res.hmac), + 'payload': base64.b64encode(res.nonce + res.message + res.hmac), + } + output(ret) + + +@cli.command(help='Decrypt message.') +@click.option('-n', '-address') +@click.argument('payload') +@click.pass_obj +def decrypt_message(client, address, payload): + address_n = client.expand_path(address) + payload = base64.b64decode(payload) + nonce, message, msg_hmac = payload[:33], payload[33:-8], payload[-8:] + ret = client.decrypt_message(address_n, nonce, message, msg_hmac) + output(ret) + + +# +# Ethereum functions +# + + +@cli.command(help='Get Ethereum address in hex encoding.') +@click.option('-n', '-address') +@click.option('-d', '--show-display', is_flag=True) +@click.pass_obj +def ethereum_get_address(client, address, show_display): + address_n = client.expand_path(address) + address = client.ethereum_get_address(address_n, show_display) + ret = '0x%s' % binascii.hexlify(address) + output(ret) + + +@cli.command(help='Sign (and optionally publish) Ethereum transaction. Use TO as destination address or set TO to "" for contract creation.') +@click.option('-a', '--host', default='localhost:8545', help='RPC port of ethereum node for automatic gas/nonce estimation') +@click.option('-c', '--chain-id', type=int, help='EIP-155 chain id (replay protection)') +@click.option('-n', '-address', help='BIP-32 path to signing key') +@click.option('-v', '--value', default='0', help='Ether amount to transfer, e.g. "100 milliether"') +@click.option('-g', '--gas-limit', type=int, help='Gas limit - Required for offline signing') +@click.option('-t', '--gas-price', help='Gas price, e.g. "20 nanoether" - Required for offline signing') +@click.option('-i', '--nonce', type=int, help='Transaction counter - Required for offline signing') +@click.option('-d', '--data', default='', help='Data as hex string, e.g. 0x12345678') +@click.option('-p', '--publish', is_flag=True, help='Publish transaction via RPC') +@click.argument('to') +@click.pass_obj +def ethereum_sign_tx(client, host, chain_id, address, value, gas_limit, gas_price, nonce, data, publish, to): + from ethjsonrpc import EthJsonRpc + import rlp + + ether_units = { + 'wei': 1, + 'kwei': 1000, + 'babbage': 1000, + 'femtoether': 1000, + 'mwei': 1000000, + 'lovelace': 1000000, + 'picoether': 1000000, + 'gwei': 1000000000, + 'shannon': 1000000000, + 'nanoether': 1000000000, + 'nano': 1000000000, + 'szabo': 1000000000000, + 'microether': 1000000000000, + 'micro': 1000000000000, + 'finney': 1000000000000000, + 'milliether': 1000000000000000, + 'milli': 1000000000000000, + 'ether': 1000000000000000000, + 'eth': 1000000000000000000, + } + + if ' ' in value: + value, unit = value.split(' ', 1) + if unit.lower() not in ether_units: + raise CallException(types.Failure_DataError, 'Unrecognized ether unit %r' % unit) + value = int(value) * ether_units[unit.lower()] + else: + value = int(value) + + if gas_price is not None: + if ' ' in gas_price: + gas_price, unit = gas_price.split(' ', 1) if unit.lower() not in ether_units: - raise CallException(types.Failure_DataError, "Unrecognized ether unit %r" % unit) - value = int(value) * ether_units[unit.lower()] + raise CallException(types.Failure_DataError, 'Unrecognized gas price unit %r' % unit) + gas_price = int(gas_price) * ether_units[unit.lower()] else: - value = int(value) - - gas_price = args.gas_price - if gas_price is not None: - if ' ' in gas_price: - gas_price, unit = gas_price.split(' ', 1) - if unit.lower() not in ether_units: - raise CallException(types.Failure_DataError, "Unrecognized gas price unit %r" % unit) - gas_price = int(gas_price) * ether_units[unit.lower()] - else: - gas_price = int(gas_price) - - gas_limit = args.gas - if gas_limit is not None: - gas_limit = int(gas_limit) - - if args.to.startswith('0x') or args.to.startswith('0X'): - to_address = args.to[2:].decode('hex') - else: - to_address = args.to.decode('hex') - - nonce = args.nonce - if nonce: - nonce = int(nonce) - - address_n = self.client.expand_path(args.n) - address = "0x%s" % (binascii.hexlify(self.client.ethereum_get_address(address_n)),) - - if gas_price is None or gas_limit is None or nonce is None: - host, port = args.host.split(':') - eth = EthJsonRpc(host, int(port)) - - if not args.data: - args.data = '' - elif args.data.startswith('0x'): - args.data = args.data[2:] - data = binascii.unhexlify(args.data) - - if gas_price is None: - gas_price = eth.eth_gasPrice() - - if gas_limit is None: - gas_limit = eth.eth_estimateGas( - to_address=args.to, - from_address=address, - value=("0x%x" % value), - data="0x"+args.data) - - if nonce is None: - nonce = eth.eth_getTransactionCount(address) - - sig = self.client.ethereum_sign_tx( - n=address_n, - nonce=nonce, - gas_price=gas_price, - gas_limit=gas_limit, - to=to_address, - value=value, - data=data, - chain_id=args.chain_id) - - transaction = rlp.encode( - (nonce, gas_price, gas_limit, to_address, value, data) + sig) - tx_hex = '0x%s' % binascii.hexlify(transaction) - - if args.publish: - tx_hash = eth.eth_sendRawTransaction(tx_hex) - return 'Transaction published with ID: %s' % tx_hash - else: - return 'Signed raw transaction: %s' % tx_hex - - def get_entropy(self, args): - return binascii.hexlify(self.client.get_entropy(args.size)) - - def get_features(self, args): - return self.client.features - - def list_coins(self, args): - return [coin.coin_name for coin in self.client.features.coins] - - def ping(self, args): - return self.client.ping(args.msg, button_protection=args.button_protection, pin_protection=args.pin_protection, passphrase_protection=args.passphrase_protection) - - def get_public_node(self, args): - address_n = self.client.expand_path(args.n) - return self.client.get_public_node(address_n, ecdsa_curve_name=args.curve, show_display=args.show_display, coin_name=args.coin) - - def enable_passphrase(self, args): - return self.client.apply_settings(use_passphrase=True) - - def disable_passphrase(self, args): - return self.client.apply_settings(use_passphrase=False) - - def set_label(self, args): - return self.client.apply_settings(label=args.label) - - def set_homescreen(self, args): - if args.filename: - from PIL import Image - im = Image.open(args.filename) - if im.size != (128, 64): - raise CallException(types.Failure_DataError, 'Wrong size of the image') - im = im.convert('1') - pix = im.load() - img = '' - for j in range(64): - for i in range(128): - img += '1' if pix[i, j] else '0' - img = ''.join(chr(int(img[i:i + 8], 2)) for i in range(0, len(img), 8)) - else: - img = '\x00' - return self.client.apply_settings(homescreen=img) - - def clear_session(self, args): - return self.client.clear_session() - - def change_pin(self, args): - return self.client.change_pin(args.remove) - - def wipe_device(self, args): - return self.client.wipe_device() - - def recovery_device(self, args): - typemap = { 'scrambled': types.RecoveryDeviceType_ScrambledWords, - 'matrix': types.RecoveryDeviceType_Matrix } - return self.client.recovery_device(args.words, args.passphrase_protection, - args.pin_protection, args.label, 'english', - typemap[args.type], args.expand, args.dry_run) - - def load_device(self, args): - if not args.mnemonic and not args.xprv: - raise CallException(types.Failure_DataError, "Please provide mnemonic or xprv") - - if args.mnemonic: - mnemonic = ' '.join(args.mnemonic) - return self.client.load_device_by_mnemonic(mnemonic, args.pin, - args.passphrase_protection, - args.label, 'english', - args.skip_checksum, - args.expand) - else: - return self.client.load_device_by_xprv(args.xprv, args.pin, - args.passphrase_protection, - args.label, 'english') - - def reset_device(self, args): - return self.client.reset_device(True, args.strength, args.passphrase_protection, - args.pin_protection, args.label, 'english', - args.u2f_counter, args.skip_backup) - - def backup_device(self, args): - return self.client.backup_device() - - def sign_message(self, args): - address_n = self.client.expand_path(args.n) - ret = self.client.sign_message(args.coin, address_n, args.message) - output = { - 'message': args.message, - 'address': ret.address, - 'signature': base64.b64encode(ret.signature) - } - return output - - def verify_message(self, args): - signature = base64.b64decode(args.signature) - return self.client.verify_message(args.coin, args.address, signature, args.message) - - def encrypt_message(self, args): - pubkey = binascii.unhexlify(args.pubkey) - address_n = self.client.expand_path(args.n) - ret = self.client.encrypt_message(pubkey, args.message, args.display_only, args.coin, address_n) - output = { - 'nonce': binascii.hexlify(ret.nonce), - 'message': binascii.hexlify(ret.message), - 'hmac': binascii.hexlify(ret.hmac), - 'payload': base64.b64encode(ret.nonce + ret.message + ret.hmac), - } - return output - - def decrypt_message(self, args): - address_n = self.client.expand_path(args.n) - payload = base64.b64decode(args.payload) - nonce, message, msg_hmac = payload[:33], payload[33:-8], payload[-8:] - ret = self.client.decrypt_message(address_n, nonce, message, msg_hmac) - return ret - - def encrypt_keyvalue(self, args): - address_n = self.client.expand_path(args.n) - ret = self.client.encrypt_keyvalue(address_n, args.key, args.value) - return binascii.hexlify(ret) - - def decrypt_keyvalue(self, args): - address_n = self.client.expand_path(args.n) - ret = self.client.decrypt_keyvalue(address_n, args.key, args.value.decode("hex")) - return ret - - def set_u2f_counter(self, args): - ret = self.client.set_u2f_counter(args.counter) - return ret - - def self_test(self, args): - ret = self.client.self_test() - return ret - - def firmware_update(self, args): - if args.file: - fp = open(args.file, 'rb').read() - elif args.url: - import requests - print("Downloading from", args.url) - r = requests.get(args.url) - fp = r.content - else: - import requests - r = requests.get('https://wallet.trezor.io/data/firmware/releases.json') - releases = r.json() - version = lambda r: r['version'] - version_string = lambda r: ".".join(map(str, version(r))) - if args.version: - release = next((r for r in releases if version_string(r) == args.version)) - else: - release = max(releases, key=version) - print("No file, url, or version given. Fetching latest version: %s" % version_string(release)) - print("Firmware fingerprint: %s" % release['fingerprint']) - args.url = 'https://wallet.trezor.io/' + release['url'] - return self.firmware_update(args) - - if not args.skip_check: - if fp[:8] == b'54525a52' or fp[:8] == b'54525a56': - fp = binascii.unhexlify(fp) - if fp[:4] != b'TRZR' and fp[:4] != b'TRZV': - raise CallException(types.Failure_FirmwareError, "TREZOR firmware header expected") - - print("Please confirm action on device...") - - return self.client.firmware_update(fp=BytesIO(fp)) - - list.help = 'List connected TREZOR USB devices' - ping.help = 'Send ping message' - get_address.help = 'Get bitcoin address in base58 encoding' - ethereum_get_address.help = 'Get Ethereum address in hex encoding' - ethereum_sign_tx.help = 'Sign (and optionally publish) Ethereum transaction' - get_entropy.help = 'Get example entropy' - get_features.help = 'Retrieve device features and settings' - get_public_node.help = 'Get public node of given path' - enable_passphrase.help = 'Enable passphrase' - disable_passphrase.help = 'Disable passphrase' - set_label.help = 'Set new wallet label' - set_homescreen.help = 'Set new homescreen' - clear_session.help = 'Clear session (remove cached PIN, passphrase, etc.)' - change_pin.help = 'Change new PIN or remove existing' - list_coins.help = 'List all supported coin types by the device' - wipe_device.help = 'Reset device to factory defaults and remove all private data.' - recovery_device.help = 'Start safe recovery workflow' - load_device.help = 'Load custom configuration to the device' - reset_device.help = 'Perform device setup and generate new seed' - backup_device.help = 'Perform device seed backup' - sign_message.help = 'Sign message using address of given path' - verify_message.help = 'Verify message' - encrypt_message.help = 'Encrypt message' - decrypt_message.help = 'Decrypt message' - encrypt_keyvalue.help = 'Encrypt value by given key and path' - decrypt_keyvalue.help = 'Decrypt value by given key and path' - set_u2f_counter.help = 'Set U2F counter' - self_test.help = 'Perform a self-test' - firmware_update.help = 'Upload new firmware to device (must be in bootloader mode)' - - clear_session.arguments= () - - get_address.arguments = ( - (('-c', '--coin'), {'type': str, 'default': 'Bitcoin'}), - (('-n', '-address'), {'type': str}), - (('-t', '--script-type'), {'type': str, 'choices': ['address', 'segwit', 'p2shsegwit'], 'default': 'address'}), - (('-d', '--show-display'), {'action': 'store_true', 'default': False}), - ) - - ethereum_get_address.arguments = ( - (('-n', '-address'), {'type': str}), - (('-d', '--show-display'), {'action': 'store_true', 'default': False}), - ) - - ethereum_sign_tx.arguments = ( - (('-a', '--host'), {'type': str, 'help': 'RPC port of ethereum node for automatic gas/nonce estimation', 'default': 'localhost:8545'}), - (('-c', '--chain-id'), {'type' : int, 'help': 'EIP-155 chain id (replay protection)', 'default': None}), - (('-n', '-address'), {'type': str, 'help': 'BIP-32 path to signing key'}), - (('-v', '--value'), {'type': str, 'help': 'Ether amount to transfer, e.g., "100 milliether"', 'default': "0"}), - (('-g', '--gas'), {'type': int, 'help': 'Gas limit - Required for offline signing', 'default': None}), - (('-t', '--gas-price'), {'type': str, 'help': 'Gas price, e.g., "20 nanoether" - Required for offline signing', 'default': None }), - (('-i', '--nonce'), {'type': int, 'help': 'Transaction counter - Required for offline signing', 'default': None}), - (('-d', '--data'), {'type': str, 'help': 'Data as hex string, e.g., 0x12345678', 'default': ''}), - (('-p', '--publish'), {'action': 'store_true', 'help': 'publish transaction via RPC', 'default': False}), - (('to',), {'type': str, 'help': 'Destination address; "" for contract creation'}), - ) - - get_entropy.arguments = ( - (('size',), {'type': int}), - ) - - get_features.arguments = () - - list_coins.arguments = () - - ping.arguments = ( - (('msg',), {'type': str}), - (('-b', '--button-protection'), {'action': 'store_true', 'default': False}), - (('-p', '--pin-protection'), {'action': 'store_true', 'default': False}), - (('-r', '--passphrase-protection'), {'action': 'store_true', 'default': False}), - ) - - enable_passphrase.arguments = () - - disable_passphrase.arguments = () - - set_label.arguments = ( - (('-l', '--label',), {'type': str, 'default': ''}), - # (('-c', '--clear'), {'action': 'store_true', 'default': False}) - ) - - set_homescreen.arguments = ( - (('-f', '--filename',), {'type': str, 'default': ''}), - ) - change_pin.arguments = ( - (('-r', '--remove'), {'action': 'store_true', 'default': False}), - ) - - wipe_device.arguments = () - - recovery_device.arguments = ( - (('-w', '--words'), {'type': int, 'choices': [12, 18, 24], 'default': 24}), - (('-e', '--expand'), {'action': 'store_true', 'default': False}), - (('-p', '--pin-protection'), {'action': 'store_true', 'default': False}), - (('-r', '--passphrase-protection'), {'action': 'store_true', 'default': False}), - (('-l', '--label'), {'type': str, 'default': ''}), - (('-t', '--type'), {'type': str, 'choices': ['scrambled', 'matrix'], 'default': 'scrambled'}), - (('-d', '--dry-run'), {'action': 'store_true', 'default': False}), - ) - - load_device.arguments = ( - (('-m', '--mnemonic'), {'type': str, 'nargs': '+'}), - (('-e', '--expand'), {'action': 'store_true', 'default': False}), - (('-x', '--xprv'), {'type': str}), - (('-p', '--pin'), {'type': str, 'default': ''}), - (('-r', '--passphrase-protection'), {'action': 'store_true', 'default': False}), - (('-l', '--label'), {'type': str, 'default': ''}), - (('-s', '--skip-checksum'), {'action': 'store_true', 'default': False}), - ) - - reset_device.arguments = ( - (('-t', '--strength'), {'type': int, 'choices': [128, 192, 256], 'default': 256}), - (('-p', '--pin-protection'), {'action': 'store_true', 'default': False}), - (('-r', '--passphrase-protection'), {'action': 'store_true', 'default': False}), - (('-l', '--label'), {'type': str, 'default': ''}), - (('-u', '--u2f-counter'), {'type': int, 'default': 0}), - (('-s', '--skip-backup'), {'action': 'store_true', 'default': False}), - ) - - backup_device.arguments = () - - sign_message.arguments = ( - (('-c', '--coin'), {'type': str, 'default': 'Bitcoin'}), - (('-n', '-address'), {'type': str}), - (('message',), {'type': str}), - ) - - encrypt_message.arguments = ( - (('pubkey',), {'type': str}), - (('message',), {'type': str}), - (('-d', '--display-only'), {'action': 'store_true', 'default': False}), - (('-c', '--coin'), {'type': str, 'default': 'Bitcoin'}), - (('-n', '-address'), {'type': str}), - ) - - decrypt_message.arguments = ( - (('-n', '-address'), {'type': str}), - (('payload',), {'type': str}), - ) - - verify_message.arguments = ( - (('-c', '--coin'), {'type': str, 'default': 'Bitcoin'}), - (('address',), {'type': str}), - (('signature',), {'type': str}), - (('message',), {'type': str}), - ) - - encrypt_keyvalue.arguments = ( - (('-n', '-address'), {'type': str}), - (('key',), {'type': str}), - (('value',), {'type': str}), - ) - - decrypt_keyvalue.arguments = ( - (('-n', '-address'), {'type': str}), - (('key',), {'type': str}), - (('value',), {'type': str}), - ) - - get_public_node.arguments = ( - (('-c', '--coin'), {'type': str, 'default': 'Bitcoin'}), - (('-n', '-address'), {'type': str}), - (('-e', '--curve'), {'type': str}), - (('-d', '--show-display'), {'action': 'store_true', 'default': False}), - ) - - set_u2f_counter.arguments = ( - (('counter',), {'type': int}), - ) - - self_test.arguments = () - - firmware_update.arguments = ( - (('-f', '--file'), {'type': str}), - (('-u', '--url'), {'type': str}), - (('-n', '--version'), {'type': str}), - (('-s', '--skip-check'), {'action': 'store_true', 'default': False}), - ) - -def list_usb(): - from trezorlib.transport_hid import HidTransport - return HidTransport.enumerate() - -def main(): - parser = init_parser(Commands) - args = parser.parse_args() - - if not hasattr(args, 'cmd'): - parser.print_help() - exit(0) - - try: - - if args.cmd == 'list': - devices = list_usb() - if args.json: - print(json.dumps(devices)) - else: - for dev in devices: - if dev[1] != None: - print("%s - debuglink enabled" % dev[0]) - else: - print(dev[0]) - return - - transport = get_transport(args.transport, args.path) - if args.verbose: - client = TrezorClientVerbose(transport) - else: - client = TrezorClient(transport) - - cmds = Commands(client) - - res = args.func(cmds, args) - - if args.json: - print(json.dumps(res, sort_keys=True, indent=4)) - else: - print(res) - except CallException as e: - status, message = e.args - sys.stderr.write('Failure: ') - sys.stderr.write(str(message)) - sys.stderr.write('\n') - exit(status) + gas_price = int(gas_price) + + if gas_limit is not None: + gas_limit = int(gas_limit) + + if to.startswith('0x') or to.startswith('0X'): + to_address = to[2:].decode('hex') + else: + to_address = to.decode('hex') + + address_n = client.expand_path(address) + address = '0x%s' % (binascii.hexlify(client.ethereum_get_address(address_n)),) + + if gas_price is None or gas_limit is None or nonce is None: + host, port = host.split(':') + eth = EthJsonRpc(host, int(port)) + + if not data: + data = '' + elif data.startswith('0x'): + data = data[2:] + data = binascii.unhexlify(data) + + if gas_price is None: + gas_price = eth.eth_gasPrice() + + if gas_limit is None: + gas_limit = eth.eth_estimateGas( + to_address=to, + from_address=address, + value=('0x%x' % value), + data='0x' + data) + + if nonce is None: + nonce = eth.eth_getTransactionCount(address) + + sig = client.ethereum_sign_tx( + n=address_n, + nonce=nonce, + gas_price=gas_price, + gas_limit=gas_limit, + to=to_address, + value=value, + data=data, + chain_id=chain_id) + + transaction = rlp.encode( + (nonce, gas_price, gas_limit, to_address, value, data) + sig) + tx_hex = '0x%s' % binascii.hexlify(transaction) + + if publish: + tx_hash = eth.eth_sendRawTransaction(tx_hex) + ret = 'Transaction published with ID: %s' % tx_hash + else: + ret = 'Signed raw transaction: %s' % tx_hex + output(ret) + + +# +# Main +# if __name__ == '__main__': - main() - \ No newline at end of file + cli() diff --git a/trezorlib/transport_udp.py b/trezorlib/transport_udp.py index da221b6a16..1bf5ad8469 100644 --- a/trezorlib/transport_udp.py +++ b/trezorlib/transport_udp.py @@ -26,6 +26,8 @@ from .transport import TransportV2 class UdpTransport(TransportV2): def __init__(self, device, *args, **kwargs): + if device is None: + device = '' device = device.split(':') if len(device) < 2: if not device[0]: