mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-12-03 04:58:25 +00:00
6b5f578d02
Can be built with `TREZOR_MODEL=R make build_unix`; `make build_unix_frozen` does not work yet. The dialogs are not very pretty; they are just meant to work.
305 lines
9.2 KiB
Python
305 lines
9.2 KiB
Python
from typing import TYPE_CHECKING
|
|
|
|
import storage.cache
|
|
import storage.device
|
|
from trezor import config, utils, wire, workflow
|
|
from trezor.enums import MessageType
|
|
from trezor.messages import Success
|
|
|
|
from . import workflow_handlers
|
|
|
|
if TYPE_CHECKING:
|
|
from trezor import protobuf
|
|
from trezor.messages import (
|
|
Features,
|
|
Initialize,
|
|
EndSession,
|
|
GetFeatures,
|
|
Cancel,
|
|
LockDevice,
|
|
Ping,
|
|
DoPreauthorized,
|
|
CancelAuthorization,
|
|
)
|
|
|
|
|
|
def get_features() -> Features:
    """Assemble the Features message describing the current device state.

    Public fields (versions, model, label, capabilities, ...) are always
    filled in. The fields under "private fields" are populated only while
    the storage is unlocked.
    """
    import storage.recovery
    import storage.sd_salt
    import storage  # workaround for https://github.com/microsoft/pyright/issues/2685

    from trezor import sdcard
    from trezor.enums import Capability
    from trezor.messages import Features

    from apps.common import mnemonic, safety_checks

    device = storage.device

    features = Features(
        vendor="trezor.io",
        fw_vendor=utils.firmware_vendor(),
        language="en-US",
        major_version=utils.VERSION_MAJOR,
        minor_version=utils.VERSION_MINOR,
        patch_version=utils.VERSION_PATCH,
        revision=utils.SCM_REVISION,
        model=utils.MODEL,
        device_id=device.get_device_id(),
        label=device.get_label(),
        pin_protection=config.has_pin(),
        unlocked=config.is_unlocked(),
    )

    # The capability list depends on the build flavour; the order of the
    # entries is kept exactly as it is reported to the host.
    if utils.BITCOIN_ONLY:
        capabilities = [
            Capability.Bitcoin,
            Capability.Crypto,
            Capability.Shamir,
            Capability.ShamirGroups,
        ]
    else:
        capabilities = [
            Capability.Bitcoin,
            Capability.Bitcoin_like,
            Capability.Binance,
            Capability.Cardano,
            Capability.Crypto,
            Capability.EOS,
            Capability.Ethereum,
            Capability.Monero,
            Capability.NEM,
            Capability.Ripple,
            Capability.Stellar,
            Capability.Tezos,
            Capability.U2F,
            Capability.Shamir,
            Capability.ShamirGroups,
        ]

    # Other models are not capable of PassphraseEntry
    if utils.MODEL in ("T",):
        capabilities.append(Capability.PassphraseEntry)

    features.capabilities = capabilities

    features.sd_card_present = sdcard.is_present()
    features.initialized = device.is_initialized()

    # Everything below is private and must not be revealed while locked.
    if not config.is_unlocked():
        return features

    # private fields:
    # passphrase_protection is private, see #1807
    features.passphrase_protection = device.is_passphrase_enabled()
    features.needs_backup = device.needs_backup()
    features.unfinished_backup = device.unfinished_backup()
    features.no_backup = device.no_backup()
    features.flags = device.get_flags()
    features.recovery_mode = storage.recovery.is_in_progress()
    features.backup_type = mnemonic.get_type()
    features.sd_protection = storage.sd_salt.is_enabled()
    features.wipe_code_protection = config.has_wipe_code()
    features.passphrase_always_on_device = device.get_passphrase_always_on_device()
    features.safety_checks = safety_checks.read_setting()
    features.auto_lock_delay_ms = device.get_autolock_delay_ms()
    features.display_rotation = device.get_rotation()
    features.experimental_features = device.get_experimental_features()

    return features
