You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
trezor-firmware/core/src/apps/base.py

278 lines
8.6 KiB

import storage.cache
import storage.device
from trezor import config, utils, wire, workflow
from trezor.messages import MessageType
from trezor.messages.Success import Success
from . import workflow_handlers
if False:
import protobuf
from typing import NoReturn
from trezor.messages.Features import Features
from trezor.messages.Initialize import Initialize
from trezor.messages.EndSession import EndSession
from trezor.messages.GetFeatures import GetFeatures
from trezor.messages.Cancel import Cancel
from trezor.messages.LockDevice import LockDevice
from trezor.messages.Ping import Ping
from trezor.messages.DoPreauthorized import DoPreauthorized
from trezor.messages.CancelAuthorization import CancelAuthorization
def get_features() -> Features:
import storage.recovery
import storage.sd_salt
from trezor import sdcard
from trezor.messages import Capability
from trezor.messages.Features import Features
from apps.common import mnemonic, safety_checks
f = Features(
vendor="trezor.io",
language="en-US",
major_version=utils.VERSION_MAJOR,
minor_version=utils.VERSION_MINOR,
patch_version=utils.VERSION_PATCH,
revision=utils.SCM_REVISION,
model=utils.MODEL,
device_id=storage.device.get_device_id(),
label=storage.device.get_label(),
pin_protection=config.has_pin(),
unlocked=config.is_unlocked(),
passphrase_protection=storage.device.is_passphrase_enabled(),
)
if utils.BITCOIN_ONLY:
f.capabilities = [
Capability.Bitcoin,
Capability.Crypto,
Capability.Shamir,
Capability.ShamirGroups,
Capability.PassphraseEntry,
]
else:
f.capabilities = [
Capability.Bitcoin,
Capability.Bitcoin_like,
Capability.Binance,
Capability.Cardano,
Capability.Crypto,
Capability.EOS,
Capability.Ethereum,
Capability.Lisk,
Capability.Monero,
Capability.NEM,
Capability.Ripple,
Capability.Stellar,
Capability.Tezos,
Capability.U2F,
Capability.Shamir,
Capability.ShamirGroups,
Capability.PassphraseEntry,
]
f.sd_card_present = sdcard.is_present()
f.initialized = storage.device.is_initialized()
# private fields:
if config.is_unlocked():
f.needs_backup = storage.device.needs_backup()
f.unfinished_backup = storage.device.unfinished_backup()
f.no_backup = storage.device.no_backup()
f.flags = storage.device.get_flags()
f.recovery_mode = storage.recovery.is_in_progress()
f.backup_type = mnemonic.get_type()
f.sd_protection = storage.sd_salt.is_enabled()
f.wipe_code_protection = config.has_wipe_code()
f.passphrase_always_on_device = storage.device.get_passphrase_always_on_device()
f.safety_checks = safety_checks.read_setting()
f.auto_lock_delay_ms = storage.device.get_autolock_delay_ms()
f.display_rotation = storage.device.get_rotation()
f.experimental_features = storage.device.get_experimental_features()
return f
async def handle_Initialize(ctx: wire.Context, msg: Initialize) -> Features:
features = get_features()
if msg.session_id:
msg.session_id = bytes(msg.session_id)
features.session_id = storage.cache.start_session(msg.session_id)
return features
async def handle_GetFeatures(ctx: wire.Context, msg: GetFeatures) -> Features:
return get_features()
async def handle_Cancel(ctx: wire.Context, msg: Cancel) -> NoReturn:
raise wire.ActionCancelled
async def handle_LockDevice(ctx: wire.Context, msg: LockDevice) -> Success:
lock_device()
return Success()
async def handle_EndSession(ctx: wire.Context, msg: EndSession) -> Success:
storage.cache.end_current_session()
return Success()
async def handle_Ping(ctx: wire.Context, msg: Ping) -> Success:
if msg.button_protection:
from trezor.ui.layouts import confirm_action
from trezor.messages.ButtonRequestType import ProtectCall
await confirm_action(ctx, "ping", "Confirm", "ping", br_code=ProtectCall)
return Success(message=msg.message)
async def handle_DoPreauthorized(
ctx: wire.Context, msg: DoPreauthorized
) -> protobuf.MessageType:
from trezor.messages.PreauthorizedRequest import PreauthorizedRequest
from apps.common import authorization
if not authorization.is_set():
raise wire.ProcessError("No preauthorized operation")
wire_types = authorization.get_wire_types()
utils.ensure(bool(wire_types), "Unsupported preauthorization found")
req = await ctx.call_any(PreauthorizedRequest(), *wire_types)
handler = workflow_handlers.find_registered_handler(
ctx.iface, req.MESSAGE_WIRE_TYPE
)
if handler is None:
return wire.unexpected_message()
return await handler(ctx, req, authorization.get()) # type: ignore
async def handle_CancelAuthorization(
ctx: wire.Context, msg: CancelAuthorization
) -> protobuf.MessageType:
from apps.common import authorization
authorization.clear()
return Success(message="Authorization cancelled")
ALLOW_WHILE_LOCKED = (
MessageType.Initialize,
MessageType.EndSession,
MessageType.GetFeatures,
MessageType.Cancel,
MessageType.LockDevice,
MessageType.DoPreauthorized,
MessageType.WipeDevice,
)
def set_homescreen() -> None:
import storage.recovery
if not config.is_unlocked():
from apps.homescreen.lockscreen import lockscreen
workflow.set_default(lockscreen)
elif storage.recovery.is_in_progress():
from apps.management.recovery_device.homescreen import recovery_homescreen
workflow.set_default(recovery_homescreen)
else:
from apps.homescreen.homescreen import homescreen
workflow.set_default(homescreen)
def lock_device() -> None:
if config.has_pin():
config.lock()
wire.find_handler = get_pinlocked_handler
set_homescreen()
workflow.close_others()
def lock_device_if_unlocked() -> None:
if config.is_unlocked():
lock_device()
async def unlock_device(ctx: wire.GenericContext = wire.DUMMY_CONTEXT) -> None:
"""Ensure the device is in unlocked state.
If the storage is locked, attempt to unlock it. Reset the homescreen and the wire
handler.
"""
from apps.common.request_pin import verify_user_pin
if not config.is_unlocked():
# verify_user_pin will raise if the PIN was invalid
await verify_user_pin(ctx)
set_homescreen()
wire.find_handler = workflow_handlers.find_registered_handler
def get_pinlocked_handler(
iface: wire.WireInterface, msg_type: int
) -> wire.Handler[wire.Msg] | None:
orig_handler = workflow_handlers.find_registered_handler(iface, msg_type)
if orig_handler is None:
return None
if __debug__:
import usb
if iface is usb.iface_debug:
return orig_handler
if msg_type in ALLOW_WHILE_LOCKED:
return orig_handler
async def wrapper(ctx: wire.Context, msg: wire.Msg) -> protobuf.MessageType:
# mypy limitation: orig_handler is not recognized as non-None
assert orig_handler is not None
await unlock_device(ctx)
return await orig_handler(ctx, msg)
return wrapper
# this function is also called when handling ApplySettings
def reload_settings_from_storage() -> None:
from trezor import ui
workflow.idle_timer.set(
storage.device.get_autolock_delay_ms(), lock_device_if_unlocked
)
wire.experimental_enabled = storage.device.get_experimental_features()
ui.display.orientation(storage.device.get_rotation())
def boot() -> None:
workflow_handlers.register(MessageType.Initialize, handle_Initialize)
workflow_handlers.register(MessageType.GetFeatures, handle_GetFeatures)
workflow_handlers.register(MessageType.Cancel, handle_Cancel)
workflow_handlers.register(MessageType.LockDevice, handle_LockDevice)
workflow_handlers.register(MessageType.EndSession, handle_EndSession)
workflow_handlers.register(MessageType.Ping, handle_Ping)
workflow_handlers.register(MessageType.DoPreauthorized, handle_DoPreauthorized)
workflow_handlers.register(
MessageType.CancelAuthorization, handle_CancelAuthorization
)
reload_settings_from_storage()
if config.is_unlocked():
wire.find_handler = workflow_handlers.find_registered_handler
else:
wire.find_handler = get_pinlocked_handler