mirror of
https://github.com/trezor/trezor-firmware.git
synced 2025-08-01 03:18:12 +00:00
fix(core): handle DebugLink.input_wait_type
when return_empty_state
is set
Also, simplify `tests/upgrade_tests/recovery_old.py`. [no changelog]
This commit is contained in:
parent
717429bedb
commit
7ebb00ff55
@ -71,7 +71,9 @@ if __debug__:
|
||||
)
|
||||
|
||||
async def return_layout_change(
|
||||
ctx: wire.protocol_common.Context, detect_deadlock: bool = False
|
||||
ctx: wire.protocol_common.Context,
|
||||
detect_deadlock: bool = False,
|
||||
return_empty_state: bool = False,
|
||||
) -> None:
|
||||
# set up the wait
|
||||
storage.layout_watcher = True
|
||||
@ -100,7 +102,7 @@ if __debug__:
|
||||
|
||||
# send the message and reset the wait
|
||||
storage.layout_watcher = False
|
||||
await ctx.write(_state())
|
||||
await ctx.write(_state(return_empty_state))
|
||||
|
||||
async def _layout_click(x: int, y: int, hold_ms: int = 0) -> None:
|
||||
assert isinstance(ui.CURRENT_LAYOUT, ui.Layout)
|
||||
@ -244,9 +246,12 @@ if __debug__:
|
||||
# If no exception was raised, the layout did not shut down. That means that it
|
||||
# just updated itself. The update is already live for the caller to retrieve.
|
||||
|
||||
def _state() -> DebugLinkState:
|
||||
def _state(return_empty_state: bool = False) -> DebugLinkState:
|
||||
from trezor.messages import DebugLinkState
|
||||
|
||||
if return_empty_state:
|
||||
return DebugLinkState()
|
||||
|
||||
from apps.common import mnemonic, passphrase
|
||||
|
||||
tokens = []
|
||||
@ -268,24 +273,27 @@ if __debug__:
|
||||
async def dispatch_DebugLinkGetState(
|
||||
msg: DebugLinkGetState,
|
||||
) -> DebugLinkState | None:
|
||||
if msg.return_empty_state:
|
||||
from trezor.messages import DebugLinkState
|
||||
|
||||
return DebugLinkState()
|
||||
|
||||
if msg.wait_layout == DebugWaitType.IMMEDIATE:
|
||||
return _state()
|
||||
return _state(msg.return_empty_state)
|
||||
|
||||
assert DEBUG_CONTEXT is not None
|
||||
if msg.wait_layout == DebugWaitType.NEXT_LAYOUT:
|
||||
layout_change_box.clear()
|
||||
return await return_layout_change(DEBUG_CONTEXT, detect_deadlock=False)
|
||||
return await return_layout_change(
|
||||
DEBUG_CONTEXT,
|
||||
detect_deadlock=False,
|
||||
return_empty_state=msg.return_empty_state,
|
||||
)
|
||||
|
||||
# default behavior: msg.wait_layout == DebugWaitType.CURRENT_LAYOUT
|
||||
if not layout_is_ready():
|
||||
return await return_layout_change(DEBUG_CONTEXT, detect_deadlock=True)
|
||||
return await return_layout_change(
|
||||
DEBUG_CONTEXT,
|
||||
detect_deadlock=True,
|
||||
return_empty_state=msg.return_empty_state,
|
||||
)
|
||||
else:
|
||||
return _state()
|
||||
return _state(msg.return_empty_state)
|
||||
|
||||
async def dispatch_DebugLinkRecordScreen(msg: DebugLinkRecordScreen) -> Success:
|
||||
if msg.target_directory:
|
||||
|
@ -639,11 +639,28 @@ class DebugLink:
|
||||
state = self._call(messages.DebugLinkGetState(wait_word_list=True))
|
||||
return state.reset_word
|
||||
|
||||
def _decision(self, decision: messages.DebugLinkDecision) -> None:
|
||||
def _decision(
|
||||
self, decision: messages.DebugLinkDecision, wait: bool | None = None
|
||||
) -> None:
|
||||
"""Send a debuglink decision.
|
||||
|
||||
If hold_ms is set, an additional 200ms is added to account for processing
|
||||
delays. (This is needed for hold-to-confirm to trigger reliably.)
|
||||
|
||||
If `wait` is unset, the following wait mode is used:
|
||||
|
||||
- `IMMEDIATE`, when in normal tests, which never deadlocks the device, but may
|
||||
return an empty layout in case the next one didn't come up immediately. (E.g.,
|
||||
in SignTx flow, the device is waiting for more TxRequest/TxAck exchanges
|
||||
before showing the next UI layout.)
|
||||
- `CURRENT_LAYOUT`, when in tests running through a `DeviceHandler`. This mode
|
||||
returns the current layout or waits for some layout to come up if there is
|
||||
none at the moment. The assumption is that wirelink is communicating on
|
||||
another thread and won't be blocked by waiting on debuglink.
|
||||
|
||||
Force waiting for the layout by setting `wait=True`. Force not waiting by
|
||||
setting `wait=False` -- useful when, e.g., you are causing the next layout to be
|
||||
deliberately delayed.
|
||||
"""
|
||||
if not self.allow_interactions:
|
||||
self.wait_layout()
|
||||
@ -655,12 +672,24 @@ class DebugLink:
|
||||
self._write(decision)
|
||||
if self.model is models.T1B1:
|
||||
return
|
||||
|
||||
if wait is True:
|
||||
wait_type = DebugWaitType.CURRENT_LAYOUT
|
||||
elif wait is False:
|
||||
wait_type = DebugWaitType.IMMEDIATE
|
||||
else:
|
||||
wait_type = self.input_wait_type
|
||||
|
||||
# When the call below returns, we know that `decision` has been processed in Core.
|
||||
# XXX Due to a bug, the reply may get lost at the end of a workflow.
|
||||
# We assume that no single input event takes more than 5 seconds to process,
|
||||
# and give up waiting after that.
|
||||
try:
|
||||
self._call(messages.DebugLinkGetState(return_empty_state=True), timeout=5)
|
||||
msg = messages.DebugLinkGetState(
|
||||
wait_layout=wait_type,
|
||||
return_empty_state=True,
|
||||
)
|
||||
self._call(msg, timeout=5)
|
||||
except Timeout as e:
|
||||
LOG.warning("timeout waiting for DebugLinkState: %s", e)
|
||||
|
||||
@ -693,10 +722,15 @@ class DebugLink:
|
||||
"""Send text input to the device. See `_decision` for more details."""
|
||||
self._decision(messages.DebugLinkDecision(input=word))
|
||||
|
||||
def click(self, click: Tuple[int, int], hold_ms: int | None = None) -> None:
|
||||
def click(
|
||||
self,
|
||||
click: Tuple[int, int],
|
||||
hold_ms: int | None = None,
|
||||
wait: bool | None = None,
|
||||
) -> None:
|
||||
"""Send a click to the device. See `_decision` for more details."""
|
||||
x, y = click
|
||||
self._decision(messages.DebugLinkDecision(x=x, y=y, hold_ms=hold_ms))
|
||||
self._decision(messages.DebugLinkDecision(x=x, y=y, hold_ms=hold_ms), wait=wait)
|
||||
|
||||
def stop(self) -> None:
|
||||
self._write(messages.DebugLinkStop())
|
||||
|
@ -4,21 +4,16 @@ if TYPE_CHECKING:
|
||||
from trezorlib.debuglink import DebugLink, LayoutContent
|
||||
|
||||
|
||||
def _enter_word(
|
||||
debug: "DebugLink", word: str, is_slip39: bool = False
|
||||
) -> "LayoutContent":
|
||||
def _enter_word(debug: "DebugLink", word: str, is_slip39: bool = False) -> None:
|
||||
typed_word = word[:4]
|
||||
for coords in debug.button_actions.type_word(typed_word, is_slip39=is_slip39):
|
||||
debug.click(coords)
|
||||
debug.read_layout(wait=False)
|
||||
debug.click(coords, wait=False)
|
||||
|
||||
debug.click(debug.screen_buttons.mnemonic_confirm())
|
||||
return debug.read_layout(wait=True)
|
||||
|
||||
|
||||
def confirm_recovery(debug: "DebugLink") -> None:
|
||||
debug.click(debug.screen_buttons.ok())
|
||||
debug.read_layout(wait=True)
|
||||
|
||||
|
||||
def select_number_of_words(
|
||||
@ -26,7 +21,6 @@ def select_number_of_words(
|
||||
) -> None:
|
||||
if "SelectWordCount" not in debug.read_layout().all_components():
|
||||
debug.click(debug.screen_buttons.ok())
|
||||
debug.read_layout(wait=True)
|
||||
if tag_version is None or tag_version > (2, 8, 8):
|
||||
# layout changed after adding the cancel button
|
||||
coords = debug.screen_buttons.word_count_all_word(num_of_words)
|
||||
@ -38,7 +32,6 @@ def select_number_of_words(
|
||||
) # raises if num of words is invalid
|
||||
coords = debug.screen_buttons.grid34(index % 3, index // 3)
|
||||
debug.click(coords)
|
||||
debug.read_layout(wait=True)
|
||||
|
||||
|
||||
def enter_share(debug: "DebugLink", share: str) -> "LayoutContent":
|
||||
@ -46,4 +39,4 @@ def enter_share(debug: "DebugLink", share: str) -> "LayoutContent":
|
||||
for word in share.split(" "):
|
||||
_enter_word(debug, word, is_slip39=True)
|
||||
|
||||
return debug.read_layout(wait=True)
|
||||
return debug.read_layout()
|
||||
|
Loading…
Reference in New Issue
Block a user