mirror of
https://github.com/trezor/trezor-firmware.git
synced 2025-01-22 05:10:56 +00:00
wip
This commit is contained in:
parent
08c0c94f42
commit
7415c962a6
@ -208,17 +208,20 @@ if __debug__:
|
|||||||
msg: DebugLinkDecision,
|
msg: DebugLinkDecision,
|
||||||
) -> DebugLinkState | None:
|
) -> DebugLinkState | None:
|
||||||
from trezor import ui, workflow
|
from trezor import ui, workflow
|
||||||
|
log.debug(__name__, "decision 1")
|
||||||
workflow.idle_timer.touch()
|
workflow.idle_timer.touch()
|
||||||
|
|
||||||
x = msg.x # local_cache_attribute
|
x = msg.x # local_cache_attribute
|
||||||
y = msg.y # local_cache_attribute
|
y = msg.y # local_cache_attribute
|
||||||
|
log.debug(__name__, "decision 2")
|
||||||
await wait_until_layout_is_running()
|
await wait_until_layout_is_running()
|
||||||
assert isinstance(ui.CURRENT_LAYOUT, ui.Layout)
|
assert isinstance(ui.CURRENT_LAYOUT, ui.Layout)
|
||||||
layout_change_box.clear()
|
layout_change_box.clear()
|
||||||
|
log.debug(__name__, "decision 3")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
log.debug(__name__, "decision try")
|
||||||
|
|
||||||
# click on specific coordinates, with possible hold
|
# click on specific coordinates, with possible hold
|
||||||
if x is not None and y is not None:
|
if x is not None and y is not None:
|
||||||
await _layout_click(x, y, msg.hold_ms or 0)
|
await _layout_click(x, y, msg.hold_ms or 0)
|
||||||
@ -230,7 +233,13 @@ if __debug__:
|
|||||||
elif msg.button is not None:
|
elif msg.button is not None:
|
||||||
await _layout_event(msg.button)
|
await _layout_event(msg.button)
|
||||||
elif msg.input is not None:
|
elif msg.input is not None:
|
||||||
|
log.debug(__name__, "decision input")
|
||||||
|
try:
|
||||||
ui.CURRENT_LAYOUT._emit_message(msg.input)
|
ui.CURRENT_LAYOUT._emit_message(msg.input)
|
||||||
|
except Exception as e:
|
||||||
|
print(type(e))
|
||||||
|
log.debug(__name__, "decision input end")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise RuntimeError("Invalid DebugLinkDecision message")
|
raise RuntimeError("Invalid DebugLinkDecision message")
|
||||||
|
|
||||||
@ -244,6 +253,7 @@ if __debug__:
|
|||||||
|
|
||||||
# If no exception was raised, the layout did not shut down. That means that it
|
# If no exception was raised, the layout did not shut down. That means that it
|
||||||
# just updated itself. The update is already live for the caller to retrieve.
|
# just updated itself. The update is already live for the caller to retrieve.
|
||||||
|
log.debug(__name__, "decision end")
|
||||||
|
|
||||||
def _state(
|
def _state(
|
||||||
thp_pairing_code_entry_code: int | None = None,
|
thp_pairing_code_entry_code: int | None = None,
|
||||||
@ -431,22 +441,31 @@ if __debug__:
|
|||||||
ctx.iface.iface_num(),
|
ctx.iface.iface_num(),
|
||||||
msg_type,
|
msg_type,
|
||||||
)
|
)
|
||||||
|
log.debug(__name__, "here 1")
|
||||||
if msg.type not in WORKFLOW_HANDLERS:
|
if msg.type not in WORKFLOW_HANDLERS:
|
||||||
|
log.debug(__name__, "here a")
|
||||||
|
|
||||||
await ctx.write(message_handler.unexpected_message())
|
await ctx.write(message_handler.unexpected_message())
|
||||||
continue
|
continue
|
||||||
|
|
||||||
elif req_type is None:
|
elif req_type is None:
|
||||||
|
log.debug(__name__, "here b")
|
||||||
|
|
||||||
# Message type is in workflow handlers but not in protobuf
|
# Message type is in workflow handlers but not in protobuf
|
||||||
# definitions. This indicates a deprecated message.
|
# definitions. This indicates a deprecated message.
|
||||||
# We put a no-op handler for those messages.
|
# We put a no-op handler for those messages.
|
||||||
# XXX return a Failure here?
|
# XXX return a Failure here?
|
||||||
await ctx.write(Success())
|
await ctx.write(Success())
|
||||||
continue
|
continue
|
||||||
|
log.debug(__name__, "here 3")
|
||||||
|
|
||||||
req_msg = message_handler.wrap_protobuf_load(msg.data, req_type)
|
req_msg = message_handler.wrap_protobuf_load(msg.data, req_type)
|
||||||
try:
|
try:
|
||||||
|
log.debug(__name__, "here 4")
|
||||||
|
|
||||||
res_msg = await WORKFLOW_HANDLERS[msg.type](req_msg)
|
res_msg = await WORKFLOW_HANDLERS[msg.type](req_msg)
|
||||||
|
log.debug(__name__, "here 5")
|
||||||
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
# Log and ignore, never die.
|
# Log and ignore, never die.
|
||||||
log.exception(__name__, exc)
|
log.exception(__name__, exc)
|
||||||
|
@ -59,7 +59,7 @@ CENTER_BUTTON = buttons.grid35(1, 2)
|
|||||||
def set_autolock_delay(device_handler: "BackgroundDeviceHandler", delay_ms: int):
|
def set_autolock_delay(device_handler: "BackgroundDeviceHandler", delay_ms: int):
|
||||||
debug = device_handler.debuglink()
|
debug = device_handler.debuglink()
|
||||||
|
|
||||||
device_handler.run(device.apply_settings, auto_lock_delay_ms=delay_ms) # type: ignore
|
device_handler.run_with_session(device.apply_settings, auto_lock_delay_ms=delay_ms) # type: ignore
|
||||||
|
|
||||||
assert "PinKeyboard" in debug.read_layout().all_components()
|
assert "PinKeyboard" in debug.read_layout().all_components()
|
||||||
|
|
||||||
|
@ -19,7 +19,7 @@ from typing import TYPE_CHECKING
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from trezorlib import models
|
from trezorlib import messages, models
|
||||||
from trezorlib.debuglink import LayoutType
|
from trezorlib.debuglink import LayoutType
|
||||||
|
|
||||||
from .. import buttons, common
|
from .. import buttons, common
|
||||||
@ -34,6 +34,9 @@ PIN4 = "1234"
|
|||||||
@pytest.mark.setup_client(pin=PIN4)
|
@pytest.mark.setup_client(pin=PIN4)
|
||||||
def test_hold_to_lock(device_handler: "BackgroundDeviceHandler"):
|
def test_hold_to_lock(device_handler: "BackgroundDeviceHandler"):
|
||||||
debug = device_handler.debuglink()
|
debug = device_handler.debuglink()
|
||||||
|
session = device_handler.client.get_management_session()
|
||||||
|
session.call(messages.LockDevice())
|
||||||
|
session.refresh_features()
|
||||||
|
|
||||||
short_duration = {
|
short_duration = {
|
||||||
models.T1B1: 500,
|
models.T1B1: 500,
|
||||||
@ -59,9 +62,10 @@ def test_hold_to_lock(device_handler: "BackgroundDeviceHandler"):
|
|||||||
assert device_handler.features().unlocked is False
|
assert device_handler.features().unlocked is False
|
||||||
|
|
||||||
# unlock with message
|
# unlock with message
|
||||||
device_handler.run(common.get_test_address)
|
device_handler.run_with_session(common.get_test_address)
|
||||||
|
|
||||||
assert "PinKeyboard" in debug.read_layout().all_components()
|
assert "PinKeyboard" in debug.read_layout().all_components()
|
||||||
|
time.sleep(10)
|
||||||
debug.input("1234")
|
debug.input("1234")
|
||||||
assert device_handler.result()
|
assert device_handler.result()
|
||||||
|
|
||||||
|
@ -33,6 +33,9 @@ class NullUI:
|
|||||||
else:
|
else:
|
||||||
raise NotImplementedError("NullUI should not be used with T1")
|
raise NotImplementedError("NullUI should not be used with T1")
|
||||||
|
|
||||||
|
def debug_callback_button(self, session: Any, msg: Any) -> Any:
|
||||||
|
raise RuntimeError("unexpected call to a fake debuglink")
|
||||||
|
|
||||||
|
|
||||||
class BackgroundDeviceHandler:
|
class BackgroundDeviceHandler:
|
||||||
_pool = ThreadPoolExecutor()
|
_pool = ThreadPoolExecutor()
|
||||||
@ -60,13 +63,28 @@ class BackgroundDeviceHandler:
|
|||||||
with self.debuglink().wait_for_layout_change():
|
with self.debuglink().wait_for_layout_change():
|
||||||
self.task = self._pool.submit(function, self.client, *args, **kwargs)
|
self.task = self._pool.submit(function, self.client, *args, **kwargs)
|
||||||
|
|
||||||
|
def run_with_session(
|
||||||
|
self, function: Callable[..., Any], *args: Any, **kwargs: Any
|
||||||
|
) -> None:
|
||||||
|
"""Runs some function that interacts with a device.
|
||||||
|
|
||||||
|
Makes sure the UI is updated before returning.
|
||||||
|
"""
|
||||||
|
if self.task is not None:
|
||||||
|
raise RuntimeError("Wait for previous task first")
|
||||||
|
|
||||||
|
# wait for the first UI change triggered by the task running in the background
|
||||||
|
with self.debuglink().wait_for_layout_change():
|
||||||
|
session = self.client.get_session()
|
||||||
|
self.task = self._pool.submit(function, session, *args, **kwargs)
|
||||||
|
|
||||||
def kill_task(self) -> None:
|
def kill_task(self) -> None:
|
||||||
if self.task is not None:
|
if self.task is not None:
|
||||||
# Force close the client, which should raise an exception in a client
|
# Force close the client, which should raise an exception in a client
|
||||||
# waiting on IO. Does not work over Bridge, because bridge doesn't have
|
# waiting on IO. Does not work over Bridge, because bridge doesn't have
|
||||||
# a close() method.
|
# a close() method.
|
||||||
while self.client.session_counter > 0:
|
# while self.client.session_counter > 0:
|
||||||
self.client.close()
|
# self.client.close()
|
||||||
try:
|
try:
|
||||||
self.task.result(timeout=1)
|
self.task.result(timeout=1)
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -90,7 +108,6 @@ class BackgroundDeviceHandler:
|
|||||||
def features(self) -> "Features":
|
def features(self) -> "Features":
|
||||||
if self.task is not None:
|
if self.task is not None:
|
||||||
raise RuntimeError("Cannot query features while task is running")
|
raise RuntimeError("Cannot query features while task is running")
|
||||||
self.client.init_device()
|
|
||||||
return self.client.features
|
return self.client.features
|
||||||
|
|
||||||
def debuglink(self) -> "DebugLink":
|
def debuglink(self) -> "DebugLink":
|
||||||
|
@ -21,6 +21,7 @@ import pytest
|
|||||||
from shamir_mnemonic import shamir
|
from shamir_mnemonic import shamir
|
||||||
|
|
||||||
from trezorlib import btc, debuglink, device, exceptions, fido, models
|
from trezorlib import btc, debuglink, device, exceptions, fido, models
|
||||||
|
from trezorlib.debuglink import SessionDebugWrapper as Session
|
||||||
from trezorlib.messages import (
|
from trezorlib.messages import (
|
||||||
ApplySettings,
|
ApplySettings,
|
||||||
BackupAvailability,
|
BackupAvailability,
|
||||||
@ -57,15 +58,19 @@ STRENGTH = 128
|
|||||||
@for_all()
|
@for_all()
|
||||||
def test_upgrade_load(gen: str, tag: str) -> None:
|
def test_upgrade_load(gen: str, tag: str) -> None:
|
||||||
def asserts(client: "Client"):
|
def asserts(client: "Client"):
|
||||||
|
client.refresh_features()
|
||||||
assert not client.features.pin_protection
|
assert not client.features.pin_protection
|
||||||
assert not client.features.passphrase_protection
|
assert not client.features.passphrase_protection
|
||||||
assert client.features.initialized
|
assert client.features.initialized
|
||||||
assert client.features.label == LABEL
|
assert client.features.label == LABEL
|
||||||
assert btc.get_address(client, "Bitcoin", PATH) == ADDRESS
|
assert (
|
||||||
|
btc.get_address(client.get_session(passphrase=""), "Bitcoin", PATH)
|
||||||
|
== ADDRESS
|
||||||
|
)
|
||||||
|
|
||||||
with EmulatorWrapper(gen, tag) as emu:
|
with EmulatorWrapper(gen, tag) as emu:
|
||||||
debuglink.load_device_by_mnemonic(
|
debuglink.load_device_by_mnemonic(
|
||||||
emu.client,
|
emu.client.get_management_session(),
|
||||||
mnemonic=MNEMONIC,
|
mnemonic=MNEMONIC,
|
||||||
pin="",
|
pin="",
|
||||||
passphrase_protection=False,
|
passphrase_protection=False,
|
||||||
@ -164,11 +169,14 @@ def test_upgrade_wipe_code(gen: str, tag: str):
|
|||||||
assert client.features.initialized
|
assert client.features.initialized
|
||||||
assert client.features.label == LABEL
|
assert client.features.label == LABEL
|
||||||
client.use_pin_sequence([PIN])
|
client.use_pin_sequence([PIN])
|
||||||
assert btc.get_address(client, "Bitcoin", PATH) == ADDRESS
|
assert (
|
||||||
|
btc.get_address(client.get_session(passphrase=""), "Bitcoin", PATH)
|
||||||
|
== ADDRESS
|
||||||
|
)
|
||||||
|
|
||||||
with EmulatorWrapper(gen, tag) as emu:
|
with EmulatorWrapper(gen, tag) as emu:
|
||||||
debuglink.load_device_by_mnemonic(
|
debuglink.load_device_by_mnemonic(
|
||||||
emu.client,
|
emu.client.get_management_session(),
|
||||||
mnemonic=MNEMONIC,
|
mnemonic=MNEMONIC,
|
||||||
pin=PIN,
|
pin=PIN,
|
||||||
passphrase_protection=False,
|
passphrase_protection=False,
|
||||||
@ -177,7 +185,10 @@ def test_upgrade_wipe_code(gen: str, tag: str):
|
|||||||
|
|
||||||
# Set wipe code.
|
# Set wipe code.
|
||||||
emu.client.use_pin_sequence([PIN, WIPE_CODE, WIPE_CODE])
|
emu.client.use_pin_sequence([PIN, WIPE_CODE, WIPE_CODE])
|
||||||
device.change_wipe_code(emu.client)
|
session = Session(emu.client.get_management_session())
|
||||||
|
session.refresh_features()
|
||||||
|
raise Exception("client", str(emu.client.pin_callback))
|
||||||
|
device.change_wipe_code(session)
|
||||||
|
|
||||||
device_id = emu.client.features.device_id
|
device_id = emu.client.features.device_id
|
||||||
asserts(emu.client)
|
asserts(emu.client)
|
||||||
|
@ -21,6 +21,7 @@ import pytest
|
|||||||
from trezorlib import btc, device, mapping, messages, models, protobuf
|
from trezorlib import btc, device, mapping, messages, models, protobuf
|
||||||
from trezorlib._internal.emulator import Emulator
|
from trezorlib._internal.emulator import Emulator
|
||||||
from trezorlib.tools import parse_path
|
from trezorlib.tools import parse_path
|
||||||
|
from trezorlib.debuglink import SessionDebugWrapper as Session
|
||||||
|
|
||||||
from ..emulators import EmulatorWrapper
|
from ..emulators import EmulatorWrapper
|
||||||
from . import for_all
|
from . import for_all
|
||||||
@ -47,11 +48,12 @@ def emulator(gen: str, tag: str) -> Iterator[Emulator]:
|
|||||||
with EmulatorWrapper(gen, tag) as emu:
|
with EmulatorWrapper(gen, tag) as emu:
|
||||||
# set up a passphrase-protected device
|
# set up a passphrase-protected device
|
||||||
device.reset(
|
device.reset(
|
||||||
emu.client,
|
emu.client.get_management_session(),
|
||||||
pin_protection=False,
|
pin_protection=False,
|
||||||
skip_backup=True,
|
skip_backup=True,
|
||||||
)
|
)
|
||||||
resp = emu.client.call(
|
emu.client = emu.client.get_new_client()
|
||||||
|
resp = emu.client.get_management_session().call(
|
||||||
ApplySettingsCompat(use_passphrase=True, passphrase_source=SOURCE_HOST)
|
ApplySettingsCompat(use_passphrase=True, passphrase_source=SOURCE_HOST)
|
||||||
)
|
)
|
||||||
assert isinstance(resp, messages.Success)
|
assert isinstance(resp, messages.Success)
|
||||||
@ -87,11 +89,11 @@ def test_passphrase_works(emulator: Emulator):
|
|||||||
messages.ButtonRequest,
|
messages.ButtonRequest,
|
||||||
messages.Address,
|
messages.Address,
|
||||||
]
|
]
|
||||||
|
emu_session = emulator.client.get_session(passphrase="")
|
||||||
with emulator.client:
|
with emulator.client, Session(emu_session) as session:
|
||||||
emulator.client.use_passphrase("TREZOR")
|
emulator.client.use_passphrase("TREZOR")
|
||||||
emulator.client.set_expected_responses(expected_responses)
|
session.set_expected_responses(expected_responses)
|
||||||
btc.get_address(emulator.client, "Testnet", parse_path("44h/1h/0h/0/0"))
|
btc.get_address(session, "Testnet", parse_path("44h/1h/0h/0/0"))
|
||||||
|
|
||||||
|
|
||||||
@for_all(
|
@for_all(
|
||||||
@ -131,13 +133,14 @@ def test_init_device(emulator: Emulator):
|
|||||||
messages.Address,
|
messages.Address,
|
||||||
]
|
]
|
||||||
|
|
||||||
with emulator.client:
|
emu_session = emulator.client.get_session(passphrase="")
|
||||||
|
with emulator.client, Session(emu_session) as session:
|
||||||
emulator.client.use_passphrase("TREZOR")
|
emulator.client.use_passphrase("TREZOR")
|
||||||
emulator.client.set_expected_responses(expected_responses)
|
session.set_expected_responses(expected_responses)
|
||||||
|
|
||||||
btc.get_address(emulator.client, "Testnet", parse_path("44h/1h/0h/0/0"))
|
btc.get_address(session, "Testnet", parse_path("44h/1h/0h/0/0"))
|
||||||
# in TT < 2.3.0 session_id will only be available after PassphraseStateRequest
|
# in TT < 2.3.0 session_id will only be available after PassphraseStateRequest
|
||||||
session_id = emulator.client.session_id
|
session_id = emulator.client.features.session_id
|
||||||
emulator.client.init_device()
|
# emulator.client.init_device()
|
||||||
btc.get_address(emulator.client, "Testnet", parse_path("44h/1h/0h/0/0"))
|
btc.get_address(emulator.client, "Testnet", parse_path("44h/1h/0h/0/0"))
|
||||||
assert session_id == emulator.client.session_id
|
assert session_id == emulator.client.features.session_id
|
||||||
|
Loading…
Reference in New Issue
Block a user