import storage.cache import storage.device from trezor import config, utils, wire, workflow from trezor.enums import MessageType from trezor.messages import Success from . import workflow_handlers if False: from trezor import protobuf from typing import NoReturn from trezor.messages import ( Features, Initialize, EndSession, GetFeatures, Cancel, LockDevice, Ping, DoPreauthorized, CancelAuthorization, ) def get_features() -> Features: import storage.recovery import storage.sd_salt from trezor import sdcard from trezor.enums import Capability from trezor.messages import Features from apps.common import mnemonic, safety_checks f = Features( vendor="trezor.io", language="en-US", major_version=utils.VERSION_MAJOR, minor_version=utils.VERSION_MINOR, patch_version=utils.VERSION_PATCH, revision=utils.SCM_REVISION, model=utils.MODEL, device_id=storage.device.get_device_id(), label=storage.device.get_label(), pin_protection=config.has_pin(), unlocked=config.is_unlocked(), ) if utils.BITCOIN_ONLY: f.capabilities = [ Capability.Bitcoin, Capability.Crypto, Capability.Shamir, Capability.ShamirGroups, Capability.PassphraseEntry, ] else: f.capabilities = [ Capability.Bitcoin, Capability.Bitcoin_like, Capability.Binance, Capability.Cardano, Capability.Crypto, Capability.EOS, Capability.Ethereum, Capability.Monero, Capability.NEM, Capability.Ripple, Capability.Stellar, Capability.Tezos, Capability.U2F, Capability.Shamir, Capability.ShamirGroups, Capability.PassphraseEntry, ] f.sd_card_present = sdcard.is_present() f.initialized = storage.device.is_initialized() # private fields: if config.is_unlocked(): # passphrase_protection is private, see #1807 f.passphrase_protection = storage.device.is_passphrase_enabled() f.needs_backup = storage.device.needs_backup() f.unfinished_backup = storage.device.unfinished_backup() f.no_backup = storage.device.no_backup() f.flags = storage.device.get_flags() f.recovery_mode = storage.recovery.is_in_progress() f.backup_type = mnemonic.get_type() f.sd_protection = storage.sd_salt.is_enabled() f.wipe_code_protection = config.has_wipe_code() f.passphrase_always_on_device = storage.device.get_passphrase_always_on_device() f.safety_checks = safety_checks.read_setting() f.auto_lock_delay_ms = storage.device.get_autolock_delay_ms() f.display_rotation = storage.device.get_rotation() f.experimental_features = storage.device.get_experimental_features() return f async def handle_Initialize(ctx: wire.Context, msg: Initialize) -> Features: session_id = storage.cache.start_session(msg.session_id) if not utils.BITCOIN_ONLY: derive_cardano = storage.cache.get(storage.cache.APP_COMMON_DERIVE_CARDANO) have_seed = storage.cache.is_set(storage.cache.APP_COMMON_SEED) if ( have_seed and msg.derive_cardano is not None and msg.derive_cardano != bool(derive_cardano) ): # seed is already derived, and host wants to change derive_cardano setting # => create a new session storage.cache.end_current_session() session_id = storage.cache.start_session() have_seed = False if not have_seed: storage.cache.set( storage.cache.APP_COMMON_DERIVE_CARDANO, b"\x01" if msg.derive_cardano else b"", ) features = get_features() features.session_id = session_id return features async def handle_GetFeatures(ctx: wire.Context, msg: GetFeatures) -> Features: return get_features() async def handle_Cancel(ctx: wire.Context, msg: Cancel) -> NoReturn: raise wire.ActionCancelled async def handle_LockDevice(ctx: wire.Context, msg: LockDevice) -> Success: lock_device() return Success() async def handle_EndSession(ctx: wire.Context, msg: EndSession) -> Success: storage.cache.end_current_session() return Success() async def handle_Ping(ctx: wire.Context, msg: Ping) -> Success: if msg.button_protection: from trezor.ui.layouts import confirm_action from trezor.enums import ButtonRequestType as B await confirm_action(ctx, "ping", "Confirm", "ping", br_code=B.ProtectCall) return Success(message=msg.message) async def handle_DoPreauthorized( ctx: wire.Context, msg: DoPreauthorized ) -> protobuf.MessageType: from trezor.messages import PreauthorizedRequest from apps.common import authorization if not authorization.is_set(): raise wire.ProcessError("No preauthorized operation") wire_types = authorization.get_wire_types() utils.ensure(bool(wire_types), "Unsupported preauthorization found") req = await ctx.call_any(PreauthorizedRequest(), *wire_types) assert req.MESSAGE_WIRE_TYPE is not None handler = workflow_handlers.find_registered_handler( ctx.iface, req.MESSAGE_WIRE_TYPE ) if handler is None: return wire.unexpected_message() return await handler(ctx, req, authorization.get()) # type: ignore async def handle_CancelAuthorization( ctx: wire.Context, msg: CancelAuthorization ) -> protobuf.MessageType: from apps.common import authorization authorization.clear() return Success(message="Authorization cancelled") ALLOW_WHILE_LOCKED = ( MessageType.Initialize, MessageType.EndSession, MessageType.GetFeatures, MessageType.Cancel, MessageType.LockDevice, MessageType.DoPreauthorized, MessageType.WipeDevice, ) def set_homescreen() -> None: import storage.recovery if not config.is_unlocked(): from apps.homescreen.lockscreen import lockscreen workflow.set_default(lockscreen) elif storage.recovery.is_in_progress(): from apps.management.recovery_device.homescreen import recovery_homescreen workflow.set_default(recovery_homescreen) else: from apps.homescreen.homescreen import homescreen workflow.set_default(homescreen) def lock_device() -> None: if config.has_pin(): config.lock() wire.find_handler = get_pinlocked_handler set_homescreen() workflow.close_others() def lock_device_if_unlocked() -> None: if config.is_unlocked(): lock_device() async def unlock_device(ctx: wire.GenericContext = wire.DUMMY_CONTEXT) -> None: """Ensure the device is in unlocked state. If the storage is locked, attempt to unlock it. Reset the homescreen and the wire handler. """ from apps.common.request_pin import verify_user_pin if not config.is_unlocked(): # verify_user_pin will raise if the PIN was invalid await verify_user_pin(ctx) set_homescreen() wire.find_handler = workflow_handlers.find_registered_handler def get_pinlocked_handler( iface: wire.WireInterface, msg_type: int ) -> wire.Handler[wire.Msg] | None: orig_handler = workflow_handlers.find_registered_handler(iface, msg_type) if orig_handler is None: return None if __debug__: import usb if iface is usb.iface_debug: return orig_handler if msg_type in ALLOW_WHILE_LOCKED: return orig_handler async def wrapper(ctx: wire.Context, msg: wire.Msg) -> protobuf.MessageType: # mypy limitation: orig_handler is not recognized as non-None assert orig_handler is not None await unlock_device(ctx) return await orig_handler(ctx, msg) return wrapper # this function is also called when handling ApplySettings def reload_settings_from_storage() -> None: from trezor import ui workflow.idle_timer.set( storage.device.get_autolock_delay_ms(), lock_device_if_unlocked ) wire.experimental_enabled = storage.device.get_experimental_features() ui.display.orientation(storage.device.get_rotation()) def boot() -> None: workflow_handlers.register(MessageType.Initialize, handle_Initialize) workflow_handlers.register(MessageType.GetFeatures, handle_GetFeatures) workflow_handlers.register(MessageType.Cancel, handle_Cancel) workflow_handlers.register(MessageType.LockDevice, handle_LockDevice) workflow_handlers.register(MessageType.EndSession, handle_EndSession) workflow_handlers.register(MessageType.Ping, handle_Ping) workflow_handlers.register(MessageType.DoPreauthorized, handle_DoPreauthorized) workflow_handlers.register( MessageType.CancelAuthorization, handle_CancelAuthorization ) reload_settings_from_storage() if config.is_unlocked(): wire.find_handler = workflow_handlers.find_registered_handler else: wire.find_handler = get_pinlocked_handler