|
|
|
|
|
|
async def handle_Initialize(ctx: wire.Context, msg: Initialize) -> Features:
    """Start (or resume) a session and reply with the device Features.

    On builds that are not Bitcoin-only, the `derive_cardano` choice is
    stored in the session cache. If a seed is already cached and the host
    asks for a different `derive_cardano` value, the current session is
    ended and a fresh one is started instead.
    """
    cache = storage.cache
    session_id = cache.start_session(msg.session_id)

    if not utils.BITCOIN_ONLY:
        requested = msg.derive_cardano
        stored_flag = cache.get(cache.APP_COMMON_DERIVE_CARDANO)
        seed_cached = cache.is_set(cache.APP_COMMON_SEED)

        if seed_cached and requested is not None and requested != bool(stored_flag):
            # seed is already derived, and host wants to change derive_cardano setting
            # => create a new session
            cache.end_current_session()
            session_id = cache.start_session()
            seed_cached = False

        if not seed_cached:
            # Without a derived seed the setting can still be (re)written.
            if requested:
                marker = b"\x01"
            else:
                marker = b""
            cache.set(cache.APP_COMMON_DERIVE_CARDANO, marker)

    reply = get_features()
    reply.session_id = session_id
    return reply
|
|
|
|
|
|
async def handle_GetFeatures(ctx: wire.Context, msg: GetFeatures) -> Features:
    """Reply with the current device Features; neither argument is used."""
    features = get_features()
    return features
|
|
|
|
|
|
async def handle_Cancel(ctx: wire.Context, msg: Cancel) -> Success:
    """Handle a Cancel message.

    Never returns normally: it always raises `wire.ActionCancelled`.
    """
    raise wire.ActionCancelled
|
|
|
|
|
|
async def handle_LockDevice(ctx: wire.Context, msg: LockDevice) -> Success:
    """Lock the device via `lock_device()` and acknowledge with Success."""
    lock_device()
    reply = Success()
    return reply
|
|
|
|
|
|
async def handle_EndSession(ctx: wire.Context, msg: EndSession) -> Success:
    """End the current cache session and acknowledge with Success."""
    storage.cache.end_current_session()
    reply = Success()
    return reply
|
|
|
|
|
|
async def handle_Ping(ctx: wire.Context, msg: Ping) -> Success:
    """Echo the ping message back to the host.

    When `msg.button_protection` is set, the user is asked to confirm
    first; the echo is sent only after `confirm_action` completes.
    """
    needs_confirmation = msg.button_protection
    if needs_confirmation:
        from trezor.ui.layouts import confirm_action
        from trezor.enums import ButtonRequestType

        await confirm_action(
            ctx,
            "ping",
            "Confirm",
            "ping",
            br_code=ButtonRequestType.ProtectCall,
        )

    return Success(message=msg.message)
|
|
|
|
|
|
async def handle_DoPreauthorized(
    ctx: wire.Context, msg: DoPreauthorized
) -> protobuf.MessageType:
    """Run an operation that was authorized in advance.

    Sends a PreauthorizedRequest to the host, waits for one of the wire
    types reported by the stored authorization, and then dispatches the
    received message to its registered handler together with the value
    returned by `authorization.get()`.

    Raises `wire.ProcessError` if no authorization is set.
    """
    from trezor.messages import PreauthorizedRequest

    from apps.common import authorization

    if not authorization.is_set():
        raise wire.ProcessError("No preauthorized operation")

    allowed_types = authorization.get_wire_types()
    utils.ensure(bool(allowed_types), "Unsupported preauthorization found")

    request = await ctx.call_any(PreauthorizedRequest(), *allowed_types)

    wire_type = request.MESSAGE_WIRE_TYPE
    assert wire_type is not None

    handler = workflow_handlers.find_registered_handler(ctx.iface, wire_type)
    if handler is not None:
        # The handler receives the stored authorization as an extra argument.
        return await handler(ctx, request, authorization.get())  # type: ignore [Expected 2 positional arguments]

    return wire.unexpected_message()
|
|
|
|
|
|
async def handle_CancelAuthorization(
    ctx: wire.Context, msg: CancelAuthorization
) -> protobuf.MessageType:
    """Clear the stored authorization and confirm it with a Success message."""
    from apps.common import authorization

    authorization.clear()

    reply = Success(message="Authorization cancelled")
    return reply
