1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-02-22 12:32:02 +00:00

trezorlib: split out methods from ProtocolMixin

This commit is contained in:
matejcik 2018-06-13 19:04:18 +02:00
parent 1820f529fc
commit f3a042db80
10 changed files with 894 additions and 616 deletions

170
trezorlib/btc.py Normal file
View File

@ -0,0 +1,170 @@
from . import messages as proto
from .tools import expect, field, CallException, normalize_nfc, session
### Client functions ###
@expect(proto.PublicKey)
def get_public_node(client, n, ecdsa_curve_name=None, show_display=False, coin_name=None):
n = client._convert_prime(n)
return client.call(proto.GetPublicKey(address_n=n, ecdsa_curve_name=ecdsa_curve_name, show_display=show_display, coin_name=coin_name))
@field('address')
@expect(proto.Address)
def get_address(client, coin_name, n, show_display=False, multisig=None, script_type=proto.InputScriptType.SPENDADDRESS):
n = client._convert_prime(n)
if multisig:
return client.call(proto.GetAddress(address_n=n, coin_name=coin_name, show_display=show_display, multisig=multisig, script_type=script_type))
else:
return client.call(proto.GetAddress(address_n=n, coin_name=coin_name, show_display=show_display, script_type=script_type))
@expect(proto.MessageSignature)
def sign_message(client, coin_name, n, message, script_type=proto.InputScriptType.SPENDADDRESS):
n = client._convert_prime(n)
message = normalize_nfc(message)
return client.call(proto.SignMessage(coin_name=coin_name, address_n=n, message=message, script_type=script_type))
def verify_message(client, coin_name, address, signature, message):
message = normalize_nfc(message)
try:
resp = client.call(proto.VerifyMessage(address=address, signature=signature, message=message, coin_name=coin_name))
except CallException as e:
resp = e
return isinstance(resp, proto.Success)
@expect(proto.EncryptedMessage)
def encrypt_message(client, pubkey, message, display_only, coin_name, n):
if coin_name and n:
n = client._convert_prime(n)
return client.call(proto.EncryptMessage(pubkey=pubkey, message=message, display_only=display_only, coin_name=coin_name, address_n=n))
else:
return client.call(proto.EncryptMessage(pubkey=pubkey, message=message, display_only=display_only))
@expect(proto.DecryptedMessage)
def decrypt_message(client, n, nonce, message, msg_hmac):
n = client._convert_prime(n)
return client.call(proto.DecryptMessage(address_n=n, nonce=nonce, message=message, hmac=msg_hmac))
@session
def sign_tx(client, coin_name, inputs, outputs, version=None, lock_time=None, expiry=None, overwintered=None, debug_processor=None):
# start = time.time()
txes = client._prepare_sign_tx(inputs, outputs)
# Prepare and send initial message
tx = proto.SignTx()
tx.inputs_count = len(inputs)
tx.outputs_count = len(outputs)
tx.coin_name = coin_name
if version is not None:
tx.version = version
if lock_time is not None:
tx.lock_time = lock_time
if expiry is not None:
tx.expiry = expiry
if overwintered is not None:
tx.overwintered = overwintered
res = client.call(tx)
# Prepare structure for signatures
signatures = [None] * len(inputs)
serialized_tx = b''
counter = 0
while True:
counter += 1
if isinstance(res, proto.Failure):
raise CallException("Signing failed")
if not isinstance(res, proto.TxRequest):
raise CallException("Unexpected message")
# If there's some part of signed transaction, let's add it
if res.serialized and res.serialized.serialized_tx:
# log("RECEIVED PART OF SERIALIZED TX (%d BYTES)" % len(res.serialized.serialized_tx))
serialized_tx += res.serialized.serialized_tx
if res.serialized and res.serialized.signature_index is not None:
if signatures[res.serialized.signature_index] is not None:
raise ValueError("Signature for index %d already filled" % res.serialized.signature_index)
signatures[res.serialized.signature_index] = res.serialized.signature
if res.request_type == proto.RequestType.TXFINISHED:
# Device didn't ask for more information, finish workflow
break
# Device asked for one more information, let's process it.
if not res.details.tx_hash:
current_tx = txes[None]
else:
current_tx = txes[bytes(res.details.tx_hash)]
if res.request_type == proto.RequestType.TXMETA:
msg = proto.TransactionType()
msg.version = current_tx.version
msg.lock_time = current_tx.lock_time
msg.inputs_cnt = len(current_tx.inputs)
if res.details.tx_hash:
msg.outputs_cnt = len(current_tx.bin_outputs)
else:
msg.outputs_cnt = len(current_tx.outputs)
msg.extra_data_len = len(current_tx.extra_data) if current_tx.extra_data else 0
res = client.call(proto.TxAck(tx=msg))
continue
elif res.request_type == proto.RequestType.TXINPUT:
msg = proto.TransactionType()
msg.inputs = [current_tx.inputs[res.details.request_index]]
if debug_processor is not None:
# msg needs to be deep copied so when it's modified
# the other messages stay intact
from copy import deepcopy
msg = deepcopy(msg)
# If debug_processor function is provided,
# pass thru it the request and prepared response.
# This is useful for tests, see test_msg_signtx
msg = debug_processor(res, msg)
res = client.call(proto.TxAck(tx=msg))
continue
elif res.request_type == proto.RequestType.TXOUTPUT:
msg = proto.TransactionType()
if res.details.tx_hash:
msg.bin_outputs = [current_tx.bin_outputs[res.details.request_index]]
else:
msg.outputs = [current_tx.outputs[res.details.request_index]]
if debug_processor is not None:
# msg needs to be deep copied so when it's modified
# the other messages stay intact
from copy import deepcopy
msg = deepcopy(msg)
# If debug_processor function is provided,
# pass thru it the request and prepared response.
# This is useful for tests, see test_msg_signtx
msg = debug_processor(res, msg)
res = client.call(proto.TxAck(tx=msg))
continue
elif res.request_type == proto.RequestType.TXEXTRADATA:
o, l = res.details.extra_data_offset, res.details.extra_data_len
msg = proto.TransactionType()
msg.extra_data = current_tx.extra_data[o:o + l]
res = client.call(proto.TxAck(tx=msg))
continue
if None in signatures:
raise RuntimeError("Some signatures are missing!")
# log("SIGNED IN %.03f SECONDS, CALLED %d MESSAGES, %d BYTES" %
# (time.time() - start, counter, len(serialized_tx)))
return (signatures, serialized_tx)

View File

