feat(all): increase stability of debug-decision events + parsing JSON from Rust

pull/3011/head
grdddj 1 year ago committed by Martin Milata
parent c6ad596339
commit bd6783b1a3

@ -5,6 +5,7 @@ if not __debug__:
if __debug__:
from storage import debug as storage
from storage.debug import debug_events
import trezorui2
@ -30,17 +31,17 @@ if __debug__:
DebugLinkReseedRandom,
DebugLinkState,
DebugLinkWatchLayout,
DebugLinkResetDebugEvents,
)
reset_current_words = loop.chan()
reset_word_index = loop.chan()
confirm_chan = loop.chan()
swipe_chan = loop.chan()
input_chan = loop.chan()
confirm_signal = confirm_chan.take
result_chan = loop.chan()
button_chan = loop.chan()
click_chan = loop.chan()
swipe_signal = swipe_chan.take
input_signal = input_chan.take
result_signal = result_chan.take
button_signal = button_chan.take
click_signal = click_chan.take
debuglink_decision_chan = loop.chan()
@ -64,65 +65,80 @@ if __debug__:
return True
return False
def notify_layout_change(layout: Layout) -> None:
storage.current_content[:] = layout.read_content()
def notify_layout_change(layout: Layout, event_id: int | None = None) -> None:
layout.read_content_into(storage.current_content_tokens)
if storage.watch_layout_changes or layout_change_chan.takers:
layout_change_chan.publish(storage.current_content)
async def _dispatch_debuglink_decision(msg: DebugLinkDecision) -> None:
from trezor.enums import DebugButton, DebugSwipeDirection
from trezor.ui import (
Result,
SWIPE_UP,
SWIPE_DOWN,
SWIPE_LEFT,
SWIPE_RIGHT,
)
payload = (event_id, storage.current_content_tokens)
layout_change_chan.publish(payload)
button = msg.button # local_cache_attribute
swipe = msg.swipe # local_cache_attribute
if button is not None:
if button == DebugButton.NO:
await confirm_chan.put(Result(trezorui2.CANCELLED))
elif button == DebugButton.YES:
await confirm_chan.put(Result(trezorui2.CONFIRMED))
elif button == DebugButton.INFO:
await confirm_chan.put(Result(trezorui2.INFO))
if swipe is not None:
if swipe == DebugSwipeDirection.UP:
await swipe_chan.put(SWIPE_UP)
elif swipe == DebugSwipeDirection.DOWN:
await swipe_chan.put(SWIPE_DOWN)
elif swipe == DebugSwipeDirection.LEFT:
await swipe_chan.put(SWIPE_LEFT)
elif swipe == DebugSwipeDirection.RIGHT:
await swipe_chan.put(SWIPE_RIGHT)
if msg.input is not None:
await input_chan.put(Result(msg.input))
async def _dispatch_debuglink_decision(
event_id: int | None, msg: DebugLinkDecision
) -> None:
from trezor.enums import DebugButton
if msg.button is not None:
if msg.button == DebugButton.NO:
await result_chan.put((event_id, trezorui2.CANCELLED))
elif msg.button == DebugButton.YES:
await result_chan.put((event_id, trezorui2.CONFIRMED))
elif msg.button == DebugButton.INFO:
await result_chan.put((event_id, trezorui2.INFO))
else:
raise RuntimeError(f"Invalid msg.button - {msg.button}")
elif msg.input is not None:
await result_chan.put((event_id, msg.input))
elif msg.swipe is not None:
await swipe_chan.put((event_id, msg.swipe))
else:
# Sanity check. The message will be visible in terminal.
raise RuntimeError("Invalid DebugLinkDecision message")
async def debuglink_decision_dispatcher() -> None:
while True:
msg = await debuglink_decision_chan.take()
await _dispatch_debuglink_decision(msg)
event_id, msg = await debuglink_decision_chan.take()
await _dispatch_debuglink_decision(event_id, msg)
async def get_layout_change_content() -> list[str]:
awaited_event_id = debug_events.awaited_event
last_result_id = debug_events.last_result
if awaited_event_id is not None and awaited_event_id == last_result_id:
# We are awaiting the event that just happened - return current state
return storage.current_content_tokens
while True:
event_id, content = await layout_change_chan.take()
if awaited_event_id is None or event_id is None:
# Not waiting for anything or event does not have ID
break
elif event_id == awaited_event_id:
# We found what we were waiting for
debug_events.awaited_event = None
break
elif event_id > awaited_event_id:
# Sanity check
raise RuntimeError(
f"Waiting for event that already happened - {event_id} > {awaited_event_id}"
)
if awaited_event_id is not None:
# Updating last result
debug_events.last_result = awaited_event_id
return content
async def return_layout_change() -> None:
content = await layout_change_chan.take()
content_tokens = await get_layout_change_content()
assert DEBUG_CONTEXT is not None
if storage.layout_watcher is LAYOUT_WATCHER_LAYOUT:
await DEBUG_CONTEXT.write(DebugLinkLayout(lines=content))
await DEBUG_CONTEXT.write(DebugLinkLayout(tokens=content_tokens))
else:
from trezor.messages import DebugLinkState
await DEBUG_CONTEXT.write(DebugLinkState(layout_lines=content))
await DEBUG_CONTEXT.write(DebugLinkState(tokens=content_tokens))
storage.layout_watcher = LAYOUT_WATCHER_NONE
async def touch_hold(x: int, y: int, duration_ms: int) -> None:
from trezor import io
await loop.sleep(duration_ms)
loop.synthetic_events.append((io.TOUCH, (io.TOUCH_END, x, y)))
async def dispatch_DebugLinkWatchLayout(
ctx: wire.Context, msg: DebugLinkWatchLayout
) -> Success:
@ -135,30 +151,40 @@ if __debug__:
log.debug(__name__, "Watch layout changes: %s", storage.watch_layout_changes)
return Success()
async def dispatch_DebugLinkResetDebugEvents(
ctx: wire.Context, msg: DebugLinkResetDebugEvents
) -> Success:
# Resetting the debug events makes sure that the previous
# events/layouts are not mixed with the new ones.
storage.reset_debug_events()
return Success()
async def dispatch_DebugLinkDecision(
ctx: wire.Context, msg: DebugLinkDecision
) -> None:
from trezor import io, workflow
from trezor import workflow
workflow.idle_timer.touch()
if debuglink_decision_chan.putters:
log.warning(__name__, "DebugLinkDecision queue is not empty")
x = msg.x # local_cache_attribute
y = msg.y # local_cache_attribute
# Incrementing the counter for last events so we know what to await
debug_events.last_event += 1
# TT click on specific coordinates, with possible hold
if x is not None and y is not None:
evt_down = io.TOUCH_START, x, y
evt_up = io.TOUCH_END, x, y
loop.synthetic_events.append((io.TOUCH, evt_down))
if msg.hold_ms is not None:
loop.schedule(touch_hold(x, y, msg.hold_ms))
else:
loop.synthetic_events.append((io.TOUCH, evt_up))
click_chan.publish((debug_events.last_event, x, y, msg.hold_ms))
else:
debuglink_decision_chan.publish(msg)
# Will get picked up by _dispatch_debuglink_decision eventually
debuglink_decision_chan.publish((debug_events.last_event, msg))
if msg.wait:
# We wait for all the previously sent events
debug_events.awaited_event = debug_events.last_event
storage.layout_watcher = LAYOUT_WATCHER_LAYOUT
loop.schedule(return_layout_change())
@ -178,15 +204,13 @@ if __debug__:
if not storage.watch_layout_changes:
raise wire.ProcessError("Layout is not watched")
storage.layout_watcher = LAYOUT_WATCHER_STATE
# We wait for the last previously sent event to finish
debug_events.awaited_event = debug_events.last_event
loop.schedule(return_layout_change())
return None
else:
m.layout_lines = storage.current_content
m.tokens = storage.current_content_tokens
if msg.wait_word_pos:
m.reset_word_pos = await reset_word_index.take()
if msg.wait_word_list:
m.reset_word = " ".join(await reset_current_words.take())
return m
async def dispatch_DebugLinkRecordScreen(
@ -248,6 +272,9 @@ if __debug__:
register(MessageType.DebugLinkRecordScreen, dispatch_DebugLinkRecordScreen)
register(MessageType.DebugLinkEraseSdCard, dispatch_DebugLinkEraseSdCard)
register(MessageType.DebugLinkWatchLayout, dispatch_DebugLinkWatchLayout)
register(
MessageType.DebugLinkResetDebugEvents, dispatch_DebugLinkResetDebugEvents
)
loop.schedule(debuglink_decision_dispatcher())
if storage.layout_watcher is not LAYOUT_WATCHER_NONE:

@ -16,9 +16,6 @@ if TYPE_CHECKING:
from typing import Sequence
from trezor.wire import GenericContext
if __debug__:
from apps import debug
_NUM_OF_CHOICES = const(3)
@ -57,10 +54,6 @@ async def _confirm_word(
checked_index = share_words.index(checked_word) + offset
# shuffle again so the confirmed word is not always the first choice
random.shuffle(choices)
if __debug__:
debug.reset_word_index.publish(checked_index)
# let the user pick a word
selected_word: str = await select_word(
ctx, choices, share_index, checked_index, count, group_index

@ -7,10 +7,28 @@ if __debug__:
save_screen = False
save_screen_directory = "."
current_content: list[str] = [""] * 20
current_content.clear()
current_content_tokens: list[str] = [""] * 60
current_content_tokens.clear()
watch_layout_changes = False
layout_watcher = 0
reset_internal_entropy: bytes = b""
class DebugEvents:
def __init__(self):
self.reset()
def reset(self) -> None:
self.last_event = 0
self.last_result: int | None = None
self.awaited_event: int | None = None
debug_events = DebugEvents()
def reset_debug_events() -> None:
debug_events.reset()
# Event resulted in the layout change, call
# notify_layout_change with this ID in first_paint of next layout.
new_layout_event_id: int | None = None

@ -41,10 +41,6 @@ _finalizers: dict[int, Finalizer] = {}
# reference to the task that is currently executing
this_task: Task | None = None
if __debug__:
# synthetic event queue
synthetic_events: list[tuple[int, Any]] = []
class TaskClosed(Exception):
pass
@ -118,20 +114,6 @@ def run() -> None:
task_entry = [0, 0, 0] # deadline, task, value
msg_entry = [0, 0] # iface | flags, value
while _queue or _paused:
if __debug__:
# process synthetic events
if synthetic_events:
iface, event = synthetic_events[0]
msg_tasks = _paused.pop(iface, ())
if msg_tasks:
synthetic_events.pop(0)
for task in msg_tasks:
_step(task, event)
# XXX: we assume that synthetic events are rare. If there is a lot of them,
# this degrades to "while synthetic_events" and would ignore all real ones.
continue
# compute the maximum amount of time we can wait for a message
if _queue:
delay = utime.ticks_diff(_queue.peektime(), utime.ticks_ms())

@ -17,14 +17,6 @@ MONO: int = Display.FONT_MONO
WIDTH: int = Display.WIDTH
HEIGHT: int = Display.HEIGHT
if __debug__:
# common symbols to transfer swipes between debuglink and the UI
SWIPE_UP = const(0x01)
SWIPE_DOWN = const(0x02)
SWIPE_LEFT = const(0x04)
SWIPE_RIGHT = const(0x08)
# channel used to cancel layouts, see `Cancelled` exception
layout_chan = loop.chan()
@ -172,8 +164,9 @@ class Component:
if __debug__:
def read_content(self) -> list[str]:
return [self.__class__.__name__]
def read_content_into(self, content_store: list[str]) -> None:
content_store.clear()
content_store.append(self.__class__.__name__)
class Result(Exception):

@ -32,14 +32,10 @@ if __debug__:
class RustLayout(ui.Layout):
# pylint: disable=super-init-not-called
def __init__(self, layout: Any, is_backup: bool = False):
def __init__(self, layout: Any):
self.layout = layout
self.timer = loop.Timer()
self.layout.attach_timer_fn(self.set_timer)
self.is_backup = is_backup
if __debug__ and self.is_backup:
self.notify_backup()
def set_timer(self, token: int, deadline: int) -> None:
self.timer.schedule(deadline, token)
@ -60,44 +56,49 @@ class RustLayout(ui.Layout):
if __debug__:
def create_tasks(self) -> tuple[loop.AwaitableTask, ...]:
from apps.debug import confirm_signal, input_signal
return (
self.handle_timers(),
self.handle_input_and_rendering(),
self.handle_swipe(),
confirm_signal(),
input_signal(),
self.handle_click_signal(),
self.handle_result_signal(),
)
def read_content(self) -> list[str]:
result: list[str] = []
async def handle_result_signal(self) -> None:
"""Enables sending arbitrary input - ui.Result.
Waits for `result_signal` and carries it out.
"""
from apps.debug import result_signal
from storage import debug as debug_storage
while True:
event_id, result = await result_signal()
debug_storage.new_layout_event_id = event_id
raise ui.Result(result)
def read_content_into(self, content_store: list[str]) -> None:
"""Reads all the strings/tokens received from Rust into given list."""
def callback(*args: Any) -> None:
for arg in args:
result.append(str(arg))
content_store.append(str(arg))
content_store.clear()
self.layout.trace(callback)
result = " ".join(result).split("\n")
return result
async def handle_swipe(self):
from apps.debug import notify_layout_change, swipe_signal
from trezor.ui import (
SWIPE_UP,
SWIPE_DOWN,
SWIPE_LEFT,
SWIPE_RIGHT,
)
from trezor.enums import DebugSwipeDirection
while True:
direction = await swipe_signal()
event_id, direction = await swipe_signal()
orig_x = orig_y = 120
off_x, off_y = {
SWIPE_UP: (0, -30),
SWIPE_DOWN: (0, 30),
SWIPE_LEFT: (-30, 0),
SWIPE_RIGHT: (30, 0),
DebugSwipeDirection.UP: (0, -30),
DebugSwipeDirection.DOWN: (0, 30),
DebugSwipeDirection.LEFT: (-30, 0),
DebugSwipeDirection.RIGHT: (30, 0),
}[direction]
for event, x, y in (
@ -110,26 +111,46 @@ class RustLayout(ui.Layout):
if msg is not None:
raise ui.Result(msg)
if self.is_backup:
self.notify_backup()
notify_layout_change(self)
def notify_backup(self):
from apps.debug import reset_current_words
content = "\n".join(self.read_content())
start = "< Paragraphs "
end = ">"
start_pos = content.index(start)
end_pos = content.index(end, start_pos)
words: list[str] = []
for line in content[start_pos + len(start) : end_pos].split("\n"):
line = line.strip()
if not line:
continue
space_pos = line.index(" ")
words.append(line[space_pos + 1 :])
reset_current_words.publish(words)
notify_layout_change(self, event_id)
async def _click(
self,
event_id: int | None,
x: int,
y: int,
hold_ms: int | None,
) -> Any:
from trezor import workflow
from apps.debug import notify_layout_change
from storage import debug as debug_storage
self.layout.touch_event(io.TOUCH_START, x, y)
self._paint()
if hold_ms is not None:
await loop.sleep(hold_ms)
msg = self.layout.touch_event(io.TOUCH_END, x, y)
if msg is not None:
debug_storage.new_layout_event_id = event_id
raise ui.Result(msg)
# So that these presses will keep trezor awake
# (it will not be locked after auto_lock_delay_ms)
workflow.idle_timer.touch()
self._paint()
notify_layout_change(self, event_id)
async def handle_click_signal(self) -> None:
"""Enables clicking somewhere on the screen.
Waits for `click_signal` and carries it out.
"""
from apps.debug import click_signal
while True:
event_id, x, y, hold_ms = await click_signal()
await self._click(event_id, x, y, hold_ms)
else:
@ -144,12 +165,21 @@ class RustLayout(ui.Layout):
if __debug__ and self.should_notify_layout_change:
from apps.debug import notify_layout_change
from storage import debug as debug_storage
# notify about change and do not notify again until next await.
# (handle_rendering might be called multiple times in a single await,
# because of the endless loop in __iter__)
self.should_notify_layout_change = False
notify_layout_change(self)
# Possibly there is an event ID that caused the layout change,
# so notifying with this ID.
event_id = None
if debug_storage.new_layout_event_id is not None:
event_id = debug_storage.new_layout_event_id
debug_storage.new_layout_event_id = None
notify_layout_change(self, event_id)
# Turn the brightness on again.
ui.backlight_fade(self.BACKLIGHT_LEVEL)

@ -26,15 +26,11 @@ if __debug__:
)
async def handle_debug_confirm(self) -> None:
from apps.debug import confirm_signal
try:
await confirm_signal()
except Result as r:
if r.value is not trezorui2.CONFIRMED:
raise
else:
return
from apps.debug import result_signal
_event_id, result = await result_signal()
if result is not trezorui2.CONFIRMED:
raise Result(result)
for event, x, y in (
(io.TOUCH_START, 220, 220),

@ -31,7 +31,11 @@ class HomescreenBase(RustLayout):
# In __debug__ mode, ignore {confirm,swipe,input}_signal.
def create_tasks(self) -> tuple[loop.AwaitableTask, ...]:
return self.handle_timers(), self.handle_input_and_rendering()
return (
self.handle_timers(),
self.handle_input_and_rendering(),
self.handle_click_signal(), # so we can receive debug events
)
class Homescreen(HomescreenBase):

@ -73,7 +73,6 @@ async def show_share_words(
title=title,
pages=pages,
),
is_backup=True,
),
"backup_words",
ButtonRequestType.ResetDevice,

@ -14,6 +14,7 @@
# You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import json
import logging
import re
import textwrap
@ -57,133 +58,217 @@ if TYPE_CHECKING:
protobuf.MessageType, Type[protobuf.MessageType], "MessageFilter"
]
AnyDict = Dict[str, Any]
EXPECTED_RESPONSES_CONTEXT_LINES = 3
LOG = logging.getLogger(__name__)
class LayoutContent:
class UnstructuredJSONReader:
"""Contains data-parsing helpers for JSON data that have unknown structure."""
def __init__(self, json_str: str) -> None:
self.json_str = json_str
# We may not receive valid JSON, e.g. from an old model in upgrade tests
try:
self.dict: AnyDict = json.loads(json_str)
except json.JSONDecodeError:
self.dict = {}
def top_level_value(self, key: str) -> Any:
return self.dict[key]
def find_objects_with_key_and_value(self, key: str, value: Any) -> List["AnyDict"]:
def recursively_find(data: Any) -> Iterator[Any]:
if isinstance(data, dict):
if data.get(key) == value:
yield data
for val in data.values():
yield from recursively_find(val)
elif isinstance(data, list):
for item in data:
yield from recursively_find(item)
return list(recursively_find(self.dict))
def find_values_by_key(
self, key: str, only_type: Union[type, None] = None
) -> List[Any]:
def recursively_find(data: Any) -> Iterator[Any]:
if isinstance(data, dict):
if key in data:
yield data[key]
for val in data.values():
yield from recursively_find(val)
elif isinstance(data, list):
for item in data:
yield from recursively_find(item)
values = list(recursively_find(self.dict))
if only_type is not None:
values = [v for v in values if isinstance(v, only_type)]
return values
def find_unique_value_by_key(
self, key: str, default: Any, only_type: Union[type, None] = None
) -> Any:
values = self.find_values_by_key(key, only_type=only_type)
if not values:
return default
assert len(values) == 1
return values[0]
class LayoutContent(UnstructuredJSONReader):
"""Stores content of a layout as returned from Trezor.
Contains helper functions to extract specific parts of the layout.
"""
def __init__(self, lines: Sequence[str]) -> None:
self.lines = list(lines)
self.text = " ".join(self.lines)
def get_title(self) -> str:
"""Get title of the layout.
Title is located between "title" and "content" identifiers.
Example: "< Frame title : RECOVERY SHARE #1 content : < SwipePage"
-> "RECOVERY SHARE #1"
def __init__(self, json_tokens: Sequence[str]) -> None:
json_str = "".join(json_tokens)
super().__init__(json_str)
def main_component(self) -> str:
"""Getting the main component of the layout."""
return self.top_level_value("component")
def all_components(self) -> List[str]:
"""Getting all components of the layout."""
return self.find_values_by_key("component", only_type=str)
def visible_screen(self) -> str:
"""String representation of a current screen content.
Example:
SIGN TRANSACTION
--------------------
You are about to
sign 3 actions.
********************
Icon:cancel [Cancel], --- [None], CONFIRM [Confirm]
"""
match = re.search(r"title : (.*?) content :", self.text)
if not match:
return ""
return match.group(1).strip()
def get_content(self, tag_name: str = "Paragraphs", raw: bool = False) -> str:
"""Get text of the main screen content of the layout."""
content = "".join(self._get_content_lines(tag_name, raw))
if not raw and content.endswith(" "):
# Stripping possible space at the end
content = content[:-1]
return content
def get_button_texts(self) -> List[str]:
"""Get text of all buttons in the layout.
Example button: "< Button text : LADYBUG >"
-> ["LADYBUG"]
title_separator = f"\n{20*'-'}\n"
btn_separator = f"\n{20*'*'}\n"
visible = ""
if self.title():
visible += self.title()
visible += title_separator
visible += self.screen_content()
visible_buttons = self.button_contents()
if visible_buttons:
visible += btn_separator
visible += ", ".join(visible_buttons)
return visible
def title(self) -> str:
"""Getting text that is displayed as a title."""
# There could be possibly subtitle as well
title_parts: List[str] = []
def _get_str_or_dict_text(key: str) -> str:
value = self.find_unique_value_by_key(key, "")
if isinstance(value, dict):
return value["text"]
return value
title = _get_str_or_dict_text("title")
if title:
title_parts.append(title)
subtitle = _get_str_or_dict_text("subtitle")
if subtitle:
title_parts.append(subtitle)
return "\n".join(title_parts)
def text_content(self) -> str:
"""What is on the screen, in one long string, so content can be
asserted regardless of newlines.
"""
return re.findall(r"< Button text : +(.*?) >", self.text)
return self.screen_content().replace("\n", " ")
def get_seed_words(self) -> List[str]:
def screen_content(self) -> str:
"""Getting text that is displayed in the main part of the screen.
Preserving the line breaks.
"""
main_text_blocks: List[str] = []
paragraphs = self.raw_content_paragraphs()
if not paragraphs:
return self.main_component()
for par in paragraphs:
par_content = ""
for line_or_newline in par:
par_content += line_or_newline
par_content.replace("\n", " ")
main_text_blocks.append(par_content)
return "\n".join(main_text_blocks)
def raw_content_paragraphs(self) -> Union[List[List[str]], None]:
"""Getting raw paragraphs as sent from Rust."""
return self.find_unique_value_by_key("paragraphs", default=None, only_type=list)
def button_contents(self) -> List[str]:
"""Getting list of button contents."""
buttons: List[str] = []
button_objects = self.find_objects_with_key_and_value("component", "Button")
for button in button_objects:
if button.get("icon"):
buttons.append("ICON")
elif "text" in button:
buttons.append(button["text"])
return buttons
def seed_words(self) -> List[str]:
"""Get all the seed words on the screen in order.
Example content: "1. ladybug 2. acid 3. academic 4. afraid"
Example content: "1. ladybug\n2. acid\n3. academic\n4. afraid"
-> ["ladybug", "acid", "academic", "afraid"]
"""
return re.findall(r"\d+\. (\w+)\b", self.get_content())
def get_page_count(self) -> int:
words: List[str] = []
for line in self.screen_content().split("\n"):
# Dot after index is optional (present on TT, not on TR)
match = re.match(r"^\d+\.? (\w+)$", line)
if match:
words.append(match.group(1))
return words
def pin(self) -> str:
"""Get PIN from the layout."""
assert self.main_component() == "PinKeyboard"
return self.find_unique_value_by_key("pin", default="", only_type=str)
def passphrase(self) -> str:
"""Get passphrase from the layout."""
assert self.main_component() == "PassphraseKeyboard"
return self.find_unique_value_by_key("passphrase", default="", only_type=str)
def page_count(self) -> int:
"""Get number of pages for the layout."""
return self._get_number("page_count")
return (
self.find_unique_value_by_key(
"scrollbar_page_count", default=0, only_type=int
)
or self.find_unique_value_by_key("page_count", default=0, only_type=int)
or 1
)
def get_active_page(self) -> int:
def active_page(self) -> int:
"""Get current page index of the layout."""
return self._get_number("active_page")
def _get_number(self, key: str) -> int:
"""Get number connected with a specific key."""
match = re.search(rf"{key} : +(\d+)", self.text)
if not match:
return 0
return int(match.group(1))
def _get_content_lines(
self, tag_name: str = "Paragraphs", raw: bool = False
) -> List[str]:
"""Get lines of the main screen content of the layout."""
# First line should have content after the tag, last line does not store content
tag = f"< {tag_name}"
for i in range(len(self.lines)):
if tag in self.lines[i]:
first_line = self.lines[i].split(tag)[1]
all_lines = [first_line] + self.lines[i + 1 : -1]
break
else:
all_lines = self.lines[1:-1]
if raw:
return all_lines
else:
return [_clean_line(line) for line in all_lines]
return self.find_unique_value_by_key("active_page", default=0, only_type=int)
def _clean_line(line: str) -> str:
"""Cleaning the layout line for extra spaces, hyphens and ellipsis.
Line usually comes in the form of " <content> ", with trailing spaces
at both ends. It may end with a hyphen (" - ") or ellipsis (" ... ").
Hyphen means the word was split to the next line, ellipsis signals
the text continuing on the next page.
"""
# Deleting space at the beginning
if line.startswith(" "):
line = line[1:]
# Deleting a hyphen at the end, together with the space
# before it, so it will be tightly connected with the next line
if line.endswith(" - "):
line = line[:-3]
# Deleting the ellipsis at the end (but preserving the space there)
if line.endswith(" ... "):
line = line[:-4]
return line
def tt_pin_digits_order(self) -> str:
"""In what order the PIN buttons are shown on the screen. Only for TT."""
return self.top_level_value("digits_order")
def multipage_content(layouts: List[LayoutContent]) -> str:
"""Get overall content from multiple-page layout."""
final_text = ""
for layout in layouts:
final_text += layout.get_content()
# When the raw content of the page ends with ellipsis,
# we need to add a space to separate it with the next page
if layout.get_content(raw=True).endswith("... "):
final_text += " "
# Stripping possible space at the end of last page
if final_text.endswith(" "):
final_text = final_text[:-1]
return final_text
return "".join(layout.text_content() for layout in layouts)
class DebugLink:
@ -194,6 +279,7 @@ class DebugLink:
# To be set by TrezorClientDebugLink (is not known during creation time)
self.model: Optional[str] = None
self.version: Tuple[int, int, int] = (0, 0, 0)
# Where screenshots are being saved
self.screenshot_recording_dir: Optional[str] = None
@ -207,6 +293,16 @@ class DebugLink:
self.screen_text_file: Optional[Path] = None
self.last_screen_content = ""
@property
def legacy_ui(self) -> bool:
"""Differences between UI1 and UI2."""
return self.version < (2, 6, 0)
@property
def legacy_debug(self) -> bool:
"""Differences in handling debug events and LayoutContent."""
return self.version < (2, 6, 1)
def set_screen_text_file(self, file_path: Optional[Path]) -> None:
if file_path is not None:
file_path.write_bytes(b"")
@ -248,19 +344,33 @@ class DebugLink:
return self._call(messages.DebugLinkGetState())
def read_layout(self) -> LayoutContent:
return LayoutContent(self.state().layout_lines)
return LayoutContent(self.state().tokens or [])
def wait_layout(self, wait_for_external_change: bool = False) -> LayoutContent:
# Next layout change will be caused by external event
# (e.g. device being auto-locked or as a result of device_handler.run(xxx))
# and not by our debug actions/decisions.
# Resetting the debug state so we wait for the next layout change
# (and do not return the current state).
if wait_for_external_change:
self.reset_debug_events()
def wait_layout(self) -> LayoutContent:
obj = self._call(messages.DebugLinkGetState(wait_layout=True))
if isinstance(obj, messages.Failure):
raise TrezorFailure(obj)
return LayoutContent(obj.layout_lines)
return LayoutContent(obj.tokens)
def reset_debug_events(self) -> None:
# Only supported on TT and above certain version
if self.model == "T" and not self.legacy_debug:
return self._call(messages.DebugLinkResetDebugEvents())
return None
def synchronize_at(self, layout_text: str, timeout: float = 5) -> LayoutContent:
now = time.monotonic()
while True:
layout = self.read_layout()
if layout_text in layout.text:
if layout_text in layout.json_str:
return layout
if time.monotonic() - now > timeout:
raise RuntimeError("Timeout waiting for layout")
@ -293,10 +403,6 @@ class DebugLink:
state = self._call(messages.DebugLinkGetState(wait_word_list=True))
return state.reset_word
def read_reset_word_pos(self) -> int:
state = self._call(messages.DebugLinkGetState(wait_word_pos=True))
return state.reset_word_pos
def input(
self,
word: Optional[str] = None,
@ -312,7 +418,9 @@ class DebugLink:
args = sum(a is not None for a in (word, button, swipe, x))
if args != 1:
raise ValueError("Invalid input - must use one of word, button, swipe")
raise ValueError(
"Invalid input - must use one of word, button, swipe, click(x,y)"
)
decision = messages.DebugLinkDecision(
button=button, swipe=swipe, input=word, x=x, y=y, wait=wait, hold_ms=hold_ms
@ -320,7 +428,7 @@ class DebugLink:
ret = self._call(decision, nowait=not wait)
if ret is not None:
return LayoutContent(ret.lines)
return LayoutContent(ret.tokens)
# Getting the current screen after the (nowait) decision
self.save_current_screen_if_relevant(wait=False)
@ -336,22 +444,23 @@ class DebugLink:
layout = self.wait_layout()
else:
layout = self.read_layout()
self.save_debug_screen(layout.lines)
self.save_debug_screen(layout.visible_screen())
def save_debug_screen(self, lines: List[str]) -> None:
def save_debug_screen(self, screen_content: str) -> None:
if self.screen_text_file is None:
return
content = "\n".join(lines)
if not self.screen_text_file.exists():
self.screen_text_file.write_bytes(b"")
# Not writing the same screen twice
if content == self.last_screen_content:
if screen_content == self.last_screen_content:
return
self.last_screen_content = content
self.last_screen_content = screen_content
with open(self.screen_text_file, "a") as f:
f.write(content)
f.write(screen_content)
f.write("\n" + 80 * "/" + "\n")
# Type overloads make sure that when we supply `wait=True` into `click()`,
@ -371,6 +480,14 @@ class DebugLink:
x, y = click
return self.input(x=x, y=y, wait=wait)
# Made into separate function as `hold_ms: Optional[int]` in `click`
# was causing problems with @overload
def click_hold(
self, click: Tuple[int, int], hold_ms: int
) -> Optional[LayoutContent]:
x, y = click
return self.input(x=x, y=y, hold_ms=hold_ms, wait=True)
def press_yes(self, wait: bool = False) -> None:
self.input(button=messages.DebugButton.YES, wait=wait)
@ -538,6 +655,7 @@ class DebugUI:
if br.code == messages.ButtonRequestType.PinEntry:
self.debuglink.input(self.get_pin())
else:
# Paginating (going as further as possible) and pressing Yes
if br.pages is not None:
for _ in range(br.pages - 1):
self.debuglink.swipe_up(wait=True)
@ -688,7 +806,9 @@ class TrezorClientDebugLink(TrezorClient):
super().__init__(transport, ui=self.ui)
# So that we can choose right screenshotting logic (T1 vs TT)
# and know the supported debug capabilities
self.debug.model = self.features.model
self.debug.version = self.version
def reset_debug_features(self) -> None:
"""Prepare the debugging client for a new testcase.

@ -16,22 +16,22 @@ def enter_word(
return debug.click(buttons.CONFIRM_WORD, wait=True)
def confirm_recovery(debug: "DebugLink", legacy_ui: bool = False) -> None:
if not legacy_ui:
def confirm_recovery(debug: "DebugLink") -> None:
if not debug.legacy_ui and not debug.legacy_debug:
layout = debug.wait_layout()
assert layout.title().startswith("WALLET RECOVERY")
debug.click(buttons.OK, wait=True)
def select_number_of_words(
debug: "DebugLink", num_of_words: int = 20, legacy_ui: bool = False
) -> None:
def select_number_of_words(debug: "DebugLink", num_of_words: int = 20) -> None:
# select number of words
if not legacy_ui:
if not debug.legacy_ui and not debug.legacy_debug:
assert "select the number of words" in debug.read_layout().text_content()
layout = debug.click(buttons.OK, wait=True)
if legacy_ui:
if debug.legacy_ui:
assert layout.json_str == "WordSelector"
elif debug.version < (2, 6, 1):
assert "SelectWordCount" in layout.json_str
else:
# Two title options
assert layout.title() in ("SEED CHECK", "WALLET RECOVERY")
@ -45,20 +45,20 @@ def select_number_of_words(
coords = buttons.grid34(index % 3, index // 3)
layout = debug.click(coords, wait=True)
if not legacy_ui:
if not debug.legacy_ui and not debug.legacy_debug:
if num_of_words in (20, 33):
assert "Enter any share" in layout.text_content()
else:
assert "enter your recovery seed" in layout.text_content()
def enter_share(
debug: "DebugLink", share: str, legacy_ui: bool = False
) -> "LayoutContent":
def enter_share(debug: "DebugLink", share: str) -> "LayoutContent":
layout = debug.click(buttons.OK, wait=True)
if legacy_ui:
if debug.legacy_ui:
assert layout.json_str == "Slip39Keyboard"
elif debug.legacy_debug:
assert "MnemonicKeyboard" in layout.json_str
else:
assert layout.main_component() == "MnemonicKeyboard"

@ -209,6 +209,9 @@ def client(
request.session.shouldstop = "Failed to communicate with Trezor"
pytest.fail("Failed to communicate with Trezor")
# Resetting all the debug events to not be influenced by previous test
_raw_client.debug.reset_debug_events()
if test_ui:
# we need to reseed before the wipe
_raw_client.debug.reseed(0)

@ -1,5 +1,5 @@
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Callable
from trezorlib.client import PASSPHRASE_ON_DEVICE
from trezorlib.transport import udp
@ -42,10 +42,15 @@ class BackgroundDeviceHandler:
self.client.ui = NullUI # type: ignore [NullUI is OK UI]
self.client.watch_layout(True)
def run(self, function, *args, **kwargs) -> None:
def run(self, function: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
"""Runs some function that interacts with a device.
Makes sure the UI is updated before returning.
"""
if self.task is not None:
raise RuntimeError("Wait for previous task first")
self.task = self._pool.submit(function, self.client, *args, **kwargs)
self.debuglink().wait_layout(wait_for_external_change=True)
def kill_task(self) -> None:
if self.task is not None:

@ -309,15 +309,11 @@ def test_upgrade_shamir_recovery(gen: str, tag: Optional[str]):
device_handler.run(device.recover, pin_protection=False)
# Flow is different for old UI and new UI
legacy_ui = emu.client.version < (2, 5, 4)
recovery.confirm_recovery(debug, legacy_ui=legacy_ui)
recovery.select_number_of_words(debug, legacy_ui=legacy_ui)
layout = recovery.enter_share(
debug, MNEMONIC_SLIP39_BASIC_20_3of6[0], legacy_ui=legacy_ui
)
assert "2 more shares" in layout.text_content()
recovery.confirm_recovery(debug)
recovery.select_number_of_words(debug)
layout = recovery.enter_share(debug, MNEMONIC_SLIP39_BASIC_20_3of6[0])
if not debug.legacy_ui and not debug.legacy_debug:
assert "2 more shares" in layout.text_content()
device_id = emu.client.features.device_id
storage = emu.get_storage()

Loading…
Cancel
Save