1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2024-12-01 20:08:26 +00:00
This commit is contained in:
M1nd3r 2024-11-22 09:25:49 +01:00
parent 7bf2f9f2da
commit 051b11f692
6 changed files with 81 additions and 27 deletions

View File

@ -208,17 +208,20 @@ if __debug__:
msg: DebugLinkDecision, msg: DebugLinkDecision,
) -> DebugLinkState | None: ) -> DebugLinkState | None:
from trezor import ui, workflow from trezor import ui, workflow
log.debug(__name__, "decision 1")
workflow.idle_timer.touch() workflow.idle_timer.touch()
x = msg.x # local_cache_attribute x = msg.x # local_cache_attribute
y = msg.y # local_cache_attribute y = msg.y # local_cache_attribute
log.debug(__name__, "decision 2")
await wait_until_layout_is_running() await wait_until_layout_is_running()
assert isinstance(ui.CURRENT_LAYOUT, ui.Layout) assert isinstance(ui.CURRENT_LAYOUT, ui.Layout)
layout_change_box.clear() layout_change_box.clear()
log.debug(__name__, "decision 3")
try: try:
log.debug(__name__, "decision try")
# click on specific coordinates, with possible hold # click on specific coordinates, with possible hold
if x is not None and y is not None: if x is not None and y is not None:
await _layout_click(x, y, msg.hold_ms or 0) await _layout_click(x, y, msg.hold_ms or 0)
@ -230,7 +233,13 @@ if __debug__:
elif msg.button is not None: elif msg.button is not None:
await _layout_event(msg.button) await _layout_event(msg.button)
elif msg.input is not None: elif msg.input is not None:
log.debug(__name__, "decision input")
try:
ui.CURRENT_LAYOUT._emit_message(msg.input) ui.CURRENT_LAYOUT._emit_message(msg.input)
except Exception as e:
print(type(e))
log.debug(__name__, "decision input end")
else: else:
raise RuntimeError("Invalid DebugLinkDecision message") raise RuntimeError("Invalid DebugLinkDecision message")
@ -244,6 +253,7 @@ if __debug__:
# If no exception was raised, the layout did not shut down. That means that it # If no exception was raised, the layout did not shut down. That means that it
# just updated itself. The update is already live for the caller to retrieve. # just updated itself. The update is already live for the caller to retrieve.
log.debug(__name__, "decision end")
def _state( def _state(
thp_pairing_code_entry_code: int | None = None, thp_pairing_code_entry_code: int | None = None,
@ -431,22 +441,31 @@ if __debug__:
ctx.iface.iface_num(), ctx.iface.iface_num(),
msg_type, msg_type,
) )
log.debug(__name__, "here 1")
if msg.type not in WORKFLOW_HANDLERS: if msg.type not in WORKFLOW_HANDLERS:
log.debug(__name__, "here a")
await ctx.write(message_handler.unexpected_message()) await ctx.write(message_handler.unexpected_message())
continue continue
elif req_type is None: elif req_type is None:
log.debug(__name__, "here b")
# Message type is in workflow handlers but not in protobuf # Message type is in workflow handlers but not in protobuf
# definitions. This indicates a deprecated message. # definitions. This indicates a deprecated message.
# We put a no-op handler for those messages. # We put a no-op handler for those messages.
# XXX return a Failure here? # XXX return a Failure here?
await ctx.write(Success()) await ctx.write(Success())
continue continue
log.debug(__name__, "here 3")
req_msg = message_handler.wrap_protobuf_load(msg.data, req_type) req_msg = message_handler.wrap_protobuf_load(msg.data, req_type)
try: try:
log.debug(__name__, "here 4")
res_msg = await WORKFLOW_HANDLERS[msg.type](req_msg) res_msg = await WORKFLOW_HANDLERS[msg.type](req_msg)
log.debug(__name__, "here 5")
except Exception as exc: except Exception as exc:
# Log and ignore, never die. # Log and ignore, never die.
log.exception(__name__, exc) log.exception(__name__, exc)

View File

@ -59,7 +59,7 @@ CENTER_BUTTON = buttons.grid35(1, 2)
def set_autolock_delay(device_handler: "BackgroundDeviceHandler", delay_ms: int): def set_autolock_delay(device_handler: "BackgroundDeviceHandler", delay_ms: int):
debug = device_handler.debuglink() debug = device_handler.debuglink()
device_handler.run(device.apply_settings, auto_lock_delay_ms=delay_ms) # type: ignore device_handler.run_with_session(device.apply_settings, auto_lock_delay_ms=delay_ms) # type: ignore
assert "PinKeyboard" in debug.read_layout().all_components() assert "PinKeyboard" in debug.read_layout().all_components()

View File