@ -28,11 +28,9 @@ import warnings
from mnemonic import Mnemonic
from . import messages as proto
from . import tools
from . import btc, cosi, device, ethereum, firmware, lisk, misc, nem, stellar
from . import mapping
from . import nem
from . import stellar
from .tools import CallException, field, expect
from . import tools
from .debuglink import DebugLink
if sys.version_info.major < 3:
@ -412,9 +410,6 @@ class ProtocolMixin(object):
if str(self.features.vendor) not in self.VENDORS:
raise RuntimeError("Unsupported device")
def _get_local_entropy(self):
return os.urandom(32)
@staticmethod
def _convert_prime(n: tools.Address) -> tools.Address:
# Convert minus signs to uint32 with flag
@ -425,158 +420,8 @@ class ProtocolMixin(object):
warnings.warn('expand_path is deprecated, use tools.parse_path', DeprecationWarning, stacklevel=2)
return tools.parse_path(n)
@expect(proto.PublicKey)
def get_public_node(self, n, ecdsa_curve_name=None, show_display=False, coin_name=None):
n = self._convert_prime(n)
return self.call(proto.GetPublicKey(address_n=n, ecdsa_curve_name=ecdsa_curve_name, show_display=show_display, coin_name=coin_name))
@field('address')
@expect(proto.Address)
def get_address(self, coin_name, n, show_display=False, multisig=None, script_type=proto.InputScriptType.SPENDADDRESS):
n = self._convert_prime(n)
if multisig:
return self.call(proto.GetAddress(address_n=n, coin_name=coin_name, show_display=show_display, multisig=multisig, script_type=script_type))
else:
return self.call(proto.GetAddress(address_n=n, coin_name=coin_name, show_display=show_display, script_type=script_type))
@field('address')
@expect(proto.EthereumAddress)
def ethereum_get_address(self, n, show_display=False, multisig=None):
n = self._convert_prime(n)
return self.call(proto.EthereumGetAddress(address_n=n, show_display=show_display))
@session
def ethereum_sign_tx(self, n, nonce, gas_price, gas_limit, to, value, data=None, chain_id=None, tx_type=None):
def int_to_big_endian(value):
return value.to_bytes((value.bit_length() + 7) // 8, 'big')
n = self._convert_prime(n)
msg = proto.EthereumSignTx(
address_n=n,
nonce=int_to_big_endian(nonce),
gas_price=int_to_big_endian(gas_price),
gas_limit=int_to_big_endian(gas_limit),
value=int_to_big_endian(value))
if to:
msg.to = to
if data:
msg.data_length = len(data)
data, chunk = data[1024:], data[:1024]
msg.data_initial_chunk = chunk
if chain_id:
msg.chain_id = chain_id
if tx_type is not None:
msg.tx_type = tx_type
response = self.call(msg)
while response.data_length is not None:
data_length = response.data_length
data, chunk = data[data_length:], data[:data_length]
response = self.call(proto.EthereumTxAck(data_chunk=chunk))
return response.signature_v, response.signature_r, response.signature_s
@expect(proto.EthereumMessageSignature)
def ethereum_sign_message(self, n, message):
n = self._convert_prime(n)
message = normalize_nfc(message)
return self.call(proto.EthereumSignMessage(address_n=n, message=message))
def ethereum_verify_message(self, address, signature, message):
message = normalize_nfc(message)
try:
resp = self.call(proto.EthereumVerifyMessage(address=address, signature=signature, message=message))
except CallException as e:
resp = e
if isinstance(resp, proto.Success):
return True
return False
#
# Lisk functions
#
@field('address')
@expect(proto.LiskAddress)
def lisk_get_address(self, n, show_display=False):
n = self._convert_prime(n)
return self.call(proto.LiskGetAddress(address_n=n, show_display=show_display))
@expect(proto.LiskPublicKey)
def lisk_get_public_key(self, n, show_display=False):
n = self._convert_prime(n)
return self.call(proto.LiskGetPublicKey(address_n=n, show_display=show_display))
@expect(proto.LiskMessageSignature)
def lisk_sign_message(self, n, message):
n = self._convert_prime(n)
message = normalize_nfc(message)
return self.call(proto.LiskSignMessage(address_n=n, message=message))
def lisk_verify_message(self, pubkey, signature, message):
message = normalize_nfc(message)
try:
resp = self.call(proto.LiskVerifyMessage(signature=signature, public_key=pubkey, message=message))
except CallException as e:
resp = e
return isinstance(resp, proto.Success)
@expect(proto.LiskSignedTx)
def lisk_sign_tx(self, n, transaction):
n = self._convert_prime(n)
def asset_to_proto(asset):
msg = proto.LiskTransactionAsset()
if "votes" in asset:
msg.votes = asset["votes"]
if "data" in asset:
msg.data = asset["data"]
if "signature" in asset:
msg.signature = proto.LiskSignatureType()
msg.signature.public_key = binascii.unhexlify(asset["signature"]["publicKey"])
if "delegate" in asset:
msg.delegate = proto.LiskDelegateType()
msg.delegate.username = asset["delegate"]["username"]
if "multisignature" in asset:
msg.multisignature = proto.LiskMultisignatureType()
msg.multisignature.min = asset["multisignature"]["min"]
msg.multisignature.life_time = asset["multisignature"]["lifetime"]
msg.multisignature.keys_group = asset["multisignature"]["keysgroup"]
return msg
msg = proto.LiskTransactionCommon()
msg.type = transaction["type"]
msg.fee = int(transaction["fee"]) # Lisk use strings for big numbers (javascript issue)
msg.amount = int(transaction["amount"]) # And we convert it back to number
msg.timestamp = transaction["timestamp"]
if "recipientId" in transaction:
msg.recipient_id = transaction["recipientId"]
if "senderPublicKey" in transaction:
msg.sender_public_key = binascii.unhexlify(transaction["senderPublicKey"])
if "requesterPublicKey" in transaction:
msg.requester_public_key = binascii.unhexlify(transaction["requesterPublicKey"])
if "signature" in transaction:
msg.signature = binascii.unhexlify(transaction["signature"])
msg.asset = asset_to_proto(transaction["asset"])
return self.call(proto.LiskSignTx(address_n=n, transaction=msg))
@field('entropy')
@expect(proto.Entropy)
def get_entropy(self, size):
return self.call(proto.GetEntropy(size=size))
@field('message')
@expect(proto.Success)
@tools.field('message')
@tools.expect(proto.Success)
def ping(self, msg, button_protection=False, pin_protection=False, passphrase_protection=False):
msg = proto.Ping(message=msg,
button_protection=button_protection,
@ -587,128 +432,6 @@ class ProtocolMixin(object):
def get_device_id(self):
return self.features.device_id
@field('message')
@expect(proto.Success)
def apply_settings(self, label=None, language=None, use_passphrase=None, homescreen=None, passphrase_source=None, auto_lock_delay_ms=None):
settings = proto.ApplySettings()
if label is not None:
settings.label = label
if language:
settings.language = language
if use_passphrase is not None:
settings.use_passphrase = use_passphrase
if homescreen is not None:
settings.homescreen = homescreen
if passphrase_source is not None:
settings.passphrase_source = passphrase_source
if auto_lock_delay_ms is not None:
settings.auto_lock_delay_ms = auto_lock_delay_ms
out = self.call(settings)
self.init_device() # Reload Features
return out
@field('message')
@expect(proto.Success)
def apply_flags(self, flags):
out = self.call(proto.ApplyFlags(flags=flags))
self.init_device() # Reload Features
return out
@field('message')
@expect(proto.Success)
def clear_session(self):
return self.call(proto.ClearSession())
@field('message')
@expect(proto.Success)
def change_pin(self, remove=False):
ret = self.call(proto.ChangePin(remove=remove))
self.init_device() # Re-read features
return ret
@expect(proto.MessageSignature)
def sign_message(self, coin_name, n, message, script_type=proto.InputScriptType.SPENDADDRESS):
n = self._convert_prime(n)
message = normalize_nfc(message)
return self.call(proto.SignMessage(coin_name=coin_name, address_n=n, message=message, script_type=script_type))
@expect(proto.SignedIdentity)
def sign_identity(self, identity, challenge_hidden, challenge_visual, ecdsa_curve_name=None):
return self.call(proto.SignIdentity(identity=identity, challenge_hidden=challenge_hidden, challenge_visual=challenge_visual, ecdsa_curve_name=ecdsa_curve_name))
@expect(proto.ECDHSessionKey)
def get_ecdh_session_key(self, identity, peer_public_key, ecdsa_curve_name=None):
return self.call(proto.GetECDHSessionKey(identity=identity, peer_public_key=peer_public_key, ecdsa_curve_name=ecdsa_curve_name))
@expect(proto.CosiCommitment)
def cosi_commit(self, n, data):
n = self._convert_prime(n)
return self.call(proto.CosiCommit(address_n=n, data=data))
@expect(proto.CosiSignature)
def cosi_sign(self, n, data, global_commitment, global_pubkey):
n = self._convert_prime(n)
return self.call(proto.CosiSign(address_n=n, data=data, global_commitment=global_commitment, global_pubkey=global_pubkey))
@field('message')
@expect(proto.Success)
def set_u2f_counter(self, u2f_counter):
ret = self.call(proto.SetU2FCounter(u2f_counter=u2f_counter))
return ret
@field("address")
@expect(proto.NEMAddress)
def nem_get_address(self, n, network, show_display=False):
n = self._convert_prime(n)
return self.call(proto.NEMGetAddress(address_n=n, network=network, show_display=show_display))
@expect(proto.NEMSignedTx)
def nem_sign_tx(self, n, transaction):
n = self._convert_prime(n)
try:
msg = nem.create_sign_tx(transaction)
except ValueError as e:
raise CallException(e.args)
assert msg.transaction is not None
msg.transaction.address_n = n
return self.call(msg)
def verify_message(self, coin_name, address, signature, message):
message = normalize_nfc(message)
try:
resp = self.call(proto.VerifyMessage(address=address, signature=signature, message=message, coin_name=coin_name))
except CallException as e:
resp = e
if isinstance(resp, proto.Success):
return True
return False
@field('value')
@expect(proto.CipheredKeyValue)
def encrypt_keyvalue(self, n, key, value, ask_on_encrypt=True, ask_on_decrypt=True, iv=b''):
n = self._convert_prime(n)
return self.call(proto.CipherKeyValue(address_n=n,
key=key,
value=value,
encrypt=True,
ask_on_encrypt=ask_on_encrypt,
ask_on_decrypt=ask_on_decrypt,
iv=iv))
@field('value')
@expect(proto.CipheredKeyValue)
def decrypt_keyvalue(self, n, key, value, ask_on_encrypt=True, ask_on_decrypt=True, iv=b''):
n = self._convert_prime(n)
return self.call(proto.CipherKeyValue(address_n=n,
key=key,
value=value,
encrypt=False,
ask_on_encrypt=ask_on_encrypt,
ask_on_decrypt=ask_on_decrypt,
iv=iv))
def _prepare_sign_tx(self, inputs, outputs):
tx = proto.TransactionType()
tx.inputs = inputs
@ -732,355 +455,73 @@ class ProtocolMixin(object):
return txes
@session
def sign_tx(self, coin_name, inputs, outputs, version=None, lock_time=None, expiry=None, overwintered=None, debug_processor=None):
@tools.field('message')
@tools.expect(proto.Success)
def clear_session(self):
return self.call(proto.ClearSession())
# start = time.time()
txes = self._prepare_sign_tx(inputs, outputs)
# Prepare and send initial message
tx = proto.SignTx()
tx.inputs_count = len(inputs)
tx.outputs_count = len(outputs)
tx.coin_name = coin_name
if version is not None:
tx.version = version
if lock_time is not None:
tx.lock_time = lock_time
if expiry is not None:
tx.expiry = expiry
if overwintered is not None:
tx.overwintered = overwintered
res = self.call(tx)
# Device functionality
self_test = MovedTo(device.self_test)
# Prepare structure for signatures
signatures = [None] * len(inputs)
serialized_tx = b''
wipe_device = MovedTo(device.wipe_device)
recovery_device = MovedTo(device.recovery_device)
reset_device = MovedTo(device.reset_device)
backup_device = MovedTo(device.backup_device)
counter = 0
while True:
counter += 1
load_device_by_mnemonic = MovedTo(device.load_device_by_mnemonic)
load_device_by_xprv = MovedTo(device.load_device_by_xprv)
if isinstance(res, proto.Failure):
raise CallException("Signing failed")
set_u2f_counter = MovedTo(device.set_u2f_counter)
if not isinstance(res, proto.TxRequest):
raise CallException("Unexpected message")
apply_settings = MovedTo(device.apply_settings)
apply_flags = MovedTo(device.apply_flags)
change_pin = MovedTo(device.change_pin)
# If there's some part of signed transaction, let's add it
if res.serialized and res.serialized.serialized_tx:
# log("RECEIVED PART OF SERIALIZED TX (%d BYTES)" % len(res.serialized.serialized_tx))
serialized_tx += res.serialized.serialized_tx
# Firmware functionality
firmware_update = MovedTo(firmware.update)
if res.serialized and res.serialized.signature_index is not None:
if signatures[res.serialized.signature_index] is not None:
raise ValueError("Signature for index %d already filled" % res.serialized.signature_index)
signatures[res.serialized.signature_index] = res.serialized.signature
# BTC-like functionality
get_public_node = MovedTo(btc.get_public_node)
get_address = MovedTo(btc.get_address)
sign_tx = MovedTo(btc.sign_tx)
sign_message = MovedTo(btc.sign_message)
verify_message = MovedTo(btc.verify_message)
encrypt_message = MovedTo(btc.encrypt_message)
decrypt_message = MovedTo(btc.decrypt_message)
if res.request_type == proto.RequestType.TXFINISHED:
# Device didn't ask for more information, finish workflow
break
# CoSi functionality
cosi_commit = MovedTo(cosi.commit)
cosi_sign = MovedTo(cosi.sign)
# Device asked for one more information, let's process it.
if not res.details.tx_hash:
current_tx = txes[None]
else:
current_tx = txes[bytes(res.details.tx_hash)]
# Ethereum functionality
ethereum_get_address = MovedTo(ethereum.get_address)
ethereum_sign_tx = MovedTo(ethereum.sign_tx)
ethereum_sign_message = MovedTo(ethereum.sign_message)
ethereum_verify_message = MovedTo(ethereum.verify_message)
if res.request_type == proto.RequestType.TXMETA:
msg = proto.TransactionType()
msg.version = current_tx.version
msg.lock_time = current_tx.lock_time
msg.inputs_cnt = len(current_tx.inputs)
if res.details.tx_hash:
msg.outputs_cnt = len(current_tx.bin_outputs)
else:
msg.outputs_cnt = len(current_tx.outputs)
msg.extra_data_len = len(current_tx.extra_data) if current_tx.extra_data else 0
res = self.call(proto.TxAck(tx=msg))
continue
# Lisk functionality
lisk_get_address = MovedTo(lisk.get_address)
lisk_get_public_key = MovedTo(lisk.get_public_key)
lisk_sign_message = MovedTo(lisk.sign_message)
lisk_verify_message = MovedTo(lisk.verify_message)
lisk_sign_tx = MovedTo(lisk.sign_tx)
elif res.request_type == proto.RequestType.TXINPUT:
msg = proto.TransactionType()
msg.inputs = [current_tx.inputs[res.details.request_index]]
if debug_processor is not None:
# msg needs to be deep copied so when it's modified
# the other messages stay intact
from copy import deepcopy
msg = deepcopy(msg)
# If debug_processor function is provided,
# pass thru it the request and prepared response.
# This is useful for tests, see test_msg_signtx
msg = debug_processor(res, msg)
# NEM functionality
nem_get_address = MovedTo(nem.get_address)
nem_sign_tx = MovedTo(nem.sign_tx)
res = self.call(proto.TxAck(tx=msg))
continue
# Stellar functionality
stellar_get_public_key = MovedTo(stellar.get_public_key)
stellar_get_address = MovedTo(stellar.get_address)
stellar_sign_transaction = MovedTo(stellar.sign_tx)
elif res.request_type == proto.RequestType.TXOUTPUT:
msg = proto.TransactionType()
if res.details.tx_hash:
msg.bin_outputs = [current_tx.bin_outputs[res.details.request_index]]
else:
msg.outputs = [current_tx.outputs[res.details.request_index]]
if debug_processor is not None:
# msg needs to be deep copied so when it's modified
# the other messages stay intact
from copy import deepcopy
msg = deepcopy(msg)
# If debug_processor function is provided,
# pass thru it the request and prepared response.
# This is useful for tests, see test_msg_signtx
msg = debug_processor(res, msg)
res = self.call(proto.TxAck(tx=msg))
continue
elif res.request_type == proto.RequestType.TXEXTRADATA:
o, l = res.details.extra_data_offset, res.details.extra_data_len
msg = proto.TransactionType()
msg.extra_data = current_tx.extra_data[o:o + l]
res = self.call(proto.TxAck(tx=msg))
continue
if None in signatures:
raise RuntimeError("Some signatures are missing!")
# log("SIGNED IN %.03f SECONDS, CALLED %d MESSAGES, %d BYTES" %
# (time.time() - start, counter, len(serialized_tx)))
return (signatures, serialized_tx)
@field('message')
@expect(proto.Success)
def wipe_device(self):
ret = self.call(proto.WipeDevice())
self.init_device()
return ret
@field('message')
@expect(proto.Success)
def recovery_device(self, word_count, passphrase_protection, pin_protection, label, language, type=proto.RecoveryDeviceType.ScrambledWords, expand=False, dry_run=False):
if self.features.initialized and not dry_run:
raise RuntimeError("Device is initialized already. Call wipe_device() and try again.")
if word_count not in (12, 18, 24):
raise ValueError("Invalid word count. Use 12/18/24")
self.recovery_matrix_first_pass = True
self.expand = expand
if self.expand:
# optimization to load the wordlist once, instead of for each recovery word
self.mnemonic_wordlist = Mnemonic('english')
res = self.call(proto.RecoveryDevice(
word_count=int(word_count),
passphrase_protection=bool(passphrase_protection),
pin_protection=bool(pin_protection),
label=label,
language=language,
enforce_wordlist=True,
type=type,
dry_run=dry_run))
self.init_device()
return res
@field('message')
@expect(proto.Success)
@session
def reset_device(self, display_random, strength, passphrase_protection, pin_protection, label, language, u2f_counter=0, skip_backup=False):
if self.features.initialized:
raise RuntimeError("Device is initialized already. Call wipe_device() and try again.")
# Begin with device reset workflow
msg = proto.ResetDevice(display_random=display_random,
strength=strength,
passphrase_protection=bool(passphrase_protection),
pin_protection=bool(pin_protection),
language=language,
label=label,
u2f_counter=u2f_counter,
skip_backup=bool(skip_backup))
resp = self.call(msg)
if not isinstance(resp, proto.EntropyRequest):
raise RuntimeError("Invalid response, expected EntropyRequest")
external_entropy = self._get_local_entropy()
LOG.debug("Computer generated entropy: " + binascii.hexlify(external_entropy).decode())
ret = self.call(proto.EntropyAck(entropy=external_entropy))
self.init_device()
return ret
@field('message')
@expect(proto.Success)
def backup_device(self):
ret = self.call(proto.BackupDevice())
return ret
@field('message')
@expect(proto.Success)
def load_device_by_mnemonic(self, mnemonic, pin, passphrase_protection, label, language='english', skip_checksum=False, expand=False):
# Convert mnemonic to UTF8 NKFD
mnemonic = Mnemonic.normalize_string(mnemonic)
# Convert mnemonic to ASCII stream
mnemonic = mnemonic.encode('utf-8')
m = Mnemonic('english')
if expand:
mnemonic = m.expand(mnemonic)
if not skip_checksum and not m.check(mnemonic):
raise ValueError("Invalid mnemonic checksum")
if self.features.initialized:
raise RuntimeError("Device is initialized already. Call wipe_device() and try again.")
resp = self.call(proto.LoadDevice(mnemonic=mnemonic, pin=pin,
passphrase_protection=passphrase_protection,
language=language,
label=label,
skip_checksum=skip_checksum))
self.init_device()
return resp
@field('message')
@expect(proto.Success)
def load_device_by_xprv(self, xprv, pin, passphrase_protection, label, language):
if self.features.initialized:
raise RuntimeError("Device is initialized already. Call wipe_device() and try again.")
if xprv[0:4] not in ('xprv', 'tprv'):
raise ValueError("Unknown type of xprv")
if not 100 < len(xprv) < 112: # yes this is correct in Python
raise ValueError("Invalid length of xprv")
node = proto.HDNodeType()
data = binascii.hexlify(tools.b58decode(xprv, None))
if data[90:92] != b'00':
raise ValueError("Contain invalid private key")
checksum = binascii.hexlify(tools.btc_hash(binascii.unhexlify(data[:156]))[:4])
if checksum != data[156:]:
raise ValueError("Checksum doesn't match")
# version 0488ade4
# depth 00
# fingerprint 00000000
# child_num 00000000
# chaincode 873dff81c02f525623fd1fe5167eac3a55a049de3d314bb42ee227ffed37d508
# privkey 00e8f32e723decf4051aefac8e2c93c9c5b214313817cdb01a1494b917c8436b35
# checksum e77e9d71
node.depth = int(data[8:10], 16)
node.fingerprint = int(data[10:18], 16)
node.child_num = int(data[18:26], 16)
node.chain_code = binascii.unhexlify(data[26:90])
node.private_key = binascii.unhexlify(data[92:156]) # skip 0x00 indicating privkey
resp = self.call(proto.LoadDevice(node=node,
pin=pin,
passphrase_protection=passphrase_protection,
language=language,
label=label))
self.init_device()
return resp
@session
def firmware_update(self, fp):
if self.features.bootloader_mode is False:
raise RuntimeError("Device must be in bootloader mode")
data = fp.read()
resp = self.call(proto.FirmwareErase(length=len(data)))
if isinstance(resp, proto.Failure) and resp.code == proto.FailureType.FirmwareError:
return False
# TREZORv1 method
if isinstance(resp, proto.Success):
fingerprint = hashlib.sha256(data[256:]).hexdigest()
LOG.debug("Firmware fingerprint: " + fingerprint)
resp = self.call(proto.FirmwareUpload(payload=data))
if isinstance(resp, proto.Success):
return True
elif isinstance(resp, proto.Failure) and resp.code == proto.FailureType.FirmwareError:
return False
raise RuntimeError("Unexpected result %s" % resp)
# TREZORv2 method
if isinstance(resp, proto.FirmwareRequest):
import pyblake2
while True:
payload = data[resp.offset:resp.offset + resp.length]
digest = pyblake2.blake2s(payload).digest()
resp = self.call(proto.FirmwareUpload(payload=payload, hash=digest))
if isinstance(resp, proto.FirmwareRequest):
continue
elif isinstance(resp, proto.Success):
return True
elif isinstance(resp, proto.Failure) and resp.code == proto.FailureType.FirmwareError:
return False
raise RuntimeError("Unexpected result %s" % resp)
raise RuntimeError("Unexpected message %s" % resp)
@field('message')
@expect(proto.Success)
def self_test(self):
if self.features.bootloader_mode is False:
raise RuntimeError("Device must be in bootloader mode")
return self.call(proto.SelfTest(payload=b'\x00\xFF\x55\xAA\x66\x99\x33\xCCABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!\x00\xFF\x55\xAA\x66\x99\x33\xCC'))
@field('public_key')
@expect(proto.StellarPublicKey)
def stellar_get_public_key(self, address_n, show_display=False):
return self.call(proto.StellarGetPublicKey(address_n=address_n, show_display=show_display))
@field('address')
@expect(proto.StellarAddress)
def stellar_get_address(self, address_n, show_display=False):
return self.call(proto.StellarGetAddress(address_n=address_n, show_display=show_display))
def stellar_sign_transaction(self, tx, operations, address_n, network_passphrase=None):
# default networkPassphrase to the public network
if network_passphrase is None:
network_passphrase = "Public Global Stellar Network ; September 2015"
tx.network_passphrase = network_passphrase
tx.address_n = address_n
tx.num_operations = len(operations)
# Signing loop works as follows:
#
# 1. Start with tx (header information for the transaction) and operations (an array of operation protobuf messagess)
# 2. Send the tx header to the device
# 3. Receive a StellarTxOpRequest message
# 4. Send operations one by one until all operations have been sent. If there are more operations to sign, the device will send a StellarTxOpRequest message
# 5. The final message received will be StellarSignedTx which is returned from this method
resp = self.call(tx)
try:
while isinstance(resp, proto.StellarTxOpRequest):
resp = self.call(operations.pop(0))
except IndexError:
# pop from empty list
raise CallException("Stellar.UnexpectedEndOfOperations",
"Reached end of operations without a signature.") from None
if not isinstance(resp, proto.StellarSignedTx):
raise CallException(proto.FailureType.UnexpectedMessage, resp)
if operations:
raise CallException("Stellar.UnprocessedOperations",
"Received a signature before processing all operations.")
return resp
# Miscellaneous cryptography
get_entropy = MovedTo(misc.get_entropy)
sign_identity = MovedTo(misc.sign_identity)
get_ecdh_session_key = MovedTo(misc.get_ecdh_session_key)
encrypt_keyvalue = MovedTo(misc.encrypt_keyvalue)
decrypt_keyvalue = MovedTo(misc.decrypt_keyvalue)
class TrezorClient(ProtocolMixin, TextUIMixin, BaseClient):

View File

@ -19,6 +19,9 @@ from functools import reduce
import binascii
from typing import Iterable, Tuple
from . import messages
from .tools import expect
from trezorlib import _ed25519
# XXX, these could be NewType's, but that would infect users of the cosi module with these types as well.
@ -86,3 +89,18 @@ def sign_with_privkey(digest: bytes, privkey: Ed25519PrivateKey,
a = 2 ** (b - 2) + sum(2 ** i * _ed25519.bit(h, i) for i in range(3, b - 2))
S = (nonce + _ed25519.Hint(global_commit + global_pubkey + digest) * a) % _ed25519.l
return Ed25519Signature(_ed25519.encodeint(S))
### Client functions ###
@expect(messages.CosiCommitment)
def commit(client, n, data):
n = client._convert_prime(n)
return client.call(messages.CosiCommit(address_n=n, data=data))
@expect(messages.CosiSignature)
def sign(client, n, data, global_commitment, global_pubkey):
n = client._convert_prime(n)
return client.call(messages.CosiSign(address_n=n, data=data, global_commitment=global_commitment, global_pubkey=global_pubkey))

View File

@ -14,7 +14,14 @@
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import binascii
import os
import warnings
from mnemonic import Mnemonic
from . import messages as proto
from . import tools
from .tools import field, expect, session
from .transport import enumerate_devices, get_transport
@ -35,3 +42,204 @@ class TrezorDevice:
def find_by_path(cls, path):
warnings.warn('TrezorDevice is deprecated.', DeprecationWarning)
return get_transport(path, prefix_search=False)
@field('message')
@expect(proto.Success)
def apply_settings(client, label=None, language=None, use_passphrase=None, homescreen=None, passphrase_source=None, auto_lock_delay_ms=None):
settings = proto.ApplySettings()
if label is not None:
settings.label = label
if language:
settings.language = language
if use_passphrase is not None:
settings.use_passphrase = use_passphrase
if homescreen is not None:
settings.homescreen = homescreen
if passphrase_source is not None:
settings.passphrase_source = passphrase_source
if auto_lock_delay_ms is not None:
settings.auto_lock_delay_ms = auto_lock_delay_ms
out = client.call(settings)
client.init_device() # Reload Features
return out
@field('message')
@expect(proto.Success)
def apply_flags(client, flags):
out = client.call(proto.ApplyFlags(flags=flags))
client.init_device() # Reload Features
return out
@field('message')
@expect(proto.Success)
def change_pin(client, remove=False):
ret = client.call(proto.ChangePin(remove=remove))
client.init_device() # Re-read features
return ret
@field('message')
@expect(proto.Success)
def set_u2f_counter(client, u2f_counter):
ret = client.call(proto.SetU2FCounter(u2f_counter=u2f_counter))
return ret
@field('message')
@expect(proto.Success)
def wipe_device(client):
ret = client.call(proto.WipeDevice())
client.init_device()
return ret
@field('message')
@expect(proto.Success)
def recovery_device(client, word_count, passphrase_protection, pin_protection, label, language, type=proto.RecoveryDeviceType.ScrambledWords, expand=False, dry_run=False):
if client.features.initialized and not dry_run:
raise RuntimeError("Device is initialized already. Call wipe_device() and try again.")
if word_count not in (12, 18, 24):
raise ValueError("Invalid word count. Use 12/18/24")
client.recovery_matrix_first_pass = True
client.expand = expand
if client.expand:
# optimization to load the wordlist once, instead of for each recovery word
client.mnemonic_wordlist = Mnemonic('english')
res = client.call(proto.RecoveryDevice(
word_count=int(word_count),
passphrase_protection=bool(passphrase_protection),
pin_protection=bool(pin_protection),
label=label,
language=language,
enforce_wordlist=True,
type=type,
dry_run=dry_run))
client.init_device()
return res
@field('message')
@expect(proto.Success)
@session
def reset_device(client, display_random, strength, passphrase_protection, pin_protection, label, language, u2f_counter=0, skip_backup=False):
if client.features.initialized:
raise RuntimeError("Device is initialized already. Call wipe_device() and try again.")
# Begin with device reset workflow
msg = proto.ResetDevice(display_random=display_random,
strength=strength,
passphrase_protection=bool(passphrase_protection),
pin_protection=bool(pin_protection),
language=language,
label=label,
u2f_counter=u2f_counter,
skip_backup=bool(skip_backup))
resp = client.call(msg)
if not isinstance(resp, proto.EntropyRequest):
raise RuntimeError("Invalid response, expected EntropyRequest")
external_entropy = os.urandom(32)
# LOG.debug("Computer generated entropy: " + binascii.hexlify(external_entropy).decode())
ret = client.call(proto.EntropyAck(entropy=external_entropy))
client.init_device()
return ret
@field('message')
@expect(proto.Success)
def backup_device(client):
ret = client.call(proto.BackupDevice())
return ret
@field('message')
@expect(proto.Success)
def load_device_by_mnemonic(client, mnemonic, pin, passphrase_protection, label, language='english', skip_checksum=False, expand=False):
# Convert mnemonic to UTF8 NKFD
mnemonic = Mnemonic.normalize_string(mnemonic)
# Convert mnemonic to ASCII stream
mnemonic = mnemonic.encode('utf-8')
m = Mnemonic('english')
if expand:
mnemonic = m.expand(mnemonic)
if not skip_checksum and not m.check(mnemonic):
raise ValueError("Invalid mnemonic checksum")
if client.features.initialized:
raise RuntimeError("Device is initialized already. Call wipe_device() and try again.")
resp = client.call(proto.LoadDevice(mnemonic=mnemonic, pin=pin,
passphrase_protection=passphrase_protection,
language=language,
label=label,
skip_checksum=skip_checksum))
client.init_device()
return resp
@field('message')
@expect(proto.Success)
def load_device_by_xprv(client, xprv, pin, passphrase_protection, label, language):
if client.features.initialized:
raise RuntimeError("Device is initialized already. Call wipe_device() and try again.")
if xprv[0:4] not in ('xprv', 'tprv'):
raise ValueError("Unknown type of xprv")
if not 100 < len(xprv) < 112: # yes this is correct in Python
raise ValueError("Invalid length of xprv")
node = proto.HDNodeType()
data = binascii.hexlify(tools.b58decode(xprv, None))
if data[90:92] != b'00':
raise ValueError("Contain invalid private key")
checksum = binascii.hexlify(tools.btc_hash(binascii.unhexlify(data[:156]))[:4])
if checksum != data[156:]:
raise ValueError("Checksum doesn't match")
# version 0488ade4
# depth 00
# fingerprint 00000000
# child_num 00000000
# chaincode 873dff81c02f525623fd1fe5167eac3a55a049de3d314bb42ee227ffed37d508
# privkey 00e8f32e723decf4051aefac8e2c93c9c5b214313817cdb01a1494b917c8436b35
# checksum e77e9d71
node.depth = int(data[8:10], 16)
node.fingerprint = int(data[10:18], 16)
node.child_num = int(data[18:26], 16)
node.chain_code = binascii.unhexlify(data[26:90])
node.private_key = binascii.unhexlify(data[92:156]) # skip 0x00 indicating privkey
resp = client.call(proto.LoadDevice(node=node,
pin=pin,
passphrase_protection=passphrase_protection,
language=language,
label=label))
client.init_device()
return resp
@field('message')
@expect(proto.Success)
def self_test(client):
if client.features.bootloader_mode is False:
raise RuntimeError("Device must be in bootloader mode")
return client.call(proto.SelfTest(payload=b'\x00\xFF\x55\xAA\x66\x99\x33\xCCABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!\x00\xFF\x55\xAA\x66\x99\x33\xCC'))

69
trezorlib/ethereum.py Normal file
View File

@ -0,0 +1,69 @@
from . import messages as proto
from .tools import field, expect, CallException, normalize_nfc, session
def int_to_big_endian(value):
return value.to_bytes((value.bit_length() + 7) // 8, 'big')
### Client functions ###
@field('address')
@expect(proto.EthereumAddress)
def get_address(client, n, show_display=False, multisig=None):
n = client._convert_prime(n)
return client.call(proto.EthereumGetAddress(address_n=n, show_display=show_display))
@session
def sign_tx(client, n, nonce, gas_price, gas_limit, to, value, data=None, chain_id=None, tx_type=None):
n = client._convert_prime(n)
msg = proto.EthereumSignTx(
address_n=n,
nonce=int_to_big_endian(nonce),
gas_price=int_to_big_endian(gas_price),
gas_limit=int_to_big_endian(gas_limit),
value=int_to_big_endian(value))
if to:
msg.to = to
if data:
msg.data_length = len(data)
data, chunk = data[1024:], data[:1024]
msg.data_initial_chunk = chunk
if chain_id:
msg.chain_id = chain_id
if tx_type is not None:
msg.tx_type = tx_type
response = client.call(msg)
while response.data_length is not None:
data_length = response.data_length
data, chunk = data[data_length:], data[:data_length]
response = client.call(proto.EthereumTxAck(data_chunk=chunk))
return response.signature_v, response.signature_r, response.signature_s
@expect(proto.EthereumMessageSignature)
def sign_message(client, n, message):
n = client._convert_prime(n)
message = normalize_nfc(message)
return client.call(proto.EthereumSignMessage(address_n=n, message=message))
def verify_message(client, address, signature, message):
message = normalize_nfc(message)
try:
resp = client.call(proto.EthereumVerifyMessage(address=address, signature=signature, message=message))
except CallException as e:
resp = e
if isinstance(resp, proto.Success):
return True
return False

179
trezorlib/firmware.py Normal file
View File

@ -0,0 +1,179 @@
import binascii
import construct as c
import hashlib
import pyblake2
from . import cosi
from . import messages as proto
from . import tools
Toif = c.Struct(
"magic" / c.Const(b"TOI"),
"format" / c.Enum(c.Byte, full_color=b"f", grayscale=b"g"),
"width" / c.Int16ul,
"height" / c.Int16ul,
"data" / c.Prefixed(c.Int32ul, c.GreedyBytes),
)
def bytes_not(data):
return bytes(~b & 0xff for b in data)
VendorTrust = c.Transformed(c.BitStruct(
"reserved" / c.Padding(9),
"show_vendor_string" / c.Flag,
"require_user_click" / c.Flag,
"red_background" / c.Flag,
"delay" / c.BitsInteger(4),
), bytes_not, 2, bytes_not, 2)
VendorHeader = c.Struct(
"_start_offset" / c.Tell,
"magic" / c.Const(b"TRZV"),
"_header_len" / c.Padding(4),
"expiry" / c.Int32ul,
"version" / c.Struct(
"major" / c.Int8ul,
"minor" / c.Int8ul,
),
"vendor_sigs_required" / c.Int8ul,
"vendor_sigs_n" / c.Rebuild(c.Int8ul, c.len_(c.this.pubkeys)),
"vendor_trust" / VendorTrust,
"reserved" / c.Padding(14),
"pubkeys" / c.Bytes(32)[c.this.vendor_sigs_n],
"vendor_string" / c.Aligned(4, c.PascalString(c.Int8ul, "utf-8")),
"vendor_image" / Toif,
"_data_end_offset" / c.Tell,
c.Padding(-(c.this._data_end_offset + 65) % 512),
"sigmask" / c.Byte,
"signature" / c.Bytes(64),
"_end_offset" / c.Tell,
"header_len" / c.Pointer(
c.this._start_offset + 4,
c.Rebuild(c.Int32ul, c.this._end_offset - c.this._start_offset)
),
)
VersionLong = c.Struct(
"major" / c.Int8ul,
"minor" / c.Int8ul,
"patch" / c.Int8ul,
"build" / c.Int8ul,
)
FirmwareHeader = c.Struct(
"_start_offset" / c.Tell,
"magic" / c.Const(b"TRZF"),
"_header_len" / c.Padding(4),
"expiry" / c.Int32ul,
"code_length" / c.Rebuild(
c.Int32ul,
lambda this:
len(this._.code) if "code" in this._
else (this.code_length or 0)),
"version" / VersionLong,
"fix_version" / VersionLong,
"reserved" / c.Padding(8),
"hashes" / c.Bytes(32)[16],
"reserved" / c.Padding(415),
"sigmask" / c.Byte,
"signature" / c.Bytes(64),
"_end_offset" / c.Tell,
"header_len" / c.Pointer(c.this._start_offset + 4, c.Rebuild(c.Int32ul, c.this._end_offset - c.this._start_offset)),
)
Firmware = c.Struct(
"vendor_header" / VendorHeader,
"firmware_header" / FirmwareHeader,
"code" / c.Bytes(c.this.firmware_header.code_length),
c.Terminated,
)
def validate_firmware(filename):
with open(filename, "rb") as f:
data = f.read()
if data[:6] == b'54525a':
data = binascii.unhexlify(data)
try:
fw = Firmware.parse(data)
except Exception as e:
raise ValueError("Invalid firmware image format") from e
vendor = fw.vendor_header
header = fw.firmware_header
print("Vendor header from {}, version {}.{}".format(vendor.vendor_string, vendor.version.major, vendor.version.minor))
print("Firmware version {v.major}.{v.minor}.{v.patch} build {v.build}".format(v=header.version))
# rebuild header without signatures
stripped_header = header.copy()
stripped_header.sigmask = 0
stripped_header.signature = b'\0' * 64
header_bytes = FirmwareHeader.build(stripped_header)
digest = pyblake2.blake2s(header_bytes).digest()
print("Fingerprint: {}".format(binascii.hexlify(digest).decode("ascii")))
global_pk = cosi.combine_keys(vendor.pubkeys[i] for i in range(8) if header.sigmask & (1 << i))
try:
cosi.verify(header.signature, digest, global_pk)
print("Signature OK")
except:
print("Signature FAILED")
raise
### Client functions ###
@tools.session
def update(client, fp):
if client.features.bootloader_mode is False:
raise RuntimeError("Device must be in bootloader mode")
data = fp.read()
resp = client.call(proto.FirmwareErase(length=len(data)))
if isinstance(resp, proto.Failure) and resp.code == proto.FailureType.FirmwareError:
return False
# TREZORv1 method
if isinstance(resp, proto.Success):
fingerprint = hashlib.sha256(data[256:]).hexdigest()
# LOG.debug("Firmware fingerprint: " + fingerprint)
resp = client.call(proto.FirmwareUpload(payload=data))
if isinstance(resp, proto.Success):
return True
elif isinstance(resp, proto.Failure) and resp.code == proto.FailureType.FirmwareError:
return False
raise RuntimeError("Unexpected result %s" % resp)
# TREZORv2 method
if isinstance(resp, proto.FirmwareRequest):
import pyblake2
while True:
payload = data[resp.offset:resp.offset + resp.length]
digest = pyblake2.blake2s(payload).digest()
resp = client.call(proto.FirmwareUpload(payload=payload, hash=digest))
if isinstance(resp, proto.FirmwareRequest):
continue
elif isinstance(resp, proto.Success):
return True
elif isinstance(resp, proto.Failure) and resp.code == proto.FailureType.FirmwareError:
return False
raise RuntimeError("Unexpected result %s" % resp)
raise RuntimeError("Unexpected message %s" % resp)

78
trezorlib/lisk.py Normal file
View File

@ -0,0 +1,78 @@
import binascii
from . import messages as proto
from .tools import field, expect, CallException, normalize_nfc
@field('address')
@expect(proto.LiskAddress)
def get_address(client, n, show_display=False):
n = client._convert_prime(n)
return client.call(proto.LiskGetAddress(address_n=n, show_display=show_display))
@expect(proto.LiskPublicKey)
def get_public_key(client, n, show_display=False):
n = client._convert_prime(n)
return client.call(proto.LiskGetPublicKey(address_n=n, show_display=show_display))
@expect(proto.LiskMessageSignature)
def sign_message(client, n, message):
n = client._convert_prime(n)
message = normalize_nfc(message)
return client.call(proto.LiskSignMessage(address_n=n, message=message))
def verify_message(client, pubkey, signature, message):
message = normalize_nfc(message)
try:
resp = client.call(proto.LiskVerifyMessage(signature=signature, public_key=pubkey, message=message))
except CallException as e:
resp = e
return isinstance(resp, proto.Success)
def _asset_to_proto(asset):
msg = proto.LiskTransactionAsset()
if "votes" in asset:
msg.votes = asset["votes"]
if "data" in asset:
msg.data = asset["data"]
if "signature" in asset:
msg.signature = proto.LiskSignatureType()
msg.signature.public_key = binascii.unhexlify(asset["signature"]["publicKey"])
if "delegate" in asset:
msg.delegate = proto.LiskDelegateType()
msg.delegate.username = asset["delegate"]["username"]
if "multisignature" in asset:
msg.multisignature = proto.LiskMultisignatureType()
msg.multisignature.min = asset["multisignature"]["min"]
msg.multisignature.life_time = asset["multisignature"]["lifetime"]
msg.multisignature.keys_group = asset["multisignature"]["keysgroup"]
return msg
@expect(proto.LiskSignedTx)
def sign_tx(client, n, transaction):
n = client._convert_prime(n)
msg = proto.LiskTransactionCommon()
msg.type = transaction["type"]
msg.fee = int(transaction["fee"]) # Lisk use strings for big numbers (javascript issue)
msg.amount = int(transaction["amount"]) # And we convert it back to number
msg.timestamp = transaction["timestamp"]
if "recipientId" in transaction:
msg.recipient_id = transaction["recipientId"]
if "senderPublicKey" in transaction:
msg.sender_public_key = binascii.unhexlify(transaction["senderPublicKey"])
if "requesterPublicKey" in transaction:
msg.requester_public_key = binascii.unhexlify(transaction["requesterPublicKey"])
if "signature" in transaction:
msg.signature = binascii.unhexlify(transaction["signature"])
msg.asset = _asset_to_proto(transaction["asset"])
return client.call(proto.LiskSignTx(address_n=n, transaction=msg))

43
trezorlib/misc.py Normal file
View File

@ -0,0 +1,43 @@
from . import messages as proto
from .tools import field, expect
@field('entropy')
@expect(proto.Entropy)
def get_entropy(client, size):
return client.call(proto.GetEntropy(size=size))
@expect(proto.SignedIdentity)
def sign_identity(client, identity, challenge_hidden, challenge_visual, ecdsa_curve_name=None):
return client.call(proto.SignIdentity(identity=identity, challenge_hidden=challenge_hidden, challenge_visual=challenge_visual, ecdsa_curve_name=ecdsa_curve_name))
@expect(proto.ECDHSessionKey)
def get_ecdh_session_key(client, identity, peer_public_key, ecdsa_curve_name=None):
return client.call(proto.GetECDHSessionKey(identity=identity, peer_public_key=peer_public_key, ecdsa_curve_name=ecdsa_curve_name))
@field('value')
@expect(proto.CipheredKeyValue)
def encrypt_keyvalue(client, n, key, value, ask_on_encrypt=True, ask_on_decrypt=True, iv=b''):
n = client._convert_prime(n)
return client.call(proto.CipherKeyValue(address_n=n,
key=key,
value=value,
encrypt=True,
ask_on_encrypt=ask_on_encrypt,
ask_on_decrypt=ask_on_decrypt,
iv=iv))
@field('value')
@expect(proto.CipheredKeyValue)
def decrypt_keyvalue(client, n, key, value, ask_on_encrypt=True, ask_on_decrypt=True, iv=b''):
n = client._convert_prime(n)
return client.call(proto.CipherKeyValue(address_n=n,
key=key,
value=value,
encrypt=False,
ask_on_encrypt=ask_on_encrypt,
ask_on_decrypt=ask_on_decrypt,
iv=iv))

View File

@ -17,6 +17,7 @@
import binascii
import json
from . import messages as proto
from .tools import expect, field, CallException
TYPE_TRANSACTION_TRANSFER = 0x0101
TYPE_IMPORTANCE_TRANSFER = 0x0801
@ -164,3 +165,27 @@ def create_sign_tx(transaction):
raise ValueError("Unknown transaction type")
return msg
### Client functions ###
@field("address")
@expect(proto.NEMAddress)
def get_address(client, n, network, show_display=False):
n = client._convert_prime(n)
return client.call(proto.NEMGetAddress(address_n=n, network=network, show_display=show_display))
@expect(proto.NEMSignedTx)
def sign_tx(client, n, transaction):
n = client._convert_prime(n)
try:
msg = create_sign_tx(transaction)
except ValueError as e:
raise CallException(e.args)
assert msg.transaction is not None
msg.transaction.address_n = n
return client.call(msg)

View File

@ -19,6 +19,7 @@ import struct
import xdrlib
from . import messages
from .tools import field, expect, CallException
# Memo types
MEMO_TYPE_NONE = 0
@ -49,6 +50,7 @@ OP_BUMP_SEQUENCE = 11
DEFAULT_BIP32_PATH = "m/44h/148h/0h"
# Stellar's BIP32 differs to Bitcoin's see https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0005.md
DEFAULT_NETWORK_PASSPHRASE = "Public Global Stellar Network ; September 2015"
def address_from_public_key(pk_bytes):
@ -336,3 +338,48 @@ def _crc16_checksum(bytes):
crc ^= polynomial
return crc & 0xffff
### Client functions ###
@field('public_key')
@expect(messages.StellarPublicKey)
def get_public_key(client, address_n, show_display=False):
return client.call(messages.StellarGetPublicKey(address_n=address_n, show_display=show_display))
@field('address')
@expect(messages.StellarAddress)
def get_address(client, address_n, show_display=False):
return client.call(messages.StellarGetAddress(address_n=address_n, show_display=show_display))
def sign_tx(client, tx, operations, address_n, network_passphrase=DEFAULT_NETWORK_PASSPHRASE):
tx.network_passphrase = network_passphrase
tx.address_n = address_n
tx.num_operations = len(operations)
# Signing loop works as follows:
#
# 1. Start with tx (header information for the transaction) and operations (an array of operation protobuf messagess)
# 2. Send the tx header to the device
# 3. Receive a StellarTxOpRequest message
# 4. Send operations one by one until all operations have been sent. If there are more operations to sign, the device will send a StellarTxOpRequest message
# 5. The final message received will be StellarSignedTx which is returned from this method
resp = client.call(tx)
try:
while isinstance(resp, messages.StellarTxOpRequest):
resp = client.call(operations.pop(0))
except IndexError:
# pop from empty list
raise CallException("Stellar.UnexpectedEndOfOperations",
"Reached end of operations without a signature.") from None
if not isinstance(resp, messages.StellarSignedTx):
raise CallException(messages.FailureType.UnexpectedMessage, resp)
if operations:
raise CallException("Stellar.UnprocessedOperations",
"Received a signature before processing all operations.")
return resp