@ -19,6 +19,7 @@ import logging
import re
import re
import textwrap
import textwrap
import time
import time
from contextlib import contextmanager
from copy import deepcopy
from copy import deepcopy
from datetime import datetime
from datetime import datetime
from enum import IntEnum
from enum import IntEnum
@ -38,19 +39,20 @@ from typing import (
Tuple ,
Tuple ,
Type ,
Type ,
Union ,
Union ,
overload ,
)
)
from mnemonic import Mnemonic
from mnemonic import Mnemonic
from typing_extensions import Literal
from . import mapping , messages , protobuf
from . import mapping , messages , protobuf
from . client import TrezorClient
from . client import TrezorClient
from . exceptions import TrezorFailure
from . exceptions import TrezorFailure
from . log import DUMP_BYTES
from . log import DUMP_BYTES
from . messages import DebugWaitType
from . tools import expect
from . tools import expect
if TYPE_CHECKING :
if TYPE_CHECKING :
from typing_extensions import Protocol
from . messages import PinMatrixRequestType
from . messages import PinMatrixRequestType
from . transport import Transport
from . transport import Transport
@ -60,6 +62,15 @@ if TYPE_CHECKING:
AnyDict = Dict [ str , Any ]
AnyDict = Dict [ str , Any ]
class InputFunc ( Protocol ) :
def __call__ (
self ,
hold_ms : Optional [ int ] = None ,
wait : Optional [ bool ] = None ,
) - > " LayoutContent " :
. . .
EXPECTED_RESPONSES_CONTEXT_LINES = 3
EXPECTED_RESPONSES_CONTEXT_LINES = 3
LOG = logging . getLogger ( __name__ )
LOG = logging . getLogger ( __name__ )
@ -361,6 +372,29 @@ def multipage_content(layouts: List[LayoutContent]) -> str:
return " " . join ( layout . text_content ( ) for layout in layouts )
return " " . join ( layout . text_content ( ) for layout in layouts )
def _make_input_func (
button : Optional [ messages . DebugButton ] = None ,
physical_button : Optional [ messages . DebugPhysicalButton ] = None ,
swipe : Optional [ messages . DebugSwipeDirection ] = None ,
) - > " InputFunc " :
decision = messages . DebugLinkDecision (
button = button ,
physical_button = physical_button ,
swipe = swipe ,
)
def input_func (
self : " DebugLink " ,
hold_ms : Optional [ int ] = None ,
wait : Optional [ bool ] = None ,
) - > LayoutContent :
__tracebackhide__ = True # for pytest # pylint: disable=W0612
decision . hold_ms = hold_ms
return self . _decision ( decision , wait = wait )
return input_func # type: ignore [Parameter name mismatch]
class DebugLink :
class DebugLink :
def __init__ ( self , transport : " Transport " , auto_interact : bool = True ) - > None :
def __init__ ( self , transport : " Transport " , auto_interact : bool = True ) - > None :
self . transport = transport
self . transport = transport
@ -375,7 +409,6 @@ class DebugLink:
self . screenshot_recording_dir : Optional [ str ] = None
self . screenshot_recording_dir : Optional [ str ] = None
# For T1 screenshotting functionality in DebugUI
# For T1 screenshotting functionality in DebugUI
self . t1_take_screenshots = False
self . t1_screenshot_directory : Optional [ Path ] = None
self . t1_screenshot_directory : Optional [ Path ] = None
self . t1_screenshot_counter = 0
self . t1_screenshot_counter = 0
@ -383,6 +416,11 @@ class DebugLink:
self . screen_text_file : Optional [ Path ] = None
self . screen_text_file : Optional [ Path ] = None
self . last_screen_content = " "
self . last_screen_content = " "
self . waiting_for_layout_change = False
self . layout_dirty = True
self . input_wait_type = DebugWaitType . IMMEDIATE
@property
@property
def legacy_ui ( self ) - > bool :
def legacy_ui ( self ) - > bool :
""" Differences between UI1 and UI2. """
""" Differences between UI1 and UI2. """
@ -404,7 +442,12 @@ class DebugLink:
def close ( self ) - > None :
def close ( self ) - > None :
self . transport . end_session ( )
self . transport . end_session ( )
def _call ( self , msg : protobuf . MessageType , nowait : bool = False ) - > Any :
def _write ( self , msg : protobuf . MessageType ) - > None :
if self . waiting_for_layout_change :
raise RuntimeError (
" Debuglink is unavailable while waiting for layout change. "
)
LOG . debug (
LOG . debug (
f " sending message: { msg . __class__ . __name__ } " ,
f " sending message: { msg . __class__ . __name__ } " ,
extra = { " protobuf " : msg } ,
extra = { " protobuf " : msg } ,
@ -415,13 +458,12 @@ class DebugLink:
f " encoded as type { msg_type } ( { len ( msg_bytes ) } bytes): { msg_bytes . hex ( ) } " ,
f " encoded as type { msg_type } ( { len ( msg_bytes ) } bytes): { msg_bytes . hex ( ) } " ,
)
)
self . transport . write ( msg_type , msg_bytes )
self . transport . write ( msg_type , msg_bytes )
if nowait :
return None
def _read ( self ) - > protobuf . MessageType :
ret_type , ret_bytes = self . transport . read ( )
ret_type , ret_bytes = self . transport . read ( )
LOG . log (
LOG . log (
DUMP_BYTES ,
DUMP_BYTES ,
f " received type { msg_type} ( { len ( msg_bytes ) } bytes): { msg _bytes. hex ( ) } " ,
f " received type { ret_type} ( { len ( ret_bytes ) } bytes): { ret _bytes. hex ( ) } " ,
)
)
msg = self . mapping . decode ( ret_type , ret_bytes )
msg = self . mapping . decode ( ret_type , ret_bytes )
LOG . debug (
LOG . debug (
@ -430,11 +472,20 @@ class DebugLink:
)
)
return msg
return msg
def state ( self ) - > messages . DebugLinkState :
def _call ( self , msg : protobuf . MessageType ) - > Any :
return self . _call ( messages . DebugLinkGetState ( ) )
self . _write ( msg )
return self . _read ( )
def state (
self , wait_type : DebugWaitType = DebugWaitType . CURRENT_LAYOUT
) - > messages . DebugLinkState :
result = self . _call ( messages . DebugLinkGetState ( wait_layout = wait_type ) )
if isinstance ( result , messages . Failure ) :
raise TrezorFailure ( result )
return result
def read_layout ( self ) - > LayoutContent :
def read_layout ( self ) - > LayoutContent :
return LayoutContent ( self . state ( ) . tokens or [ ] )
return LayoutContent ( self . state ( ) . tokens )
def wait_layout ( self , wait_for_external_change : bool = False ) - > LayoutContent :
def wait_layout ( self , wait_for_external_change : bool = False ) - > LayoutContent :
# Next layout change will be caused by external event
# Next layout change will be caused by external event
@ -445,11 +496,38 @@ class DebugLink:
if wait_for_external_change :
if wait_for_external_change :
self . reset_debug_events ( )
self . reset_debug_events ( )
obj = self . _call ( messages . DebugLinkGetState ( wait_layout = True ) )
obj = self . _call (
messages . DebugLinkGetState ( wait_layout = DebugWaitType . NEXT_LAYOUT )
)
self . layout_dirty = True
if isinstance ( obj , messages . Failure ) :
if isinstance ( obj , messages . Failure ) :
raise TrezorFailure ( obj )
raise TrezorFailure ( obj )
return LayoutContent ( obj . tokens )
return LayoutContent ( obj . tokens )
@contextmanager
def wait_for_layout_change ( self ) - > Iterator [ LayoutContent ] :
# set up a dummy layout content object to be yielded
layout_content = LayoutContent (
[ " DUMMY CONTENT, WAIT UNTIL THE END OF THE BLOCK :( " ]
)
# send GetState without waiting for reply
self . _write ( messages . DebugLinkGetState ( wait_layout = DebugWaitType . NEXT_LAYOUT ) )
# allow the block to proceed
self . waiting_for_layout_change = True
try :
yield layout_content
finally :
self . waiting_for_layout_change = False
# wait for the reply
resp = self . _read ( )
assert isinstance ( resp , messages . DebugLinkState )
# replace contents of the yielded object with the new thing
layout_content . __init__ ( resp . tokens )
def reset_debug_events ( self ) - > None :
def reset_debug_events ( self ) - > None :
# Only supported on TT and above certain version
# Only supported on TT and above certain version
if self . model in ( " T " , " Safe 3 " ) and not self . legacy_debug :
if self . model in ( " T " , " Safe 3 " ) and not self . legacy_debug :
@ -493,56 +571,102 @@ class DebugLink:
state = self . _call ( messages . DebugLinkGetState ( wait_word_list = True ) )
state = self . _call ( messages . DebugLinkGetState ( wait_word_list = True ) )
return state . reset_word
return state . reset_word
def input (
def _decision (
self ,
self , decision : messages . DebugLinkDecision , wait : Optional [ bool ] = None
word : Optional [ str ] = None ,
) - > LayoutContent :
button : Optional [ messages . DebugButton ] = None ,
""" Send a debuglink decision and returns the resulting layout.
physical_button : Optional [ messages . DebugPhysicalButton ] = None ,
swipe : Optional [ messages . DebugSwipeDirection ] = None ,
If hold_ms is set , an additional 200 ms is added to account for processing
x : Optional [ int ] = None ,
delays . ( This is needed for hold - to - confirm to trigger reliably . )
y : Optional [ int ] = None ,
wait : Optional [ bool ] = None ,
If ` wait ` is unset , the current wait mode is used :
hold_ms : Optional [ int ] = None ,
) - > Optional [ LayoutContent ] :
- when in normal tests , IMMEDIATE , which never deadlocks the device , but may
return an empty layout in case the next one didn ' t come up immediately. (E.g.,
in SignTx flow , the device is waiting for more TxRequest / TxAck exchanges
before showing the next UI layout . )
- when in tests running through a ` DeviceHandler ` , CURRENT_LAYOUT , which waits
for the next layout to come up . The assumption is that wirelink is
communicating on another thread and won ' t be blocked by waiting on debuglink.
Force waiting for the layout by setting ` wait = True ` . Force not waiting by
setting ` wait = False ` - - useful when , e . g . , you are causing the next layout to be
deliberately delayed .
"""
if not self . allow_interactions :
if not self . allow_interactions :
return None
return self . wait_layout ( )
args = sum ( a is not None for a in ( word , button , physical_button , swipe , x ) )
if decision . hold_ms is not None :
if args != 1 :
decision . hold_ms + = 200
raise ValueError (
" Invalid input - must use one of word, button, physical_button, swipe, click(x,y) "
)
decision = messages . DebugLinkDecision (
self . _write ( decision )
button = button ,
self . layout_dirty = True
physical_button = physical_button ,
if wait is True :
swipe = swipe ,
wait_type = DebugWaitType . CURRENT_LAYOUT
input = word ,
elif wait is False :
x = x ,
wait_type = DebugWaitType . IMMEDIATE
y = y ,
else :
wait = wait ,
wait_type = self . input_wait_type
hold_ms = hold_ms ,
return self . snapshot ( wait_type )
)
press_yes = _make_input_func ( button = messages . DebugButton . YES )
""" Confirm current layout. See `_decision` for more details. """
press_no = _make_input_func ( button = messages . DebugButton . NO )
""" Reject current layout. See `_decision` for more details. """
press_info = _make_input_func ( button = messages . DebugButton . INFO )
""" Trigger the Info action. See `_decision` for more details. """
swipe_up = _make_input_func ( swipe = messages . DebugSwipeDirection . UP )
""" Swipe up. See `_decision` for more details. """
swipe_down = _make_input_func ( swipe = messages . DebugSwipeDirection . DOWN )
""" Swipe down. See `_decision` for more details. """
swipe_right = _make_input_func ( swipe = messages . DebugSwipeDirection . RIGHT )
""" Swipe right. See `_decision` for more details. """
swipe_left = _make_input_func ( swipe = messages . DebugSwipeDirection . LEFT )
""" Swipe left. See `_decision` for more details. """
press_left = _make_input_func ( physical_button = messages . DebugPhysicalButton . LEFT_BTN )
""" Press left button. See `_decision` for more details. """
press_middle = _make_input_func (
physical_button = messages . DebugPhysicalButton . MIDDLE_BTN
)
""" Press middle button. See `_decision` for more details. """
press_right = _make_input_func (
physical_button = messages . DebugPhysicalButton . RIGHT_BTN
)
""" Press right button. See `_decision` for more details. """
ret = self . _call ( decision , nowait = not wait )
def input ( self , word : str , wait : Optional [ bool ] = None ) - > LayoutContent :
if ret is not None :
""" Send text input to the device. See `_decision` for more details. """
return LayoutContent ( ret . tokens )
return self . _decision ( messages . DebugLinkDecision ( input = word ) , wait )
# Getting the current screen after the (nowait) decision
def click (
self . save_current_screen_if_relevant ( wait = False )
self ,
click : Tuple [ int , int ] ,
hold_ms : Optional [ int ] = None ,
wait : Optional [ bool ] = None ,
) - > LayoutContent :
""" Send a click to the device. See `_decision` for more details. """
x , y = click
return self . _decision (
messages . DebugLinkDecision ( x = x , y = y , hold_ms = hold_ms ) , wait
)
return None
def snapshot (
self , wait_type : DebugWaitType = DebugWaitType . IMMEDIATE
) - > LayoutContent :
""" Save text and image content of the screen to relevant directories. """
# take the snapshot
state = self . state ( wait_type )
layout = LayoutContent ( state . tokens )
def save_current_screen_if_relevant ( self , wait : bool = True ) - > None :
if state . tokens and self . layout_dirty :
""" Optionally saving the textual screen output. """
# save it, unless we already did or unless it's empty
if self . screen_text_file is None :
self . save_debug_screen ( layout . visible_screen ( ) )
return
if state . layout is not None :
self . save_screenshot ( state . layout )
self . layout_dirty = False
if wait :
# return the layout
layout = self . wait_layout ( )
return layout
else :
layout = self . read_layout ( )
self . save_debug_screen ( layout . visible_screen ( ) )
def save_debug_screen ( self , screen_content : str ) - > None :
def save_debug_screen ( self , screen_content : str ) - > None :
if self . screen_text_file is None :
if self . screen_text_file is None :
@ -561,139 +685,8 @@ class DebugLink:
f . write ( screen_content )
f . write ( screen_content )
f . write ( " \n " + 80 * " / " + " \n " )
f . write ( " \n " + 80 * " / " + " \n " )
# Type overloads below make sure that when we supply `wait=True` into functions,
# they will always return `LayoutContent` and we do not need to assert `is not None`.
@overload
def click ( self , click : Tuple [ int , int ] ) - > None :
. . .
@overload
def click ( self , click : Tuple [ int , int ] , wait : Literal [ True ] ) - > LayoutContent :
. . .
def click (
self , click : Tuple [ int , int ] , wait : bool = False
) - > Optional [ LayoutContent ] :
x , y = click
return self . input ( x = x , y = y , wait = wait )
# Made into separate function as `hold_ms: Optional[int]` in `click`
# was causing problems with @overload
def click_hold (
self , click : Tuple [ int , int ] , hold_ms : int
) - > Optional [ LayoutContent ] :
x , y = click
return self . input ( x = x , y = y , hold_ms = hold_ms , wait = True )
def press_yes ( self , wait : bool = False ) - > Optional [ LayoutContent ] :
return self . input ( button = messages . DebugButton . YES , wait = wait )
def press_no ( self , wait : bool = False ) - > Optional [ LayoutContent ] :
return self . input ( button = messages . DebugButton . NO , wait = wait )
def press_info ( self , wait : bool = False ) - > Optional [ LayoutContent ] :
return self . input ( button = messages . DebugButton . INFO , wait = wait )
def swipe_up ( self , wait : bool = False ) - > Optional [ LayoutContent ] :
return self . input ( swipe = messages . DebugSwipeDirection . UP , wait = wait )
def swipe_down ( self , wait : bool = False ) - > Optional [ LayoutContent ] :
return self . input ( swipe = messages . DebugSwipeDirection . DOWN , wait = wait )
@overload
def swipe_right ( self ) - > None :
. . .
@overload
def swipe_right ( self , wait : Literal [ True ] ) - > LayoutContent :
. . .
def swipe_right ( self , wait : bool = False ) - > Union [ LayoutContent , None ] :
return self . input ( swipe = messages . DebugSwipeDirection . RIGHT , wait = wait )
@overload
def swipe_left ( self ) - > None :
. . .
@overload
def swipe_left ( self , wait : Literal [ True ] ) - > LayoutContent :
. . .
def swipe_left ( self , wait : bool = False ) - > Union [ LayoutContent , None ] :
return self . input ( swipe = messages . DebugSwipeDirection . LEFT , wait = wait )
@overload
def press_left ( self ) - > None :
. . .
@overload
def press_left ( self , wait : Literal [ True ] ) - > LayoutContent :
. . .
def press_left ( self , wait : bool = False ) - > Optional [ LayoutContent ] :
return self . input (
physical_button = messages . DebugPhysicalButton . LEFT_BTN , wait = wait
)
@overload
def press_middle ( self ) - > None :
. . .
@overload
def press_middle ( self , wait : Literal [ True ] ) - > LayoutContent :
. . .
def press_middle ( self , wait : bool = False ) - > Optional [ LayoutContent ] :
return self . input (
physical_button = messages . DebugPhysicalButton . MIDDLE_BTN , wait = wait
)
def press_middle_htc (
self , hold_ms : int , extra_ms : int = 200
) - > Optional [ LayoutContent ] :
return self . press_htc (
button = messages . DebugPhysicalButton . MIDDLE_BTN ,
hold_ms = hold_ms ,
extra_ms = extra_ms ,
)
@overload
def press_right ( self ) - > None :
. . .
@overload
def press_right ( self , wait : Literal [ True ] ) - > LayoutContent :
. . .
def press_right ( self , wait : bool = False ) - > Optional [ LayoutContent ] :
return self . input (
physical_button = messages . DebugPhysicalButton . RIGHT_BTN , wait = wait
)
def press_right_htc (
self , hold_ms : int , extra_ms : int = 200
) - > Optional [ LayoutContent ] :
return self . press_htc (
button = messages . DebugPhysicalButton . RIGHT_BTN ,
hold_ms = hold_ms ,
extra_ms = extra_ms ,
)
def press_htc (
self , button : messages . DebugPhysicalButton , hold_ms : int , extra_ms : int = 200
) - > Optional [ LayoutContent ] :
hold_ms = hold_ms + extra_ms # safety margin
result = self . input (
physical_button = button ,
hold_ms = hold_ms ,
)
# sleeping little longer for UI to update
time . sleep ( hold_ms / 1000 + 0.1 )
return result
def stop ( self ) - > None :
def stop ( self ) - > None :
self . _ call ( messages . DebugLinkStop ( ) , nowait = True )
self . _write ( messages . DebugLinkStop ( ) )
def reseed ( self , value : int ) - > protobuf . MessageType :
def reseed ( self , value : int ) - > protobuf . MessageType :
return self . _call ( messages . DebugLinkReseedRandom ( value = value ) )
return self . _call ( messages . DebugLinkReseedRandom ( value = value ) )
@ -727,44 +720,35 @@ class DebugLink:
return self . _call ( messages . DebugLinkMemoryRead ( address = address , length = length ) )
return self . _call ( messages . DebugLinkMemoryRead ( address = address , length = length ) )
def memory_write ( self , address : int , memory : bytes , flash : bool = False ) - > None :
def memory_write ( self , address : int , memory : bytes , flash : bool = False ) - > None :
self . _call (
self . _write (
messages . DebugLinkMemoryWrite ( address = address , memory = memory , flash = flash ) ,
messages . DebugLinkMemoryWrite ( address = address , memory = memory , flash = flash )
nowait = True ,
)
)
def flash_erase ( self , sector : int ) - > None :
def flash_erase ( self , sector : int ) - > None :
self . _ call ( messages . DebugLinkFlashErase ( sector = sector ) , nowait = True )
self . _ write ( messages . DebugLinkFlashErase ( sector = sector ) )
@expect ( messages . Success )
@expect ( messages . Success )
def erase_sd_card ( self , format : bool = True ) - > messages . Success :
def erase_sd_card ( self , format : bool = True ) - > messages . Success :
return self . _call ( messages . DebugLinkEraseSdCard ( format = format ) )
return self . _call ( messages . DebugLinkEraseSdCard ( format = format ) )
def take_t1_screenshot_if_relevant ( self ) - > None :
def save_screenshot ( self , data : bytes ) - > None :
""" Conditionally take screenshots on T1.
if self . t1_screenshot_directory is None :
return
TT handles them differently , see debuglink . start_recording .
"""
if self . model == " 1 " and self . t1_take_screenshots :
self . save_screenshot_for_t1 ( )
def save_screenshot_for_t1 ( self ) - > None :
from PIL import Image
from PIL import Image
layout = self . state ( ) . layout
assert len ( data ) == 128 * 64 / / 8
assert layout is not None
assert len ( layout ) == 128 * 64 / / 8
pixels : List [ int ] = [ ]
pixels : List [ int ] = [ ]
for byteline in range ( 64 / / 8 ) :
for byteline in range ( 64 / / 8 ) :
offset = byteline * 128
offset = byteline * 128
row = layout [ offset : offset + 128 ]
row = data [ offset : offset + 128 ]
for bit in range ( 8 ) :
for bit in range ( 8 ) :
pixels . extend ( bool ( px & ( 1 << bit ) ) for px in row )
pixels . extend ( bool ( px & ( 1 << bit ) ) for px in row )
im = Image . new ( " 1 " , ( 128 , 64 ) )
im = Image . new ( " 1 " , ( 128 , 64 ) )
im . putdata ( pixels [ : : - 1 ] )
im . putdata ( pixels [ : : - 1 ] )
assert self . t1_screenshot_directory is not None
img_location = (
img_location = (
self . t1_screenshot_directory / f " { self . t1_screenshot_counter : 04d } .png "
self . t1_screenshot_directory / f " { self . t1_screenshot_counter : 04d } .png "
)
)
@ -772,6 +756,9 @@ class DebugLink:
self . t1_screenshot_counter + = 1
self . t1_screenshot_counter + = 1
del _make_input_func
class NullDebugLink ( DebugLink ) :
class NullDebugLink ( DebugLink ) :
def __init__ ( self ) - > None :
def __init__ ( self ) - > None :
# Ignoring type error as self.transport will not be touched while using NullDebugLink
# Ignoring type error as self.transport will not be touched while using NullDebugLink
@ -810,15 +797,9 @@ class DebugUI:
] = None
] = None
def button_request ( self , br : messages . ButtonRequest ) - > None :
def button_request ( self , br : messages . ButtonRequest ) - > None :
self . debuglink . take_t1_ scree nshot_if_relevan t( )
self . debuglink . snap shot( )
if self . input_flow is None :
if self . input_flow is None :
# Only calling screen-saver when not in input-flow
# as it collides with wait-layout of input flows.
# All input flows call debuglink.input(), so
# recording their screens that way (as well as
# possible swipes below).
self . debuglink . save_current_screen_if_relevant ( wait = True )
if br . code == messages . ButtonRequestType . PinEntry :
if br . code == messages . ButtonRequestType . PinEntry :
self . debuglink . input ( self . get_pin ( ) )
self . debuglink . input ( self . get_pin ( ) )
else :
else :
@ -837,7 +818,7 @@ class DebugUI:
self . input_flow = self . INPUT_FLOW_DONE
self . input_flow = self . INPUT_FLOW_DONE
def get_pin ( self , code : Optional [ " PinMatrixRequestType " ] = None ) - > str :
def get_pin ( self , code : Optional [ " PinMatrixRequestType " ] = None ) - > str :
self . debuglink . take_t1_ scree nshot_if_relevan t( )
self . debuglink . snap shot( )
if self . pins is None :
if self . pins is None :
raise RuntimeError ( " PIN requested but no sequence was configured " )
raise RuntimeError ( " PIN requested but no sequence was configured " )
@ -848,7 +829,7 @@ class DebugUI:
raise AssertionError ( " PIN sequence ended prematurely " )
raise AssertionError ( " PIN sequence ended prematurely " )
def get_passphrase ( self , available_on_device : bool ) - > str :
def get_passphrase ( self , available_on_device : bool ) - > str :
self . debuglink . take_t1_ scree nshot_if_relevan t( )
self . debuglink . snap shot( )
return self . passphrase
return self . passphrase