@ -19,7 +19,7 @@ from typing import TYPE_CHECKING
import pytest import pytest
from trezorlib import models from trezorlib import messages, models
from trezorlib.debuglink import LayoutType from trezorlib.debuglink import LayoutType
from .. import buttons, common from .. import buttons, common
@ -34,6 +34,9 @@ PIN4 = "1234"
@pytest.mark.setup_client(pin=PIN4) @pytest.mark.setup_client(pin=PIN4)
def test_hold_to_lock(device_handler: "BackgroundDeviceHandler"): def test_hold_to_lock(device_handler: "BackgroundDeviceHandler"):
debug = device_handler.debuglink() debug = device_handler.debuglink()
session = device_handler.client.get_management_session()
session.call(messages.LockDevice())
session.refresh_features()
short_duration = { short_duration = {
models.T1B1: 500, models.T1B1: 500,
@ -59,9 +62,10 @@ def test_hold_to_lock(device_handler: "BackgroundDeviceHandler"):
assert device_handler.features().unlocked is False assert device_handler.features().unlocked is False
# unlock with message # unlock with message
device_handler.run(common.get_test_address) device_handler.run_with_session(common.get_test_address)
assert "PinKeyboard" in debug.read_layout().all_components() assert "PinKeyboard" in debug.read_layout().all_components()
time.sleep(10)
debug.input("1234") debug.input("1234")
assert device_handler.result() assert device_handler.result()

View File

@ -33,6 +33,9 @@ class NullUI:
else: else:
raise NotImplementedError("NullUI should not be used with T1") raise NotImplementedError("NullUI should not be used with T1")
def debug_callback_button(self, session: Any, msg: Any) -> Any:
raise RuntimeError("unexpected call to a fake debuglink")
class BackgroundDeviceHandler: class BackgroundDeviceHandler:
_pool = ThreadPoolExecutor() _pool = ThreadPoolExecutor()
@ -60,13 +63,28 @@ class BackgroundDeviceHandler:
with self.debuglink().wait_for_layout_change(): with self.debuglink().wait_for_layout_change():
self.task = self._pool.submit(function, self.client, *args, **kwargs) self.task = self._pool.submit(function, self.client, *args, **kwargs)
def run_with_session(
self, function: Callable[..., Any], *args: Any, **kwargs: Any
) -> None:
"""Runs some function that interacts with a device.
Makes sure the UI is updated before returning.
"""
if self.task is not None:
raise RuntimeError("Wait for previous task first")
# wait for the first UI change triggered by the task running in the background
with self.debuglink().wait_for_layout_change():
session = self.client.get_session()
self.task = self._pool.submit(function, session, *args, **kwargs)
def kill_task(self) -> None: def kill_task(self) -> None:
if self.task is not None: if self.task is not None:
# Force close the client, which should raise an exception in a client # Force close the client, which should raise an exception in a client
# waiting on IO. Does not work over Bridge, because bridge doesn't have # waiting on IO. Does not work over Bridge, because bridge doesn't have
# a close() method. # a close() method.
while self.client.session_counter > 0: # while self.client.session_counter > 0:
self.client.close() # self.client.close()
try: try:
self.task.result(timeout=1) self.task.result(timeout=1)
except Exception: except Exception:
@ -90,7 +108,6 @@ class BackgroundDeviceHandler:
def features(self) -> "Features": def features(self) -> "Features":
if self.task is not None: if self.task is not None:
raise RuntimeError("Cannot query features while task is running") raise RuntimeError("Cannot query features while task is running")
self.client.init_device()
return self.client.features return self.client.features
def debuglink(self) -> "DebugLink": def debuglink(self) -> "DebugLink":

View File

@ -21,6 +21,7 @@ import pytest
from shamir_mnemonic import shamir from shamir_mnemonic import shamir
from trezorlib import btc, debuglink, device, exceptions, fido, models from trezorlib import btc, debuglink, device, exceptions, fido, models
from trezorlib.debuglink import SessionDebugWrapper as Session
from trezorlib.messages import ( from trezorlib.messages import (
ApplySettings, ApplySettings,
BackupAvailability, BackupAvailability,
@ -57,15 +58,19 @@ STRENGTH = 128
@for_all() @for_all()
def test_upgrade_load(gen: str, tag: str) -> None: def test_upgrade_load(gen: str, tag: str) -> None:
def asserts(client: "Client"): def asserts(client: "Client"):
client.refresh_features()
assert not client.features.pin_protection assert not client.features.pin_protection
assert not client.features.passphrase_protection assert not client.features.passphrase_protection
assert client.features.initialized assert client.features.initialized
assert client.features.label == LABEL assert client.features.label == LABEL
assert btc.get_address(client, "Bitcoin", PATH) == ADDRESS assert (
btc.get_address(client.get_session(passphrase=""), "Bitcoin", PATH)
== ADDRESS
)
with EmulatorWrapper(gen, tag) as emu: with EmulatorWrapper(gen, tag) as emu:
debuglink.load_device_by_mnemonic( debuglink.load_device_by_mnemonic(
emu.client, emu.client.get_management_session(),
mnemonic=MNEMONIC, mnemonic=MNEMONIC,
pin="", pin="",
passphrase_protection=False, passphrase_protection=False,
@ -164,11 +169,14 @@ def test_upgrade_wipe_code(gen: str, tag: str):
assert client.features.initialized assert client.features.initialized
assert client.features.label == LABEL assert client.features.label == LABEL
client.use_pin_sequence([PIN]) client.use_pin_sequence([PIN])
assert btc.get_address(client, "Bitcoin", PATH) == ADDRESS assert (
btc.get_address(client.get_session(passphrase=""), "Bitcoin", PATH)
== ADDRESS
)
with EmulatorWrapper(gen, tag) as emu: with EmulatorWrapper(gen, tag) as emu:
debuglink.load_device_by_mnemonic( debuglink.load_device_by_mnemonic(
emu.client, emu.client.get_management_session(),
mnemonic=MNEMONIC, mnemonic=MNEMONIC,
pin=PIN, pin=PIN,
passphrase_protection=False, passphrase_protection=False,
@ -177,7 +185,10 @@ def test_upgrade_wipe_code(gen: str, tag: str):
# Set wipe code. # Set wipe code.
emu.client.use_pin_sequence([PIN, WIPE_CODE, WIPE_CODE]) emu.client.use_pin_sequence([PIN, WIPE_CODE, WIPE_CODE])
device.change_wipe_code(emu.client) session = Session(emu.client.get_management_session())
session.refresh_features()
raise Exception("client", str(emu.client.pin_callback))
device.change_wipe_code(session)
device_id = emu.client.features.device_id device_id = emu.client.features.device_id
asserts(emu.client) asserts(emu.client)

View File

@ -21,6 +21,7 @@ import pytest
from trezorlib import btc, device, mapping, messages, models, protobuf from trezorlib import btc, device, mapping, messages, models, protobuf
from trezorlib._internal.emulator import Emulator from trezorlib._internal.emulator import Emulator
from trezorlib.tools import parse_path from trezorlib.tools import parse_path
from trezorlib.debuglink import SessionDebugWrapper as Session
from ..emulators import EmulatorWrapper from ..emulators import EmulatorWrapper
from . import for_all from . import for_all
@ -47,11 +48,12 @@ def emulator(gen: str, tag: str) -> Iterator[Emulator]:
with EmulatorWrapper(gen, tag) as emu: with EmulatorWrapper(gen, tag) as emu:
# set up a passphrase-protected device # set up a passphrase-protected device
device.reset( device.reset(
emu.client, emu.client.get_management_session(),
pin_protection=False, pin_protection=False,
skip_backup=True, skip_backup=True,
) )
resp = emu.client.call( emu.client = emu.client.get_new_client()
resp = emu.client.get_management_session().call(
ApplySettingsCompat(use_passphrase=True, passphrase_source=SOURCE_HOST) ApplySettingsCompat(use_passphrase=True, passphrase_source=SOURCE_HOST)
) )
assert isinstance(resp, messages.Success) assert isinstance(resp, messages.Success)
@ -87,11 +89,11 @@ def test_passphrase_works(emulator: Emulator):
messages.ButtonRequest, messages.ButtonRequest,
messages.Address, messages.Address,
] ]
emu_session = emulator.client.get_session(passphrase="")
with emulator.client: with emulator.client, Session(emu_session) as session:
emulator.client.use_passphrase("TREZOR") emulator.client.use_passphrase("TREZOR")
emulator.client.set_expected_responses(expected_responses) session.set_expected_responses(expected_responses)
btc.get_address(emulator.client, "Testnet", parse_path("44h/1h/0h/0/0")) btc.get_address(session, "Testnet", parse_path("44h/1h/0h/0/0"))
@for_all( @for_all(
@ -131,13 +133,14 @@ def test_init_device(emulator: Emulator):
messages.Address, messages.Address,
] ]
with emulator.client: emu_session = emulator.client.get_session(passphrase="")
with emulator.client, Session(emu_session) as session:
emulator.client.use_passphrase("TREZOR") emulator.client.use_passphrase("TREZOR")
emulator.client.set_expected_responses(expected_responses) session.set_expected_responses(expected_responses)
btc.get_address(emulator.client, "Testnet", parse_path("44h/1h/0h/0/0")) btc.get_address(session, "Testnet", parse_path("44h/1h/0h/0/0"))
# in TT < 2.3.0 session_id will only be available after PassphraseStateRequest # in TT < 2.3.0 session_id will only be available after PassphraseStateRequest
session_id = emulator.client.session_id session_id = emulator.client.features.session_id
emulator.client.init_device() # emulator.client.init_device()
btc.get_address(emulator.client, "Testnet", parse_path("44h/1h/0h/0/0")) btc.get_address(emulator.client, "Testnet", parse_path("44h/1h/0h/0/0"))
assert session_id == emulator.client.session_id assert session_id == emulator.client.features.session_id