2016-11-25 21:53:55 +00:00
# This file is part of the TREZOR project.
#
# Copyright (C) 2012-2016 Marek Palatinus <slush@satoshilabs.com>
# Copyright (C) 2012-2016 Pavol Rusnak <stick@satoshilabs.com>
# Copyright (C) 2016 Jochen Hoenicke <hoenicke@gmail.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library. If not, see <http://www.gnu.org/licenses/>.
2018-05-11 13:24:24 +00:00
import functools
import logging
2013-09-13 03:31:24 +00:00
import os
2014-06-12 11:26:24 +00:00
import sys
2014-04-09 19:42:30 +00:00
import time
2014-01-06 00:54:53 +00:00
import binascii
2014-01-08 14:59:18 +00:00
import hashlib
2014-02-21 01:30:10 +00:00
import unicodedata
2014-11-07 00:09:53 +00:00
import getpass
2017-12-29 18:18:04 +00:00
import warnings
2013-09-13 03:31:24 +00:00
2014-02-21 00:48:11 +00:00
from mnemonic import Mnemonic
2013-09-13 03:31:24 +00:00
2017-12-12 15:40:11 +00:00
from . import messages as proto
2016-05-05 02:57:14 +00:00
from . import tools
2017-12-17 02:17:37 +00:00
from . import mapping
2018-03-15 15:14:24 +00:00
from . import nem
2018-04-20 13:52:47 +00:00
from . import protobuf
2018-04-19 19:29:36 +00:00
from . import stellar
2016-05-05 02:57:14 +00:00
from . debuglink import DebugLink
2017-06-23 19:31:42 +00:00
2017-12-29 18:18:04 +00:00
if sys . version_info . major < 3 :
2018-02-27 15:30:32 +00:00
raise Exception ( " Trezorlib does not support Python 2 anymore. " )
2017-12-29 18:18:04 +00:00
2014-11-07 00:58:20 +00:00
SCREENSHOT = False
2018-05-11 13:24:24 +00:00
LOG = logging . getLogger ( __name__ )
2016-06-29 20:40:37 +00:00
2018-03-07 14:16:01 +00:00
# make a getch function
try :
import termios
2018-03-07 14:28:35 +00:00
import tty
2016-06-29 20:40:37 +00:00
# POSIX system. Create and return a getch that manipulates the tty.
2018-03-07 14:16:01 +00:00
# On Windows, termios will fail to import.
2017-06-23 19:31:42 +00:00
2018-03-07 14:16:01 +00:00
def getch ( ) :
2016-06-29 20:40:37 +00:00
fd = sys . stdin . fileno ( )
old_settings = termios . tcgetattr ( fd )
try :
tty . setraw ( fd )
ch = sys . stdin . read ( 1 )
finally :
termios . tcsetattr ( fd , termios . TCSADRAIN , old_settings )
return ch
2018-03-07 14:16:01 +00:00
except ImportError :
# Windows system.
# Use msvcrt's getch function.
import msvcrt
def getch ( ) :
while True :
key = msvcrt . getch ( )
if key in ( 0x00 , 0xe0 ) :
# skip special keys: read the scancode and repeat
msvcrt . getch ( )
continue
return key . decode ( ' latin1 ' )
2016-06-29 20:40:37 +00:00
2017-06-23 19:31:42 +00:00
2014-02-13 15:46:21 +00:00
def get_buttonrequest_value ( code ) :
# Converts integer code to its string representation of ButtonRequestType
2017-12-12 15:40:11 +00:00
return [ k for k in dir ( proto . ButtonRequestType ) if getattr ( proto . ButtonRequestType , k ) == code ] [ 0 ]
2017-06-23 19:31:42 +00:00
2014-02-02 17:27:44 +00:00
2013-09-13 03:31:24 +00:00
class CallException ( Exception ) :
2018-05-11 13:27:39 +00:00
pass
2018-02-22 15:51:32 +00:00
2013-09-13 03:31:24 +00:00
class PinException ( CallException ) :
pass
2017-06-23 19:31:42 +00:00
2018-05-11 13:27:39 +00:00
class field :
2014-02-13 15:46:21 +00:00
# Decorator extracts single value from
# protobuf object. If the field is not
# present, raises an exception.
def __init__ ( self , field ) :
self . field = field
def __call__ ( self , f ) :
2018-05-11 13:27:39 +00:00
@functools.wraps ( f )
2014-02-13 15:46:21 +00:00
def wrapped_f ( * args , * * kwargs ) :
ret = f ( * args , * * kwargs )
return getattr ( ret , self . field )
return wrapped_f
2017-06-23 19:31:42 +00:00
2018-05-11 13:27:39 +00:00
class expect :
2014-02-13 15:46:21 +00:00
# Decorator checks if the method
# returned one of expected protobuf messages
# or raises an exception
def __init__ ( self , * expected ) :
self . expected = expected
2016-01-12 23:17:38 +00:00
2014-02-13 15:46:21 +00:00
def __call__ ( self , f ) :
2018-05-11 13:27:39 +00:00
@functools.wraps ( f )
2014-02-13 15:46:21 +00:00
def wrapped_f ( * args , * * kwargs ) :
ret = f ( * args , * * kwargs )
if not isinstance ( ret , self . expected ) :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Got %s , expected %s " % ( ret . __class__ , self . expected ) )
2014-02-13 15:46:21 +00:00
return ret
return wrapped_f
2017-06-23 19:31:42 +00:00
2016-09-13 10:25:06 +00:00
def session ( f ) :
# Decorator wraps a BaseClient method
# with session activation / deactivation
2018-05-11 13:27:39 +00:00
@functools.wraps ( f )
2016-09-13 10:25:06 +00:00
def wrapped_f ( * args , * * kwargs ) :
2018-05-28 12:45:54 +00:00
__tracebackhide__ = True # pytest traceback hiding - this function won't appear in tracebacks
2016-09-13 10:25:06 +00:00
client = args [ 0 ]
2017-09-04 11:36:08 +00:00
client . transport . session_begin ( )
2016-09-13 10:25:06 +00:00
try :
return f ( * args , * * kwargs )
finally :
2017-09-04 11:36:08 +00:00
client . transport . session_end ( )
2016-09-13 10:25:06 +00:00
return wrapped_f
2017-06-23 19:31:42 +00:00
2014-02-21 01:30:10 +00:00
def normalize_nfc ( txt ) :
2018-03-06 12:58:39 +00:00
'''
Normalize message to NFC and return bytes suitable for protobuf .
This seems to be bitcoin - qt standard of doing things .
'''
2018-02-27 15:30:32 +00:00
if isinstance ( txt , bytes ) :
2018-03-06 12:58:39 +00:00
txt = txt . decode ( ' utf-8 ' )
return unicodedata . normalize ( ' NFC ' , txt ) . encode ( ' utf-8 ' )
2014-02-21 01:30:10 +00:00
2017-06-23 19:31:42 +00:00
2014-02-13 15:46:21 +00:00
class BaseClient ( object ) :
# Implements very basic layer of sending raw protobuf
# messages to device and getting its response back.
2017-09-04 11:36:08 +00:00
def __init__ ( self , transport , * * kwargs ) :
2018-05-24 17:14:05 +00:00
LOG . info ( " creating client instance for device: {} " . format ( transport . get_path ( ) ) )
2017-09-04 11:36:08 +00:00
self . transport = transport
2014-02-13 16:20:40 +00:00
super ( BaseClient , self ) . __init__ ( ) # *args, **kwargs)
2013-12-30 22:35:20 +00:00
2017-09-04 09:30:34 +00:00
def close ( self ) :
pass
2016-02-10 15:46:58 +00:00
def cancel ( self ) :
2017-09-04 11:36:08 +00:00
self . transport . write ( proto . Cancel ( ) )
2016-02-10 15:46:58 +00:00
2016-09-13 10:25:06 +00:00
@session
2014-02-15 19:30:39 +00:00
def call_raw ( self , msg ) :
2018-05-28 12:45:54 +00:00
__tracebackhide__ = True # pytest traceback hiding - this function won't appear in tracebacks
2017-09-04 11:36:08 +00:00
self . transport . write ( msg )
return self . transport . read ( )
2014-02-15 19:30:39 +00:00
2016-09-13 10:25:06 +00:00
@session
2014-02-15 19:30:39 +00:00
def call ( self , msg ) :
2016-09-13 10:25:06 +00:00
resp = self . call_raw ( msg )
handler_name = " callback_ %s " % resp . __class__ . __name__
handler = getattr ( self , handler_name , None )
2014-01-13 03:44:57 +00:00
2017-06-23 19:31:42 +00:00
if handler is not None :
2016-09-13 10:25:06 +00:00
msg = handler ( resp )
2017-06-23 19:31:42 +00:00
if msg is None :
2017-11-06 10:09:54 +00:00
raise ValueError ( " Callback %s must return protobuf message, not None " % handler )
2016-09-13 10:25:06 +00:00
resp = self . call ( msg )
2014-02-13 15:46:21 +00:00
return resp
def callback_Failure ( self , msg ) :
2017-12-12 15:40:11 +00:00
if msg . code in ( proto . FailureType . PinInvalid ,
proto . FailureType . PinCancelled , proto . FailureType . PinExpected ) :
2014-02-13 15:46:21 +00:00
raise PinException ( msg . code , msg . message )
raise CallException ( msg . code , msg . message )
2017-12-17 02:17:37 +00:00
def register_message ( self , msg ) :
''' Allow application to register custom protobuf message type '''
mapping . register_message ( msg )
2017-06-23 19:31:42 +00:00
2014-02-13 15:46:21 +00:00
class TextUIMixin ( object ) :
# This class demonstrates easy test-based UI
# integration between the device and wallet.
# You can implement similar functionality
# by implementing your own GuiMixin with
# graphical widgets for every type of these callbacks.
2014-02-13 16:20:40 +00:00
def __init__ ( self , * args , * * kwargs ) :
super ( TextUIMixin , self ) . __init__ ( * args , * * kwargs )
2018-05-11 13:24:24 +00:00
@staticmethod
def print ( text ) :
print ( text , file = sys . stderr )
2014-02-13 15:46:21 +00:00
def callback_ButtonRequest ( self , msg ) :
2014-06-12 15:22:22 +00:00
# log("Sending ButtonAck for %s " % get_buttonrequest_value(msg.code))
2014-02-13 15:46:21 +00:00
return proto . ButtonAck ( )
2016-06-29 20:40:37 +00:00
def callback_RecoveryMatrix ( self , msg ) :
if self . recovery_matrix_first_pass :
self . recovery_matrix_first_pass = False
2018-05-11 13:24:24 +00:00
self . print ( " Use the numeric keypad to describe positions. For the word list use only left and right keys. " )
self . print ( " Use backspace to correct an entry. The keypad layout is: " )
self . print ( " 7 8 9 7 | 9 " )
self . print ( " 4 5 6 4 | 6 " )
self . print ( " 1 2 3 1 | 3 " )
2016-06-29 20:40:37 +00:00
while True :
character = getch ( )
if character in ( ' \x03 ' , ' \x04 ' ) :
return proto . Cancel ( )
if character in ( ' \x08 ' , ' \x7f ' ) :
return proto . WordAck ( word = ' \x08 ' )
# ignore middle column if only 6 keys requested.
2017-12-24 21:12:54 +00:00
if msg . type == proto . WordRequestType . Matrix6 and character in ( ' 2 ' , ' 5 ' , ' 8 ' ) :
2016-06-29 20:40:37 +00:00
continue
2017-07-31 11:35:31 +00:00
if character . isdigit ( ) :
2016-06-29 20:40:37 +00:00
return proto . WordAck ( word = character )
2014-02-13 15:46:21 +00:00
def callback_PinMatrixRequest ( self , msg ) :
2017-12-21 19:12:35 +00:00
if msg . type == proto . PinMatrixRequestType . Current :
2015-02-28 13:06:23 +00:00
desc = ' current PIN '
2017-12-21 19:12:35 +00:00
elif msg . type == proto . PinMatrixRequestType . NewFirst :
2014-04-02 18:06:54 +00:00
desc = ' new PIN '
2017-12-21 19:12:35 +00:00
elif msg . type == proto . PinMatrixRequestType . NewSecond :
2014-04-02 18:06:54 +00:00
desc = ' new PIN again '
2014-03-28 15:26:48 +00:00
else :
2014-04-02 18:06:54 +00:00
desc = ' PIN '
2014-06-12 15:22:22 +00:00
2018-05-11 13:24:24 +00:00
self . print ( " Use the numeric keypad to describe number positions. The layout is: " )
self . print ( " 7 8 9 " )
self . print ( " 4 5 6 " )
self . print ( " 1 2 3 " )
self . print ( " Please enter %s : " % desc )
2014-11-07 00:09:53 +00:00
pin = getpass . getpass ( ' ' )
2017-11-28 18:59:06 +00:00
if not pin . isdigit ( ) :
raise ValueError ( ' Non-numerical PIN provided ' )
2014-02-13 15:46:21 +00:00
return proto . PinMatrixAck ( pin = pin )
def callback_PassphraseRequest ( self , msg ) :
2018-02-14 18:11:21 +00:00
if msg . on_device is True :
return proto . PassphraseAck ( )
2018-01-29 17:04:48 +00:00
if os . getenv ( " PASSPHRASE " ) is not None :
2018-05-11 13:24:24 +00:00
self . print ( " Passphrase required. Using PASSPHRASE environment variable. " )
2018-01-29 17:04:48 +00:00
passphrase = Mnemonic . normalize_string ( os . getenv ( " PASSPHRASE " ) )
return proto . PassphraseAck ( passphrase = passphrase )
2018-05-11 13:24:24 +00:00
self . print ( " Passphrase required: " )
2014-11-07 00:09:53 +00:00
passphrase = getpass . getpass ( ' ' )
2018-05-11 13:24:24 +00:00
self . print ( " Confirm your Passphrase: " )
2014-11-23 12:28:09 +00:00
if passphrase == getpass . getpass ( ' ' ) :
2017-11-13 21:13:11 +00:00
passphrase = Mnemonic . normalize_string ( passphrase )
2014-11-07 00:09:53 +00:00
return proto . PassphraseAck ( passphrase = passphrase )
else :
2018-05-11 13:24:24 +00:00
self . print ( " Passphrase did not match! " )
2014-11-07 00:09:53 +00:00
exit ( )
2014-02-13 15:46:21 +00:00
2018-02-28 22:12:55 +00:00
def callback_PassphraseStateRequest ( self , msg ) :
return proto . PassphraseStateAck ( )
2014-02-13 15:46:21 +00:00
def callback_WordRequest ( self , msg ) :
2017-12-24 21:12:54 +00:00
if msg . type in ( proto . WordRequestType . Matrix9 ,
proto . WordRequestType . Matrix6 ) :
2016-06-29 20:40:37 +00:00
return self . callback_RecoveryMatrix ( msg )
2018-05-11 13:24:24 +00:00
self . print ( " Enter one word of mnemonic: " )
2017-06-23 19:31:42 +00:00
word = input ( )
2017-02-11 19:15:09 +00:00
if self . expand :
word = self . mnemonic_wordlist . expand_word ( word )
2014-02-13 15:46:21 +00:00
return proto . WordAck ( word = word )
2014-02-21 06:28:10 +00:00
2017-06-23 19:31:42 +00:00
2014-02-13 15:46:21 +00:00
class DebugLinkMixin ( object ) :
# This class implements automatic responses
# and other functionality for unit tests
# for various callbacks, created in order
# to automatically pass unit tests.
#
# This mixing should be used only for purposes
# of unit testing, because it will fail to work
# without special DebugLink interface provided
# by the device.
2018-05-11 13:24:24 +00:00
DEBUG = LOG . getChild ( ' debug_link ' ) . debug
2014-02-13 15:46:21 +00:00
def __init__ ( self , * args , * * kwargs ) :
super ( DebugLinkMixin , self ) . __init__ ( * args , * * kwargs )
self . debug = None
2014-02-21 06:28:10 +00:00
self . in_with_statement = 0
2014-12-10 14:26:18 +00:00
self . button_wait = 0
2014-11-07 00:58:20 +00:00
self . screenshot_id = 0
2014-02-13 15:46:21 +00:00
# Always press Yes and provide correct pin
self . setup_debuglink ( True , True )
2016-01-12 23:17:38 +00:00
2014-02-15 19:30:39 +00:00
# Do not expect any specific response from device
2014-02-21 06:28:10 +00:00
self . expected_responses = None
2014-02-15 19:30:39 +00:00
# Use blank passphrase
self . set_passphrase ( ' ' )
2014-02-13 15:46:21 +00:00
def close ( self ) :
super ( DebugLinkMixin , self ) . close ( )
if self . debug :
self . debug . close ( )
def set_debuglink ( self , debug_transport ) :
self . debug = DebugLink ( debug_transport )
2014-12-10 14:26:18 +00:00
def set_buttonwait ( self , secs ) :
self . button_wait = secs
2014-02-21 06:28:10 +00:00
def __enter__ ( self ) :
# For usage in with/expected_responses
self . in_with_statement + = 1
return self
2014-02-21 20:00:56 +00:00
def __exit__ ( self , _type , value , traceback ) :
2014-02-21 06:28:10 +00:00
self . in_with_statement - = 1
2017-06-23 19:31:42 +00:00
if _type is not None :
2014-02-21 20:00:56 +00:00
# Another exception raised
return False
# return isinstance(value, TypeError)
2014-02-21 06:28:10 +00:00
# Evaluate missed responses in 'with' statement
2017-06-23 19:31:42 +00:00
if self . expected_responses is not None and len ( self . expected_responses ) :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Some of expected responses didn ' t come from device: %s " %
2018-05-09 16:11:38 +00:00
[ repr ( x ) for x in self . expected_responses ] )
2014-02-21 06:28:10 +00:00
# Cleanup
self . expected_responses = None
return False
2014-02-15 19:30:39 +00:00
def set_expected_responses ( self , expected ) :
2014-02-21 06:28:10 +00:00
if not self . in_with_statement :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Must be called inside ' with ' statement " )
2014-02-15 19:30:39 +00:00
self . expected_responses = expected
2014-02-13 15:46:21 +00:00
def setup_debuglink ( self , button , pin_correct ) :
self . button = button # True -> YES button, False -> NO button
self . pin_correct = pin_correct
2014-02-13 16:20:40 +00:00
2014-02-15 19:30:39 +00:00
def set_passphrase ( self , passphrase ) :
2017-11-13 21:13:11 +00:00
self . passphrase = Mnemonic . normalize_string ( passphrase )
2014-02-21 20:00:56 +00:00
def set_mnemonic ( self , mnemonic ) :
2017-11-13 21:13:11 +00:00
self . mnemonic = Mnemonic . normalize_string ( mnemonic ) . split ( ' ' )
2014-02-15 19:30:39 +00:00
def call_raw ( self , msg ) :
2018-05-28 12:45:54 +00:00
__tracebackhide__ = True # pytest traceback hiding - this function won't appear in tracebacks
2014-11-07 00:58:20 +00:00
if SCREENSHOT and self . debug :
2018-04-03 19:25:07 +00:00
from PIL import Image
2014-11-07 00:58:20 +00:00
layout = self . debug . read_layout ( )
im = Image . new ( " RGB " , ( 128 , 64 ) )
pix = im . load ( )
for x in range ( 128 ) :
for y in range ( 64 ) :
rx , ry = 127 - x , 63 - y
if ( ord ( layout [ rx + ( ry / 8 ) * 128 ] ) & ( 1 << ( ry % 8 ) ) ) > 0 :
pix [ x , y ] = ( 255 , 255 , 255 )
im . save ( ' scr %05d .png ' % self . screenshot_id )
self . screenshot_id + = 1
2014-02-15 19:30:39 +00:00
resp = super ( DebugLinkMixin , self ) . call_raw ( msg )
self . _check_request ( resp )
return resp
2016-01-12 23:17:38 +00:00
2014-02-15 19:30:39 +00:00
def _check_request ( self , msg ) :
2018-05-28 12:45:54 +00:00
__tracebackhide__ = True # pytest traceback hiding - this function won't appear in tracebacks
2017-06-23 19:31:42 +00:00
if self . expected_responses is not None :
2014-02-13 15:46:21 +00:00
try :
2014-02-15 19:30:39 +00:00
expected = self . expected_responses . pop ( 0 )
2014-02-13 15:46:21 +00:00
except IndexError :
2018-05-11 13:27:39 +00:00
raise AssertionError ( proto . FailureType . UnexpectedMessage ,
" Got %s , but no message has been expected " % repr ( msg ) )
2014-02-13 15:46:21 +00:00
2014-02-15 19:30:39 +00:00
if msg . __class__ != expected . __class__ :
2018-05-11 13:27:39 +00:00
raise AssertionError ( proto . FailureType . UnexpectedMessage ,
" Expected %s , got %s " % ( repr ( expected ) , repr ( msg ) ) )
2014-02-15 19:30:39 +00:00
2017-12-12 15:40:11 +00:00
for field , value in expected . __dict__ . items ( ) :
2018-01-07 17:07:13 +00:00
if value is None or value == [ ] :
2018-01-04 15:55:27 +00:00
continue
if getattr ( msg , field ) != value :
2018-05-11 13:27:39 +00:00
raise AssertionError ( proto . FailureType . UnexpectedMessage ,
" Expected %s , got %s " % ( repr ( expected ) , repr ( msg ) ) )
2016-01-12 23:17:38 +00:00
2014-02-15 19:30:39 +00:00
def callback_ButtonRequest ( self , msg ) :
2018-05-11 13:24:24 +00:00
self . DEBUG ( " ButtonRequest code: " + get_buttonrequest_value ( msg . code ) )
2014-02-13 15:46:21 +00:00
2018-05-11 13:24:24 +00:00
self . DEBUG ( " Pressing button " + str ( self . button ) )
2014-12-10 14:26:18 +00:00
if self . button_wait :
2018-05-11 13:24:24 +00:00
self . DEBUG ( " Waiting %d seconds " % self . button_wait )
2014-12-10 14:26:18 +00:00
time . sleep ( self . button_wait )
2014-02-13 15:46:21 +00:00
self . debug . press_button ( self . button )
return proto . ButtonAck ( )
def callback_PinMatrixRequest ( self , msg ) :
if self . pin_correct :
pin = self . debug . read_pin_encoded ( )
2014-01-13 03:44:57 +00:00
else :
2014-02-13 15:46:21 +00:00
pin = ' 444222 '
return proto . PinMatrixAck ( pin = pin )
def callback_PassphraseRequest ( self , msg ) :
2018-05-11 13:24:24 +00:00
self . DEBUG ( " Provided passphrase: ' %s ' " % self . passphrase )
2014-02-15 19:30:39 +00:00
return proto . PassphraseAck ( passphrase = self . passphrase )
2014-01-13 03:44:57 +00:00
2018-02-28 22:12:55 +00:00
def callback_PassphraseStateRequest ( self , msg ) :
return proto . PassphraseStateAck ( )
2014-02-13 15:46:21 +00:00
def callback_WordRequest ( self , msg ) :
2014-03-07 16:25:55 +00:00
( word , pos ) = self . debug . read_recovery_word ( )
2014-02-21 20:00:56 +00:00
if word != ' ' :
return proto . WordAck ( word = word )
if pos != 0 :
return proto . WordAck ( word = self . mnemonic [ pos - 1 ] )
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Unexpected call " )
2014-02-13 15:46:21 +00:00
2017-06-23 19:31:42 +00:00
2014-02-13 15:46:21 +00:00
class ProtocolMixin ( object ) :
2017-01-17 13:13:02 +00:00
VENDORS = ( ' bitcointrezor.com ' , ' trezor.io ' )
2014-02-13 15:46:21 +00:00
2018-03-27 12:52:32 +00:00
def __init__ ( self , state = None , * args , * * kwargs ) :
2014-02-13 16:20:40 +00:00
super ( ProtocolMixin , self ) . __init__ ( * args , * * kwargs )
2018-03-27 12:52:32 +00:00
self . state = state
2013-09-13 03:31:24 +00:00
self . init_device ( )
2014-03-28 18:47:53 +00:00
self . tx_api = None
2014-02-13 15:46:21 +00:00
2014-03-28 18:47:53 +00:00
def set_tx_api ( self , tx_api ) :
self . tx_api = tx_api
2014-02-13 15:46:21 +00:00
def init_device ( self ) :
2018-03-27 12:52:32 +00:00
init_msg = proto . Initialize ( )
if self . state is not None :
init_msg . state = self . state
self . features = expect ( proto . Features ) ( self . call ) ( init_msg )
2015-01-28 04:31:30 +00:00
if str ( self . features . vendor ) not in self . VENDORS :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Unsupported device " )
2014-01-27 10:25:27 +00:00
2013-09-13 03:31:24 +00:00
def _get_local_entropy ( self ) :
return os . urandom ( 32 )
2013-12-30 22:35:20 +00:00
2018-04-18 13:00:59 +00:00
@staticmethod
def _convert_prime ( n : tools . Address ) - > tools . Address :
2013-12-30 22:35:20 +00:00
# Convert minus signs to uint32 with flag
2018-04-18 13:00:59 +00:00
return [ tools . H_ ( int ( abs ( x ) ) ) if x < 0 else x for x in n ]
2014-01-27 10:25:27 +00:00
2016-02-10 14:53:14 +00:00
@staticmethod
def expand_path ( n ) :
2018-04-18 13:00:59 +00:00
warnings . warn ( ' expand_path is deprecated, use tools.parse_path ' , DeprecationWarning )
return tools . parse_path ( n )
2014-01-09 16:34:29 +00:00
2014-02-13 15:46:21 +00:00
@expect ( proto . PublicKey )
2017-12-19 16:30:32 +00:00
def get_public_node ( self , n , ecdsa_curve_name = None , show_display = False , coin_name = None ) :
2014-02-21 17:56:51 +00:00
n = self . _convert_prime ( n )
2017-04-21 17:14:15 +00:00
return self . call ( proto . GetPublicKey ( address_n = n , ecdsa_curve_name = ecdsa_curve_name , show_display = show_display , coin_name = coin_name ) )
2014-01-27 10:25:27 +00:00
2014-02-13 15:46:21 +00:00
@field ( ' address ' )
@expect ( proto . Address )
2017-12-12 15:40:11 +00:00
def get_address ( self , coin_name , n , show_display = False , multisig = None , script_type = proto . InputScriptType . SPENDADDRESS ) :
2014-01-13 03:44:57 +00:00
n = self . _convert_prime ( n )
2014-12-10 17:03:54 +00:00
if multisig :
2017-01-02 20:25:07 +00:00
return self . call ( proto . GetAddress ( address_n = n , coin_name = coin_name , show_display = show_display , multisig = multisig , script_type = script_type ) )
2014-12-10 17:03:54 +00:00
else :
2017-01-02 20:25:07 +00:00
return self . call ( proto . GetAddress ( address_n = n , coin_name = coin_name , show_display = show_display , script_type = script_type ) )
2014-01-27 10:25:27 +00:00
2016-05-24 19:32:58 +00:00
@field ( ' address ' )
@expect ( proto . EthereumAddress )
2016-05-26 06:47:04 +00:00
def ethereum_get_address ( self , n , show_display = False , multisig = None ) :
2016-05-24 19:32:58 +00:00
n = self . _convert_prime ( n )
return self . call ( proto . EthereumGetAddress ( address_n = n , show_display = show_display ) )
2016-09-13 10:25:06 +00:00
@session
2018-04-11 10:24:13 +00:00
def ethereum_sign_tx ( self , n , nonce , gas_price , gas_limit , to , value , data = None , chain_id = None , tx_type = None ) :
2016-08-19 21:05:54 +00:00
def int_to_big_endian ( value ) :
2018-05-04 15:43:58 +00:00
return value . to_bytes ( ( value . bit_length ( ) + 7 ) / / 8 , ' big ' )
2016-05-27 06:13:23 +00:00
n = self . _convert_prime ( n )
2016-09-13 10:25:06 +00:00
msg = proto . EthereumSignTx (
address_n = n ,
nonce = int_to_big_endian ( nonce ) ,
gas_price = int_to_big_endian ( gas_price ) ,
gas_limit = int_to_big_endian ( gas_limit ) ,
value = int_to_big_endian ( value ) )
2016-08-10 16:13:05 +00:00
2016-09-13 10:25:06 +00:00
if to :
msg . to = to
2016-08-22 23:14:23 +00:00
2016-09-13 10:25:06 +00:00
if data :
msg . data_length = len ( data )
data , chunk = data [ 1024 : ] , data [ : 1024 ]
msg . data_initial_chunk = chunk
2016-08-10 16:13:05 +00:00
2017-01-22 12:51:07 +00:00
if chain_id :
msg . chain_id = chain_id
2018-04-11 10:24:13 +00:00
if tx_type is not None :
msg . tx_type = tx_type
2016-09-13 10:25:06 +00:00
response = self . call ( msg )
2016-05-27 06:13:23 +00:00
2017-12-12 15:40:11 +00:00
while response . data_length is not None :
2016-09-13 10:25:06 +00:00
data_length = response . data_length
data , chunk = data [ data_length : ] , data [ : data_length ]
response = self . call ( proto . EthereumTxAck ( data_chunk = chunk ) )
2016-05-27 06:13:23 +00:00
2016-09-13 10:25:06 +00:00
return response . signature_v , response . signature_r , response . signature_s
2016-08-10 16:13:05 +00:00
2017-07-12 16:35:54 +00:00
@expect ( proto . EthereumMessageSignature )
def ethereum_sign_message ( self , n , message ) :
n = self . _convert_prime ( n )
2018-03-06 12:58:39 +00:00
message = normalize_nfc ( message )
2017-07-12 16:35:54 +00:00
return self . call ( proto . EthereumSignMessage ( address_n = n , message = message ) )
def ethereum_verify_message ( self , address , signature , message ) :
2018-03-06 12:58:39 +00:00
message = normalize_nfc ( message )
2017-07-12 16:35:54 +00:00
try :
resp = self . call ( proto . EthereumVerifyMessage ( address = address , signature = signature , message = message ) )
except CallException as e :
resp = e
if isinstance ( resp , proto . Success ) :
return True
return False
2018-04-16 14:38:00 +00:00
#
# Lisk functions
#
@field ( ' address ' )
@expect ( proto . LiskAddress )
def lisk_get_address ( self , n , show_display = False ) :
n = self . _convert_prime ( n )
return self . call ( proto . LiskGetAddress ( address_n = n , show_display = show_display ) )
2018-04-16 15:31:11 +00:00
@expect ( proto . LiskPublicKey )
def lisk_get_public_key ( self , n , show_display = False ) :
n = self . _convert_prime ( n )
return self . call ( proto . LiskGetPublicKey ( address_n = n , show_display = show_display ) )
2018-06-06 20:50:27 +00:00
@expect ( proto . LiskMessageSignature )
def lisk_sign_message ( self , n , message ) :
n = self . _convert_prime ( n )
message = normalize_nfc ( message )
return self . call ( proto . LiskSignMessage ( address_n = n , message = message ) )
def lisk_verify_message ( self , pubkey , signature , message ) :
message = normalize_nfc ( message )
try :
resp = self . call ( proto . LiskVerifyMessage ( signature = signature , public_key = pubkey , message = message ) )
except CallException as e :
resp = e
return isinstance ( resp , proto . Success )
2018-04-16 17:56:07 +00:00
@expect ( proto . LiskSignedTx )
def lisk_sign_tx ( self , n , transaction ) :
n = self . _convert_prime ( n )
def asset_to_proto ( asset ) :
msg = proto . LiskTransactionAsset ( )
if " votes " in asset :
msg . votes = asset [ " votes " ]
if " data " in asset :
msg . data = asset [ " data " ]
if " signature " in asset :
msg . signature = proto . LiskSignatureType ( )
msg . signature . public_key = binascii . unhexlify ( asset [ " signature " ] [ " publicKey " ] )
if " delegate " in asset :
msg . delegate = proto . LiskDelegateType ( )
msg . delegate . username = asset [ " delegate " ] [ " username " ]
if " multisignature " in asset :
msg . multisignature = proto . LiskMultisignatureType ( )
msg . multisignature . min = asset [ " multisignature " ] [ " min " ]
msg . multisignature . life_time = asset [ " multisignature " ] [ " lifetime " ]
msg . multisignature . keys_group = asset [ " multisignature " ] [ " keysgroup " ]
return msg
msg = proto . LiskTransactionCommon ( )
msg . type = transaction [ " type " ]
msg . fee = int ( transaction [ " fee " ] ) # Lisk use strings for big numbers (javascript issue)
msg . amount = int ( transaction [ " amount " ] ) # And we convert it back to number
msg . timestamp = transaction [ " timestamp " ]
if " recipientId " in transaction :
msg . recipient_id = transaction [ " recipientId " ]
if " senderPublicKey " in transaction :
msg . sender_public_key = binascii . unhexlify ( transaction [ " senderPublicKey " ] )
if " requesterPublicKey " in transaction :
msg . requester_public_key = binascii . unhexlify ( transaction [ " requesterPublicKey " ] )
if " signature " in transaction :
msg . signature = binascii . unhexlify ( transaction [ " signature " ] )
msg . asset = asset_to_proto ( transaction [ " asset " ] )
return self . call ( proto . LiskSignTx ( address_n = n , transaction = msg ) )
2018-04-16 15:23:09 +00:00
2014-02-13 15:46:21 +00:00
@field ( ' entropy ' )
@expect ( proto . Entropy )
2013-09-13 03:31:24 +00:00
def get_entropy ( self , size ) :
2014-02-13 15:46:21 +00:00
return self . call ( proto . GetEntropy ( size = size ) )
2013-09-13 03:31:24 +00:00
2014-02-13 15:46:21 +00:00
@field ( ' message ' )
@expect ( proto . Success )
2014-02-07 10:31:12 +00:00
def ping ( self , msg , button_protection = False , pin_protection = False , passphrase_protection = False ) :
2014-02-03 23:31:44 +00:00
msg = proto . Ping ( message = msg ,
2014-02-04 00:02:16 +00:00
button_protection = button_protection ,
pin_protection = pin_protection ,
passphrase_protection = passphrase_protection )
2014-02-13 15:46:21 +00:00
return self . call ( msg )
2013-09-13 03:31:24 +00:00
2013-10-08 18:33:39 +00:00
def get_device_id ( self ) :
return self . features . device_id
2013-09-13 03:31:24 +00:00
2014-02-13 15:46:21 +00:00
@field ( ' message ' )
@expect ( proto . Success )
2018-03-13 13:53:57 +00:00
def apply_settings ( self , label = None , language = None , use_passphrase = None , homescreen = None , passphrase_source = None , auto_lock_delay_ms = None ) :
2013-09-13 03:31:24 +00:00
settings = proto . ApplySettings ( )
2017-06-23 19:31:42 +00:00
if label is not None :
2013-09-13 03:31:24 +00:00
settings . label = label
if language :
settings . language = language
2017-06-23 19:31:42 +00:00
if use_passphrase is not None :
2014-12-13 18:07:30 +00:00
settings . use_passphrase = use_passphrase
2017-06-23 19:31:42 +00:00
if homescreen is not None :
2015-02-04 19:53:22 +00:00
settings . homescreen = homescreen
2018-03-12 14:14:13 +00:00
if passphrase_source is not None :
settings . passphrase_source = passphrase_source
2018-03-13 13:53:57 +00:00
if auto_lock_delay_ms is not None :
settings . auto_lock_delay_ms = auto_lock_delay_ms
2013-09-13 03:31:24 +00:00
2014-02-13 15:46:21 +00:00
out = self . call ( settings )
self . init_device ( ) # Reload Features
2013-09-13 03:31:24 +00:00
return out
2017-07-17 16:36:53 +00:00
@field ( ' message ' )
@expect ( proto . Success )
def apply_flags ( self , flags ) :
out = self . call ( proto . ApplyFlags ( flags = flags ) )
self . init_device ( ) # Reload Features
return out
2014-06-17 13:31:10 +00:00
@field ( ' message ' )
@expect ( proto . Success )
def clear_session ( self ) :
return self . call ( proto . ClearSession ( ) )
2014-02-13 15:46:21 +00:00
@field ( ' message ' )
@expect ( proto . Success )
2014-01-31 18:48:19 +00:00
def change_pin ( self , remove = False ) :
ret = self . call ( proto . ChangePin ( remove = remove ) )
self . init_device ( ) # Re-read features
return ret
2014-02-13 15:46:21 +00:00
@expect ( proto . MessageSignature )
2017-12-12 15:40:11 +00:00
def sign_message ( self , coin_name , n , message , script_type = proto . InputScriptType . SPENDADDRESS ) :
2014-02-13 15:46:21 +00:00
n = self . _convert_prime ( n )
2018-03-06 12:58:39 +00:00
message = normalize_nfc ( message )
2017-07-24 14:11:38 +00:00
return self . call ( proto . SignMessage ( coin_name = coin_name , address_n = n , message = message , script_type = script_type ) )
2014-02-13 15:46:21 +00:00
2015-02-20 17:50:53 +00:00
@expect ( proto . SignedIdentity )
2017-12-19 16:30:32 +00:00
def sign_identity ( self , identity , challenge_hidden , challenge_visual , ecdsa_curve_name = None ) :
2015-06-23 14:26:31 +00:00
return self . call ( proto . SignIdentity ( identity = identity , challenge_hidden = challenge_hidden , challenge_visual = challenge_visual , ecdsa_curve_name = ecdsa_curve_name ) )
2015-02-20 17:50:53 +00:00
2016-06-12 12:39:04 +00:00
@expect ( proto . ECDHSessionKey )
2017-12-19 16:30:32 +00:00
def get_ecdh_session_key ( self , identity , peer_public_key , ecdsa_curve_name = None ) :
2016-06-12 12:39:04 +00:00
return self . call ( proto . GetECDHSessionKey ( identity = identity , peer_public_key = peer_public_key , ecdsa_curve_name = ecdsa_curve_name ) )
2017-10-03 14:45:42 +00:00
@expect ( proto . CosiCommitment )
def cosi_commit ( self , n , data ) :
n = self . _convert_prime ( n )
return self . call ( proto . CosiCommit ( address_n = n , data = data ) )
@expect ( proto . CosiSignature )
def cosi_sign ( self , n , data , global_commitment , global_pubkey ) :
n = self . _convert_prime ( n )
return self . call ( proto . CosiSign ( address_n = n , data = data , global_commitment = global_commitment , global_pubkey = global_pubkey ) )
2016-06-12 21:49:52 +00:00
@field ( ' message ' )
@expect ( proto . Success )
def set_u2f_counter ( self , u2f_counter ) :
2017-06-23 19:31:42 +00:00
ret = self . call ( proto . SetU2FCounter ( u2f_counter = u2f_counter ) )
2016-06-12 21:49:52 +00:00
return ret
2017-09-03 13:35:22 +00:00
@field ( " address " )
@expect ( proto . NEMAddress )
def nem_get_address ( self , n , network , show_display = False ) :
n = self . _convert_prime ( n )
return self . call ( proto . NEMGetAddress ( address_n = n , network = network , show_display = show_display ) )
2017-07-01 13:43:33 +00:00
@expect ( proto . NEMSignedTx )
def nem_sign_tx ( self , n , transaction ) :
n = self . _convert_prime ( n )
2018-03-15 15:14:24 +00:00
try :
msg = nem . create_sign_tx ( transaction )
except ValueError as e :
raise CallException ( e . message )
2017-07-01 13:43:33 +00:00
2018-03-21 13:56:27 +00:00
assert msg . transaction is not None
msg . transaction . address_n = n
2017-07-01 13:43:33 +00:00
return self . call ( msg )
2016-10-10 09:01:40 +00:00
def verify_message ( self , coin_name , address , signature , message ) :
2018-03-06 12:58:39 +00:00
message = normalize_nfc ( message )
2014-02-13 15:46:21 +00:00
try :
2016-10-10 09:01:40 +00:00
resp = self . call ( proto . VerifyMessage ( address = address , signature = signature , message = message , coin_name = coin_name ) )
2014-02-13 15:46:21 +00:00
except CallException as e :
resp = e
if isinstance ( resp , proto . Success ) :
return True
return False
2014-11-03 18:42:22 +00:00
@expect ( proto . EncryptedMessage )
2014-10-29 23:48:06 +00:00
def encrypt_message ( self , pubkey , message , display_only , coin_name , n ) :
2014-11-26 17:49:22 +00:00
if coin_name and n :
n = self . _convert_prime ( n )
return self . call ( proto . EncryptMessage ( pubkey = pubkey , message = message , display_only = display_only , coin_name = coin_name , address_n = n ) )
else :
return self . call ( proto . EncryptMessage ( pubkey = pubkey , message = message , display_only = display_only ) )
2014-06-12 15:02:46 +00:00
2014-11-03 18:42:22 +00:00
@expect ( proto . DecryptedMessage )
def decrypt_message ( self , n , nonce , message , msg_hmac ) :
2014-06-12 15:02:46 +00:00
n = self . _convert_prime ( n )
2014-11-03 18:42:22 +00:00
return self . call ( proto . DecryptMessage ( address_n = n , nonce = nonce , message = message , hmac = msg_hmac ) )
2014-06-12 15:02:46 +00:00
2014-11-03 18:42:22 +00:00
@field ( ' value ' )
@expect ( proto . CipheredKeyValue )
2016-06-27 21:17:20 +00:00
def encrypt_keyvalue ( self , n , key , value , ask_on_encrypt = True , ask_on_decrypt = True , iv = b ' ' ) :
2014-06-06 12:40:07 +00:00
n = self . _convert_prime ( n )
return self . call ( proto . CipherKeyValue ( address_n = n ,
key = key ,
value = value ,
encrypt = True ,
ask_on_encrypt = ask_on_encrypt ,
2015-12-24 16:31:09 +00:00
ask_on_decrypt = ask_on_decrypt ,
2016-06-27 21:17:20 +00:00
iv = iv ) )
2014-06-06 12:40:07 +00:00
2014-11-03 18:42:22 +00:00
@field ( ' value ' )
@expect ( proto . CipheredKeyValue )
2016-06-27 21:17:20 +00:00
def decrypt_keyvalue ( self , n , key , value , ask_on_encrypt = True , ask_on_decrypt = True , iv = b ' ' ) :
2014-06-06 12:40:07 +00:00
n = self . _convert_prime ( n )
return self . call ( proto . CipherKeyValue ( address_n = n ,
key = key ,
value = value ,
encrypt = False ,
ask_on_encrypt = ask_on_encrypt ,
2015-12-24 16:31:09 +00:00
ask_on_decrypt = ask_on_decrypt ,
2016-06-27 21:17:20 +00:00
iv = iv ) )
2014-06-06 12:40:07 +00:00
2018-03-20 12:10:08 +00:00
def _prepare_sign_tx ( self , inputs , outputs ) :
2017-12-12 15:40:11 +00:00
tx = proto . TransactionType ( )
2018-03-20 12:10:08 +00:00
tx . inputs = inputs
tx . outputs = outputs
2014-04-09 19:42:30 +00:00
2018-03-20 12:29:33 +00:00
txes = { None : tx }
2014-04-09 19:42:30 +00:00
for inp in inputs :
2018-03-20 12:10:08 +00:00
if inp . prev_hash in txes :
2014-04-09 19:42:30 +00:00
continue
2018-03-20 12:10:08 +00:00
if inp . script_type in ( proto . InputScriptType . SPENDP2SHWITNESS ,
2018-03-20 12:29:33 +00:00
proto . InputScriptType . SPENDWITNESS ) :
2018-03-20 12:10:08 +00:00
continue
if not self . tx_api :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( ' TX_API not defined ' )
2018-03-20 12:10:08 +00:00
prev_tx = self . tx_api . get_tx ( binascii . hexlify ( inp . prev_hash ) . decode ( ' utf-8 ' ) )
txes [ inp . prev_hash ] = prev_tx
2014-04-09 19:42:30 +00:00
return txes
2014-02-13 15:46:21 +00:00
2016-09-13 10:25:06 +00:00
@session
2018-06-05 14:02:51 +00:00
def sign_tx ( self , coin_name , inputs , outputs , version = None , lock_time = None , expiry = None , overwintered = None , debug_processor = None ) :
2014-04-09 19:42:30 +00:00
2018-04-03 19:25:07 +00:00
# start = time.time()
2018-03-20 12:10:08 +00:00
txes = self . _prepare_sign_tx ( inputs , outputs )
2014-04-09 19:42:30 +00:00
2016-09-13 10:25:06 +00:00
# Prepare and send initial message
tx = proto . SignTx ( )
tx . inputs_count = len ( inputs )
tx . outputs_count = len ( outputs )
tx . coin_name = coin_name
2017-04-19 12:19:26 +00:00
if version is not None :
tx . version = version
if lock_time is not None :
tx . lock_time = lock_time
2018-06-05 14:02:51 +00:00
if expiry is not None :
tx . expiry = expiry
if overwintered is not None :
tx . overwintered = overwintered
2016-09-13 10:25:06 +00:00
res = self . call ( tx )
# Prepare structure for signatures
signatures = [ None ] * len ( inputs )
serialized_tx = b ' '
counter = 0
while True :
counter + = 1
if isinstance ( res , proto . Failure ) :
raise CallException ( " Signing failed " )
if not isinstance ( res , proto . TxRequest ) :
raise CallException ( " Unexpected message " )
# If there's some part of signed transaction, let's add it
2017-12-12 15:40:11 +00:00
if res . serialized and res . serialized . serialized_tx :
2018-02-06 20:39:02 +00:00
# log("RECEIVED PART OF SERIALIZED TX (%d BYTES)" % len(res.serialized.serialized_tx))
2016-09-13 10:25:06 +00:00
serialized_tx + = res . serialized . serialized_tx
2017-12-12 15:40:11 +00:00
if res . serialized and res . serialized . signature_index is not None :
2017-06-23 19:31:42 +00:00
if signatures [ res . serialized . signature_index ] is not None :
2017-11-06 10:09:54 +00:00
raise ValueError ( " Signature for index %d already filled " % res . serialized . signature_index )
2016-09-13 10:25:06 +00:00
signatures [ res . serialized . signature_index ] = res . serialized . signature
2017-12-12 15:40:11 +00:00
if res . request_type == proto . RequestType . TXFINISHED :
2016-09-13 10:25:06 +00:00
# Device didn't ask for more information, finish workflow
break
# Device asked for one more information, let's process it.
2017-12-12 15:40:11 +00:00
if not res . details . tx_hash :
current_tx = txes [ None ]
else :
current_tx = txes [ bytes ( res . details . tx_hash ) ]
2016-09-13 10:25:06 +00:00
2017-12-12 15:40:11 +00:00
if res . request_type == proto . RequestType . TXMETA :
msg = proto . TransactionType ( )
2016-09-13 10:25:06 +00:00
msg . version = current_tx . version
msg . lock_time = current_tx . lock_time
msg . inputs_cnt = len ( current_tx . inputs )
if res . details . tx_hash :
msg . outputs_cnt = len ( current_tx . bin_outputs )
else :
msg . outputs_cnt = len ( current_tx . outputs )
2017-12-12 15:40:11 +00:00
msg . extra_data_len = len ( current_tx . extra_data ) if current_tx . extra_data else 0
2016-09-13 10:25:06 +00:00
res = self . call ( proto . TxAck ( tx = msg ) )
continue
2014-04-09 19:42:30 +00:00
2017-12-12 15:40:11 +00:00
elif res . request_type == proto . RequestType . TXINPUT :
msg = proto . TransactionType ( )
2018-03-20 12:10:08 +00:00
msg . inputs = [ current_tx . inputs [ res . details . request_index ] ]
2017-07-27 19:50:12 +00:00
if debug_processor is not None :
2018-02-23 15:03:43 +00:00
# msg needs to be deep copied so when it's modified
# the other messages stay intact
from copy import deepcopy
msg = deepcopy ( msg )
2017-07-27 19:50:12 +00:00
# If debug_processor function is provided,
# pass thru it the request and prepared response.
2018-02-23 15:03:43 +00:00
# This is useful for tests, see test_msg_signtx
2017-07-27 19:50:12 +00:00
msg = debug_processor ( res , msg )
2016-09-13 10:25:06 +00:00
res = self . call ( proto . TxAck ( tx = msg ) )
continue
2017-12-12 15:40:11 +00:00
elif res . request_type == proto . RequestType . TXOUTPUT :
msg = proto . TransactionType ( )
2016-09-13 10:25:06 +00:00
if res . details . tx_hash :
2018-03-20 12:10:08 +00:00
msg . bin_outputs = [ current_tx . bin_outputs [ res . details . request_index ] ]
2016-09-13 10:25:06 +00:00
else :
2018-03-20 12:10:08 +00:00
msg . outputs = [ current_tx . outputs [ res . details . request_index ] ]
2016-09-13 10:25:06 +00:00
2017-06-23 19:31:42 +00:00
if debug_processor is not None :
2018-02-23 15:03:43 +00:00
# msg needs to be deep copied so when it's modified
# the other messages stay intact
from copy import deepcopy
msg = deepcopy ( msg )
2016-09-13 10:25:06 +00:00
# If debug_processor function is provided,
# pass thru it the request and prepared response.
2018-02-23 15:03:43 +00:00
# This is useful for tests, see test_msg_signtx
2016-09-13 10:25:06 +00:00
msg = debug_processor ( res , msg )
res = self . call ( proto . TxAck ( tx = msg ) )
continue
2014-04-09 19:42:30 +00:00
2017-12-12 15:40:11 +00:00
elif res . request_type == proto . RequestType . TXEXTRADATA :
2016-10-21 13:24:30 +00:00
o , l = res . details . extra_data_offset , res . details . extra_data_len
2017-12-12 15:40:11 +00:00
msg = proto . TransactionType ( )
2016-10-21 13:24:30 +00:00
msg . extra_data = current_tx . extra_data [ o : o + l ]
res = self . call ( proto . TxAck ( tx = msg ) )
continue
2014-04-18 16:09:39 +00:00
if None in signatures :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Some signatures are missing! " )
2014-04-18 16:09:39 +00:00
2018-02-06 20:39:02 +00:00
# log("SIGNED IN %.03f SECONDS, CALLED %d MESSAGES, %d BYTES" %
2018-02-06 19:52:45 +00:00
# (time.time() - start, counter, len(serialized_tx)))
2014-04-09 19:42:30 +00:00
return ( signatures , serialized_tx )
2014-02-13 15:46:21 +00:00
@field ( ' message ' )
@expect ( proto . Success )
def wipe_device ( self ) :
ret = self . call ( proto . WipeDevice ( ) )
self . init_device ( )
return ret
@field ( ' message ' )
@expect ( proto . Success )
2017-12-12 15:40:11 +00:00
def recovery_device ( self , word_count , passphrase_protection , pin_protection , label , language , type = proto . RecoveryDeviceType . ScrambledWords , expand = False , dry_run = False ) :
2017-06-21 14:29:49 +00:00
if self . features . initialized and not dry_run :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Device is initialized already. Call wipe_device() and try again. " )
2014-02-13 15:46:21 +00:00
if word_count not in ( 12 , 18 , 24 ) :
2017-11-06 10:09:54 +00:00
raise ValueError ( " Invalid word count. Use 12/18/24 " )
2014-02-13 15:46:21 +00:00
2016-06-29 20:40:37 +00:00
self . recovery_matrix_first_pass = True
2017-02-11 19:15:09 +00:00
self . expand = expand
if self . expand :
# optimization to load the wordlist once, instead of for each recovery word
self . mnemonic_wordlist = Mnemonic ( ' english ' )
2017-06-23 19:31:42 +00:00
res = self . call ( proto . RecoveryDevice (
word_count = int ( word_count ) ,
passphrase_protection = bool ( passphrase_protection ) ,
pin_protection = bool ( pin_protection ) ,
label = label ,
language = language ,
enforce_wordlist = True ,
type = type ,
dry_run = dry_run ) )
2014-02-13 15:46:21 +00:00
self . init_device ( )
return res
@field ( ' message ' )
@expect ( proto . Success )
2016-09-13 10:25:06 +00:00
@session
2017-07-28 16:07:20 +00:00
def reset_device ( self , display_random , strength , passphrase_protection , pin_protection , label , language , u2f_counter = 0 , skip_backup = False ) :
2014-02-13 15:46:21 +00:00
if self . features . initialized :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Device is initialized already. Call wipe_device() and try again. " )
2014-02-13 15:46:21 +00:00
# Begin with device reset workflow
msg = proto . ResetDevice ( display_random = display_random ,
strength = strength ,
passphrase_protection = bool ( passphrase_protection ) ,
pin_protection = bool ( pin_protection ) ,
2017-06-23 17:23:08 +00:00
language = language ,
label = label ,
u2f_counter = u2f_counter ,
skip_backup = bool ( skip_backup ) )
2014-02-13 15:46:21 +00:00
resp = self . call ( msg )
if not isinstance ( resp , proto . EntropyRequest ) :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Invalid response, expected EntropyRequest " )
2014-02-13 15:46:21 +00:00
external_entropy = self . _get_local_entropy ( )
2018-05-11 13:24:24 +00:00
LOG . debug ( " Computer generated entropy: " + binascii . hexlify ( external_entropy ) . decode ( ) )
2014-02-21 20:00:56 +00:00
ret = self . call ( proto . EntropyAck ( entropy = external_entropy ) )
self . init_device ( )
return ret
2014-02-13 15:46:21 +00:00
2017-06-23 17:23:08 +00:00
@field ( ' message ' )
@expect ( proto . Success )
def backup_device ( self ) :
ret = self . call ( proto . BackupDevice ( ) )
return ret
2014-02-13 15:46:21 +00:00
@field ( ' message ' )
@expect ( proto . Success )
2017-07-16 12:53:28 +00:00
def load_device_by_mnemonic ( self , mnemonic , pin , passphrase_protection , label , language = ' english ' , skip_checksum = False , expand = False ) :
2014-02-21 00:48:11 +00:00
# Convert mnemonic to UTF8 NKFD
mnemonic = Mnemonic . normalize_string ( mnemonic )
# Convert mnemonic to ASCII stream
2017-12-17 12:31:54 +00:00
mnemonic = mnemonic . encode ( ' utf-8 ' )
2014-02-21 00:48:11 +00:00
2017-02-09 13:25:32 +00:00
m = Mnemonic ( ' english ' )
2017-02-11 19:15:09 +00:00
if expand :
mnemonic = m . expand ( mnemonic )
2017-02-09 13:25:32 +00:00
if not skip_checksum and not m . check ( mnemonic ) :
2017-11-06 10:09:54 +00:00
raise ValueError ( " Invalid mnemonic checksum " )
2017-02-09 13:25:32 +00:00
2014-02-13 15:46:21 +00:00
if self . features . initialized :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Device is initialized already. Call wipe_device() and try again. " )
2014-02-13 15:46:21 +00:00
resp = self . call ( proto . LoadDevice ( mnemonic = mnemonic , pin = pin ,
passphrase_protection = passphrase_protection ,
language = language ,
2014-02-21 00:48:11 +00:00
label = label ,
skip_checksum = skip_checksum ) )
2014-02-13 15:46:21 +00:00
self . init_device ( )
return resp
@field ( ' message ' )
@expect ( proto . Success )
2014-03-07 20:44:06 +00:00
def load_device_by_xprv ( self , xprv , pin , passphrase_protection , label , language ) :
2014-02-13 15:46:21 +00:00
if self . features . initialized :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Device is initialized already. Call wipe_device() and try again. " )
2014-02-13 15:46:21 +00:00
if xprv [ 0 : 4 ] not in ( ' xprv ' , ' tprv ' ) :
2017-11-06 10:09:54 +00:00
raise ValueError ( " Unknown type of xprv " )
2014-02-13 15:46:21 +00:00
2018-04-20 15:23:43 +00:00
if not 100 < len ( xprv ) < 112 : # yes this is correct in Python
2017-11-06 10:09:54 +00:00
raise ValueError ( " Invalid length of xprv " )
2014-02-13 15:46:21 +00:00
2017-12-12 15:40:11 +00:00
node = proto . HDNodeType ( )
2016-05-20 17:18:33 +00:00
data = binascii . hexlify ( tools . b58decode ( xprv , None ) )
2014-02-13 15:46:21 +00:00
2016-06-27 21:17:20 +00:00
if data [ 90 : 92 ] != b ' 00 ' :
2017-11-06 10:09:54 +00:00
raise ValueError ( " Contain invalid private key " )
2014-02-13 15:46:21 +00:00
2018-04-20 15:23:43 +00:00
checksum = binascii . hexlify ( tools . btc_hash ( binascii . unhexlify ( data [ : 156 ] ) ) [ : 4 ] )
2014-02-13 15:46:21 +00:00
if checksum != data [ 156 : ] :
2017-11-06 10:09:54 +00:00
raise ValueError ( " Checksum doesn ' t match " )
2014-02-13 15:46:21 +00:00
# version 0488ade4
# depth 00
# fingerprint 00000000
# child_num 00000000
# chaincode 873dff81c02f525623fd1fe5167eac3a55a049de3d314bb42ee227ffed37d508
# privkey 00e8f32e723decf4051aefac8e2c93c9c5b214313817cdb01a1494b917c8436b35
# checksum e77e9d71
node . depth = int ( data [ 8 : 10 ] , 16 )
node . fingerprint = int ( data [ 10 : 18 ] , 16 )
node . child_num = int ( data [ 18 : 26 ] , 16 )
2016-06-27 21:17:20 +00:00
node . chain_code = binascii . unhexlify ( data [ 26 : 90 ] )
node . private_key = binascii . unhexlify ( data [ 92 : 156 ] ) # skip 0x00 indicating privkey
2014-02-13 15:46:21 +00:00
resp = self . call ( proto . LoadDevice ( node = node ,
pin = pin ,
passphrase_protection = passphrase_protection ,
2014-03-07 20:44:06 +00:00
language = language ,
2014-02-13 15:46:21 +00:00
label = label ) )
self . init_device ( )
return resp
2016-09-13 10:25:06 +00:00
@session
2014-02-13 15:46:21 +00:00
def firmware_update ( self , fp ) :
2017-06-23 19:31:42 +00:00
if self . features . bootloader_mode is False :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Device must be in bootloader mode " )
2014-02-13 15:46:21 +00:00
2017-06-20 14:32:54 +00:00
data = fp . read ( )
resp = self . call ( proto . FirmwareErase ( length = len ( data ) ) )
2017-12-12 15:40:11 +00:00
if isinstance ( resp , proto . Failure ) and resp . code == proto . FailureType . FirmwareError :
2014-02-13 15:46:21 +00:00
return False
2017-04-12 12:11:42 +00:00
# TREZORv1 method
2014-02-13 15:46:21 +00:00
if isinstance ( resp , proto . Success ) :
2017-04-12 12:11:42 +00:00
fingerprint = hashlib . sha256 ( data [ 256 : ] ) . hexdigest ( )
2018-05-11 13:24:24 +00:00
LOG . debug ( " Firmware fingerprint: " + fingerprint )
2017-04-12 12:11:42 +00:00
resp = self . call ( proto . FirmwareUpload ( payload = data ) )
if isinstance ( resp , proto . Success ) :
return True
2017-12-12 15:40:11 +00:00
elif isinstance ( resp , proto . Failure ) and resp . code == proto . FailureType . FirmwareError :
2017-04-12 12:11:42 +00:00
return False
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Unexpected result %s " % resp )
2017-04-12 12:11:42 +00:00
# TREZORv2 method
if isinstance ( resp , proto . FirmwareRequest ) :
import pyblake2
while True :
payload = data [ resp . offset : resp . offset + resp . length ]
digest = pyblake2 . blake2s ( payload ) . digest ( )
resp = self . call ( proto . FirmwareUpload ( payload = payload , hash = digest ) )
if isinstance ( resp , proto . FirmwareRequest ) :
continue
elif isinstance ( resp , proto . Success ) :
return True
2017-12-12 15:40:11 +00:00
elif isinstance ( resp , proto . Failure ) and resp . code == proto . FailureType . FirmwareError :
2017-04-12 12:11:42 +00:00
return False
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Unexpected result %s " % resp )
2017-04-12 12:11:42 +00:00
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Unexpected message %s " % resp )
2014-02-13 15:46:21 +00:00
2017-07-03 16:49:03 +00:00
@field ( ' message ' )
@expect ( proto . Success )
def self_test ( self ) :
if self . features . bootloader_mode is False :
2017-11-06 10:09:54 +00:00
raise RuntimeError ( " Device must be in bootloader mode " )
2017-07-03 16:49:03 +00:00
2017-07-10 17:01:14 +00:00
return self . call ( proto . SelfTest ( payload = b ' \x00 \xFF \x55 \xAA \x66 \x99 \x33 \xCC ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789! \x00 \xFF \x55 \xAA \x66 \x99 \x33 \xCC ' ) )
2017-07-03 16:49:03 +00:00
2018-04-04 01:50:22 +00:00
@expect ( proto . StellarPublicKey )
def stellar_get_public_key ( self , address_n ) :
return self . call ( proto . StellarGetPublicKey ( address_n = address_n ) )
2018-05-16 14:00:08 +00:00
def stellar_sign_transaction ( self , tx , operations , address_n , network_passphrase = None ) :
2018-04-04 01:50:22 +00:00
# default networkPassphrase to the public network
2018-04-19 19:29:36 +00:00
if network_passphrase is None :
network_passphrase = " Public Global Stellar Network ; September 2015 "
2018-04-04 01:50:22 +00:00
2018-04-19 21:10:57 +00:00
tx . network_passphrase = network_passphrase
tx . address_n = address_n
2018-05-16 14:00:08 +00:00
tx . num_operations = len ( operations )
2018-04-20 16:33:56 +00:00
# Signing loop works as follows:
#
# 1. Start with tx (header information for the transaction) and operations (an array of operation protobuf messagess)
# 2. Send the tx header to the device
# 3. Receive a StellarTxOpRequest message
# 4. Send operations one by one until all operations have been sent. If there are more operations to sign, the device will send a StellarTxOpRequest message
# 5. The final message received will be StellarSignedTx which is returned from this method
resp = self . call ( tx )
try :
while isinstance ( resp , proto . StellarTxOpRequest ) :
resp = self . call ( operations . pop ( 0 ) )
except IndexError :
# pop from empty list
raise CallException ( " Stellar.UnexpectedEndOfOperations " ,
" Reached end of operations without a signature. " ) from None
2018-04-19 21:10:57 +00:00
if not isinstance ( resp , proto . StellarSignedTx ) :
2018-04-20 16:33:56 +00:00
raise CallException ( proto . FailureType . UnexpectedMessage , resp )
if operations :
raise CallException ( " Stellar.UnprocessedOperations " ,
" Received a signature before processing all operations. " )
2018-04-19 21:10:57 +00:00
return resp
2018-04-04 01:50:22 +00:00
2017-06-23 19:31:42 +00:00
2014-02-13 16:20:40 +00:00
class TrezorClient ( ProtocolMixin , TextUIMixin , BaseClient ) :
2018-03-28 13:57:50 +00:00
def __init__ ( self , transport , * args , * * kwargs ) :
super ( ) . __init__ ( transport = transport , * args , * * kwargs )
2014-02-13 15:46:21 +00:00
2017-06-23 19:31:42 +00:00
2018-05-09 16:11:38 +00:00
class TrezorClientDebugLink ( ProtocolMixin , DebugLinkMixin , BaseClient ) :
2018-03-28 13:57:50 +00:00
def __init__ ( self , transport , * args , * * kwargs ) :
super ( ) . __init__ ( transport = transport , * args , * * kwargs )