import storage import storage.device import storage.recovery import storage.sd_salt from storage import cache from trezor import config, sdcard, utils, wire, workflow from trezor.messages import Capability, MessageType from trezor.messages.Features import Features from trezor.messages.PreauthorizedRequest import PreauthorizedRequest from trezor.messages.Success import Success from apps.common import mnemonic, safety_checks from apps.common.request_pin import verify_user_pin if False: import protobuf from typing import Iterable, NoReturn, Optional, Protocol from trezor.messages.Initialize import Initialize from trezor.messages.EndSession import EndSession from trezor.messages.GetFeatures import GetFeatures from trezor.messages.Cancel import Cancel from trezor.messages.LockDevice import LockDevice from trezor.messages.Ping import Ping from trezor.messages.DoPreauthorized import DoPreauthorized from trezor.messages.CancelAuthorization import CancelAuthorization if False: class Authorization(Protocol): def expected_wire_types(self) -> Iterable[int]: ... def __del__(self) -> None: ... def get_features() -> Features: f = Features() f.vendor = "trezor.io" f.language = "en-US" f.major_version = utils.VERSION_MAJOR f.minor_version = utils.VERSION_MINOR f.patch_version = utils.VERSION_PATCH f.revision = utils.GITREV.encode() f.model = utils.MODEL f.device_id = storage.device.get_device_id() f.label = storage.device.get_label() f.pin_protection = config.has_pin() f.unlocked = config.is_unlocked() f.passphrase_protection = storage.device.is_passphrase_enabled() if utils.BITCOIN_ONLY: f.capabilities = [ Capability.Bitcoin, Capability.Crypto, Capability.Shamir, Capability.ShamirGroups, Capability.PassphraseEntry, ] else: f.capabilities = [ Capability.Bitcoin, Capability.Bitcoin_like, Capability.Binance, Capability.Cardano, Capability.Crypto, Capability.EOS, Capability.Ethereum, Capability.Lisk, Capability.Monero, Capability.NEM, Capability.Ripple, Capability.Stellar, Capability.Tezos, Capability.U2F, Capability.Shamir, Capability.ShamirGroups, Capability.PassphraseEntry, ] f.sd_card_present = sdcard.is_present() f.initialized = storage.device.is_initialized() # private fields: if config.is_unlocked(): f.needs_backup = storage.device.needs_backup() f.unfinished_backup = storage.device.unfinished_backup() f.no_backup = storage.device.no_backup() f.flags = storage.device.get_flags() f.recovery_mode = storage.recovery.is_in_progress() f.backup_type = mnemonic.get_type() f.sd_protection = storage.sd_salt.is_enabled() f.wipe_code_protection = config.has_wipe_code() f.passphrase_always_on_device = storage.device.get_passphrase_always_on_device() f.safety_checks = safety_checks.read_setting() f.auto_lock_delay_ms = storage.device.get_autolock_delay_ms() f.display_rotation = storage.device.get_rotation() return f async def handle_Initialize(ctx: wire.Context, msg: Initialize) -> Features: features = get_features() if msg.session_id: msg.session_id = bytes(msg.session_id) features.session_id = cache.start_session(msg.session_id) return features async def handle_GetFeatures(ctx: wire.Context, msg: GetFeatures) -> Features: return get_features() async def handle_Cancel(ctx: wire.Context, msg: Cancel) -> NoReturn: raise wire.ActionCancelled async def handle_LockDevice(ctx: wire.Context, msg: LockDevice) -> Success: lock_device() return Success() async def handle_EndSession(ctx: wire.Context, msg: EndSession) -> Success: cache.end_current_session() return Success() async def handle_Ping(ctx: wire.Context, msg: Ping) -> Success: if msg.button_protection: from apps.common.confirm import require_confirm from trezor.messages.ButtonRequestType import ProtectCall from trezor.ui.text import Text await require_confirm(ctx, Text("Confirm"), ProtectCall) return Success(message=msg.message) async def handle_DoPreauthorized( ctx: wire.Context, msg: DoPreauthorized ) -> protobuf.MessageType: authorization = storage.cache.get( storage.cache.APP_BASE_AUTHORIZATION ) # type: Authorization if not authorization: raise wire.ProcessError("No preauthorized operation") req = await ctx.call_any( PreauthorizedRequest(), *authorization.expected_wire_types() ) handler = wire.find_registered_workflow_handler(ctx.iface, req.MESSAGE_WIRE_TYPE) if handler is None: return wire.unexpected_message() return await handler(ctx, req, authorization) # type: ignore def set_authorization(authorization: Authorization) -> None: previous = storage.cache.get( storage.cache.APP_BASE_AUTHORIZATION ) # type: Authorization if previous: previous.__del__() storage.cache.set(storage.cache.APP_BASE_AUTHORIZATION, authorization) async def handle_CancelAuthorization( ctx: wire.Context, msg: CancelAuthorization ) -> protobuf.MessageType: authorization = storage.cache.get( storage.cache.APP_BASE_AUTHORIZATION ) # type: Authorization if not authorization: raise wire.ProcessError("No preauthorized operation") authorization.__del__() storage.cache.delete(storage.cache.APP_BASE_AUTHORIZATION) return Success(message="Authorization cancelled") ALLOW_WHILE_LOCKED = ( MessageType.Initialize, MessageType.EndSession, MessageType.GetFeatures, MessageType.Cancel, MessageType.LockDevice, MessageType.DoPreauthorized, MessageType.WipeDevice, ) def set_homescreen() -> None: if not config.is_unlocked(): from apps.homescreen.lockscreen import lockscreen workflow.set_default(lockscreen) elif storage.recovery.is_in_progress(): from apps.management.recovery_device.homescreen import recovery_homescreen workflow.set_default(recovery_homescreen) else: from apps.homescreen.homescreen import homescreen workflow.set_default(homescreen) def lock_device() -> None: if config.has_pin(): config.lock() wire.find_handler = get_pinlocked_handler set_homescreen() workflow.close_others() async def unlock_device(ctx: wire.GenericContext = wire.DUMMY_CONTEXT) -> None: """Ensure the device is in unlocked state. If the storage is locked, attempt to unlock it. Reset the homescreen and the wire handler. """ if not config.is_unlocked(): # verify_user_pin will raise if the PIN was invalid await verify_user_pin(ctx) set_homescreen() wire.find_handler = wire.find_registered_workflow_handler def get_pinlocked_handler( iface: wire.WireInterface, msg_type: int ) -> Optional[wire.Handler[wire.Msg]]: orig_handler = wire.find_registered_workflow_handler(iface, msg_type) if orig_handler is None: return None if __debug__: import usb if iface is usb.iface_debug: return orig_handler if msg_type in ALLOW_WHILE_LOCKED: return orig_handler async def wrapper(ctx: wire.Context, msg: wire.Msg) -> protobuf.MessageType: # mypy limitation: orig_handler is not recognized as non-None assert orig_handler is not None await unlock_device(ctx) return await orig_handler(ctx, msg) return wrapper def boot() -> None: wire.register(MessageType.Initialize, handle_Initialize) wire.register(MessageType.GetFeatures, handle_GetFeatures) wire.register(MessageType.Cancel, handle_Cancel) wire.register(MessageType.LockDevice, handle_LockDevice) wire.register(MessageType.EndSession, handle_EndSession) wire.register(MessageType.Ping, handle_Ping) wire.register(MessageType.DoPreauthorized, handle_DoPreauthorized) wire.register(MessageType.CancelAuthorization, handle_CancelAuthorization) workflow.idle_timer.set(storage.device.get_autolock_delay_ms(), lock_device)