|
|
|
|
|
|
# Message types whose handlers are returned unwrapped by
# get_pinlocked_handler(), i.e. they run without first unlocking the device.
ALLOW_WHILE_LOCKED = (
    MessageType.Initialize,
    MessageType.EndSession,
    MessageType.GetFeatures,
    MessageType.Cancel,
    MessageType.LockDevice,
    MessageType.DoPreauthorized,
    MessageType.WipeDevice,
)
|
|
|
|
|
|
def set_homescreen() -> None:
    """Pick the default workflow that matches the current device state.

    Locked storage selects the lockscreen, a recovery in progress selects
    the recovery homescreen, and otherwise the regular homescreen is used.
    """
    import storage.recovery

    if not config.is_unlocked():
        from apps.homescreen.lockscreen import lockscreen

        workflow.set_default(lockscreen)
        return

    if storage.recovery.is_in_progress():
        from apps.management.recovery_device.homescreen import recovery_homescreen

        workflow.set_default(recovery_homescreen)
        return

    from apps.homescreen.homescreen import homescreen

    workflow.set_default(homescreen)
|
|
|
|
|
|
def lock_device() -> None:
    """Lock the device; does nothing when no PIN is set.

    Locks the config, switches the wire handler lookup to the pin-locked
    variant, refreshes the homescreen and closes other workflows.
    """
    if not config.has_pin():
        return

    config.lock()
    wire.find_handler = get_pinlocked_handler
    set_homescreen()
    workflow.close_others()
|
|
|
|
|
|
def lock_device_if_unlocked() -> None:
    """Call `lock_device()` only when the storage is currently unlocked."""
    if not config.is_unlocked():
        return
    lock_device()
|
|
|
|
|
|
async def unlock_device(ctx: wire.GenericContext = wire.DUMMY_CONTEXT) -> None:
    """Make sure the device ends up unlocked.

    A locked storage triggers PIN verification. Afterwards the homescreen
    is refreshed and the wire handler lookup is reset to the registered
    handlers.
    """
    from apps.common.request_pin import verify_user_pin

    storage_locked = not config.is_unlocked()
    if storage_locked:
        # verify_user_pin will raise if the PIN was invalid
        await verify_user_pin(ctx)

    set_homescreen()
    wire.find_handler = workflow_handlers.find_registered_handler
|
|
|
|
|
|
def get_pinlocked_handler(
    iface: wire.WireInterface, msg_type: int
) -> wire.Handler[wire.Msg] | None:
    """Look up the handler for `msg_type` while the device is locked.

    Returns None when no handler is registered. The registered handler is
    returned unchanged for the debug interface (debug builds only) and for
    message types listed in ALLOW_WHILE_LOCKED. Any other handler is
    wrapped so that the device is unlocked before the handler runs.
    """
    registered = workflow_handlers.find_registered_handler(iface, msg_type)
    if registered is None:
        return None

    if __debug__:
        import usb

        # The debug interface bypasses the lock.
        if iface is usb.iface_debug:
            return registered

    if msg_type in ALLOW_WHILE_LOCKED:
        return registered

    async def wrapper(ctx: wire.Context, msg: wire.Msg) -> protobuf.MessageType:
        # Unlock first; only then hand over to the real handler.
        await unlock_device(ctx)
        return await registered(ctx, msg)

    return wrapper
|
|
|
|
|
|
# this function is also called when handling ApplySettings
|
|
def reload_settings_from_storage() -> None:
    """Re-apply settings read from storage to the running system.

    Updates the idle timer that triggers auto-lock, the experimental
    features flag on the wire layer and the display orientation.
    """
    from trezor import ui

    device = storage.device

    autolock_delay_ms = device.get_autolock_delay_ms()
    workflow.idle_timer.set(autolock_delay_ms, lock_device_if_unlocked)

    wire.experimental_enabled = device.get_experimental_features()

    rotation = device.get_rotation()
    ui.display.orientation(rotation)
|
|
|
|
|
|
def boot() -> None:
    """Register the base message handlers and initialise runtime state.

    After registering the handlers, settings are reloaded from storage
    and the wire handler lookup is chosen according to whether the
    storage is currently unlocked.
    """
    register = workflow_handlers.register

    register(MessageType.Initialize, handle_Initialize)
    register(MessageType.GetFeatures, handle_GetFeatures)
    register(MessageType.Cancel, handle_Cancel)
    register(MessageType.LockDevice, handle_LockDevice)
    register(MessageType.EndSession, handle_EndSession)
    register(MessageType.Ping, handle_Ping)
    register(MessageType.DoPreauthorized, handle_DoPreauthorized)
    register(MessageType.CancelAuthorization, handle_CancelAuthorization)

    reload_settings_from_storage()

    # A locked device gets the lookup that wraps handlers with an unlock step.
    wire.find_handler = (
        workflow_handlers.find_registered_handler
        if config.is_unlocked()
        else get_pinlocked_handler
    )
|