1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-01-03 12:00:59 +00:00

chore(core): decrease management size by 1440 bytes

This commit is contained in:
grdddj 2022-09-18 16:20:10 +02:00 committed by matejcik
parent 48b4c5aaba
commit a29ea11ea2
19 changed files with 657 additions and 590 deletions

View File

@ -1,16 +1,18 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from trezor.messages import Success
from trezor.messages import ApplyFlags
from trezor.wire import GenericContext
async def apply_flags(ctx: GenericContext, msg: ApplyFlags) -> Success:
import storage.device
from storage.device import set_flags
from trezor import wire
from trezor.wire import NotInitialized
from trezor.messages import Success
if TYPE_CHECKING:
from trezor.messages import ApplyFlags
async def apply_flags(ctx: wire.GenericContext, msg: ApplyFlags) -> Success:
if not storage.device.is_initialized():
raise wire.NotInitialized("Device is not initialized")
raise NotInitialized("Device is not initialized")
set_flags(msg.flags)
return Success(message="Flags applied")

View File

@ -1,159 +1,175 @@
from typing import TYPE_CHECKING
import storage.device
from trezor import ui, wire
from trezor.enums import ButtonRequestType, SafetyCheckLevel
from trezor.messages import Success
from trezor.strings import format_duration_ms
from trezor.enums import ButtonRequestType
from trezor.ui.layouts import confirm_action
from apps.base import reload_settings_from_storage
from apps.common import safety_checks
from trezor.wire import DataError
if TYPE_CHECKING:
from trezor.messages import ApplySettings
from trezor.messages import ApplySettings, Success
from trezor.wire import Context, GenericContext
from trezor.enums import SafetyCheckLevel
def validate_homescreen(homescreen: bytes) -> None:
BRT_PROTECT_CALL = ButtonRequestType.ProtectCall # CACHE
def _validate_homescreen(homescreen: bytes) -> None:
from trezor import ui
import storage.device as storage_device
if homescreen == b"":
return
if len(homescreen) > storage.device.HOMESCREEN_MAXSIZE:
raise wire.DataError(
f"Homescreen is too large, maximum size is {storage.device.HOMESCREEN_MAXSIZE} bytes"
if len(homescreen) > storage_device.HOMESCREEN_MAXSIZE:
raise DataError(
f"Homescreen is too large, maximum size is {storage_device.HOMESCREEN_MAXSIZE} bytes"
)
try:
w, h, toif_format = ui.display.toif_info(homescreen)
except ValueError:
raise wire.DataError("Invalid homescreen")
raise DataError("Invalid homescreen")
if w != 144 or h != 144:
raise wire.DataError("Homescreen must be 144x144 pixel large")
raise DataError("Homescreen must be 144x144 pixel large")
if toif_format != ui.display.TOIF_FULL_COLOR_BE:
raise wire.DataError("Homescreen must be full-color TOIF image")
raise DataError("Homescreen must be full-color TOIF image")
async def apply_settings(ctx: wire.Context, msg: ApplySettings) -> Success:
if not storage.device.is_initialized():
raise wire.NotInitialized("Device is not initialized")
async def apply_settings(ctx: Context, msg: ApplySettings) -> Success:
import storage.device as storage_device
from apps.common import safety_checks
from trezor.messages import Success
from trezor.wire import ProcessError, NotInitialized
from apps.base import reload_settings_from_storage
if not storage_device.is_initialized():
raise NotInitialized("Device is not initialized")
homescreen = msg.homescreen # local_cache_attribute
label = msg.label # local_cache_attribute
auto_lock_delay_ms = msg.auto_lock_delay_ms # local_cache_attribute
use_passphrase = msg.use_passphrase # local_cache_attribute
passphrase_always_on_device = (
msg.passphrase_always_on_device
) # local_cache_attribute
display_rotation = msg.display_rotation # local_cache_attribute
msg_safety_checks = msg.safety_checks # local_cache_attribute
experimental_features = msg.experimental_features # local_cache_attribute
if (
msg.homescreen is None
and msg.label is None
and msg.use_passphrase is None
and msg.passphrase_always_on_device is None
and msg.display_rotation is None
and msg.auto_lock_delay_ms is None
and msg.safety_checks is None
and msg.experimental_features is None
homescreen is None
and label is None
and use_passphrase is None
and passphrase_always_on_device is None
and display_rotation is None
and auto_lock_delay_ms is None
and msg_safety_checks is None
and experimental_features is None
):
raise wire.ProcessError("No setting provided")
raise ProcessError("No setting provided")
if msg.homescreen is not None:
validate_homescreen(msg.homescreen)
await require_confirm_change_homescreen(ctx)
if homescreen is not None:
_validate_homescreen(homescreen)
await _require_confirm_change_homescreen(ctx)
try:
storage.device.set_homescreen(msg.homescreen)
storage_device.set_homescreen(homescreen)
except ValueError:
raise wire.DataError("Invalid homescreen")
raise DataError("Invalid homescreen")
if msg.label is not None:
if len(msg.label) > storage.device.LABEL_MAXLENGTH:
raise wire.DataError("Label too long")
await require_confirm_change_label(ctx, msg.label)
storage.device.set_label(msg.label)
if label is not None:
if len(label) > storage_device.LABEL_MAXLENGTH:
raise DataError("Label too long")
await _require_confirm_change_label(ctx, label)
storage_device.set_label(label)
if msg.use_passphrase is not None:
await require_confirm_change_passphrase(ctx, msg.use_passphrase)
storage.device.set_passphrase_enabled(msg.use_passphrase)
if use_passphrase is not None:
await _require_confirm_change_passphrase(ctx, use_passphrase)
storage_device.set_passphrase_enabled(use_passphrase)
if msg.passphrase_always_on_device is not None:
if not storage.device.is_passphrase_enabled():
raise wire.DataError("Passphrase is not enabled")
await require_confirm_change_passphrase_source(
ctx, msg.passphrase_always_on_device
if passphrase_always_on_device is not None:
if not storage_device.is_passphrase_enabled():
raise DataError("Passphrase is not enabled")
await _require_confirm_change_passphrase_source(
ctx, passphrase_always_on_device
)
storage.device.set_passphrase_always_on_device(msg.passphrase_always_on_device)
storage_device.set_passphrase_always_on_device(passphrase_always_on_device)
if msg.auto_lock_delay_ms is not None:
if msg.auto_lock_delay_ms < storage.device.AUTOLOCK_DELAY_MINIMUM:
raise wire.ProcessError("Auto-lock delay too short")
if msg.auto_lock_delay_ms > storage.device.AUTOLOCK_DELAY_MAXIMUM:
raise wire.ProcessError("Auto-lock delay too long")
await require_confirm_change_autolock_delay(ctx, msg.auto_lock_delay_ms)
storage.device.set_autolock_delay_ms(msg.auto_lock_delay_ms)
if auto_lock_delay_ms is not None:
if auto_lock_delay_ms < storage_device.AUTOLOCK_DELAY_MINIMUM:
raise ProcessError("Auto-lock delay too short")
if auto_lock_delay_ms > storage_device.AUTOLOCK_DELAY_MAXIMUM:
raise ProcessError("Auto-lock delay too long")
await _require_confirm_change_autolock_delay(ctx, auto_lock_delay_ms)
storage_device.set_autolock_delay_ms(auto_lock_delay_ms)
if msg.safety_checks is not None:
await require_confirm_safety_checks(ctx, msg.safety_checks)
safety_checks.apply_setting(msg.safety_checks)
if msg_safety_checks is not None:
await _require_confirm_safety_checks(ctx, msg_safety_checks)
safety_checks.apply_setting(msg_safety_checks)
if msg.display_rotation is not None:
await require_confirm_change_display_rotation(ctx, msg.display_rotation)
storage.device.set_rotation(msg.display_rotation)
if display_rotation is not None:
await _require_confirm_change_display_rotation(ctx, display_rotation)
storage_device.set_rotation(display_rotation)
if msg.experimental_features is not None:
await require_confirm_experimental_features(ctx, msg.experimental_features)
storage.device.set_experimental_features(msg.experimental_features)
if experimental_features is not None:
await _require_confirm_experimental_features(ctx, experimental_features)
storage_device.set_experimental_features(experimental_features)
reload_settings_from_storage()
return Success(message="Settings applied")
async def require_confirm_change_homescreen(ctx: wire.GenericContext) -> None:
async def _require_confirm_change_homescreen(ctx: GenericContext) -> None:
await confirm_action(
ctx,
"set_homescreen",
"Set homescreen",
description="Do you really want to change the homescreen image?",
br_code=ButtonRequestType.ProtectCall,
br_code=BRT_PROTECT_CALL,
)
async def require_confirm_change_label(ctx: wire.GenericContext, label: str) -> None:
async def _require_confirm_change_label(ctx: GenericContext, label: str) -> None:
await confirm_action(
ctx,
"set_label",
"Change label",
description="Do you really want to change the label to {}?",
description_param=label,
br_code=ButtonRequestType.ProtectCall,
br_code=BRT_PROTECT_CALL,
)
async def require_confirm_change_passphrase(
ctx: wire.GenericContext, use: bool
) -> None:
if use:
description = "Do you really want to enable passphrase encryption?"
else:
description = "Do you really want to disable passphrase encryption?"
async def _require_confirm_change_passphrase(ctx: GenericContext, use: bool) -> None:
template = "Do you really want to {} passphrase encryption?"
description = template.format("enable" if use else "disable")
await confirm_action(
ctx,
"set_passphrase",
"Enable passphrase" if use else "Disable passphrase",
description=description,
br_code=ButtonRequestType.ProtectCall,
br_code=BRT_PROTECT_CALL,
)
async def require_confirm_change_passphrase_source(
ctx: wire.GenericContext, passphrase_always_on_device: bool
async def _require_confirm_change_passphrase_source(
ctx: GenericContext, passphrase_always_on_device: bool
) -> None:
if passphrase_always_on_device:
description = "Do you really want to enter passphrase always on the device?"
else:
description = "Do you want to revoke the passphrase on device setting?"
description = (
"Do you really want to enter passphrase always on the device?"
if passphrase_always_on_device
else "Do you want to revoke the passphrase on device setting?"
)
await confirm_action(
ctx,
"set_passphrase_source",
"Passphrase source",
description=description,
br_code=ButtonRequestType.ProtectCall,
br_code=BRT_PROTECT_CALL,
)
async def require_confirm_change_display_rotation(
ctx: wire.GenericContext, rotation: int
async def _require_confirm_change_display_rotation(
ctx: GenericContext, rotation: int
) -> None:
if rotation == 0:
label = "north"
@ -164,80 +180,81 @@ async def require_confirm_change_display_rotation(
elif rotation == 270:
label = "west"
else:
raise wire.DataError("Unsupported display rotation")
raise DataError("Unsupported display rotation")
await confirm_action(
ctx,
"set_rotation",
"Change rotation",
description="Do you really want to change display rotation to {}?",
description_param=label,
br_code=ButtonRequestType.ProtectCall,
br_code=BRT_PROTECT_CALL,
)
async def require_confirm_change_autolock_delay(
ctx: wire.GenericContext, delay_ms: int
async def _require_confirm_change_autolock_delay(
ctx: GenericContext, delay_ms: int
) -> None:
from trezor.strings import format_duration_ms
await confirm_action(
ctx,
"set_autolock_delay",
"Auto-lock delay",
description="Do you really want to auto-lock your device after {}?",
description_param=format_duration_ms(delay_ms),
br_code=ButtonRequestType.ProtectCall,
br_code=BRT_PROTECT_CALL,
)
async def require_confirm_safety_checks(
ctx: wire.GenericContext, level: SafetyCheckLevel
async def _require_confirm_safety_checks(
ctx: GenericContext, level: SafetyCheckLevel
) -> None:
if level == SafetyCheckLevel.PromptAlways:
await confirm_action(
ctx,
"set_safety_checks",
"Safety override",
hold=True,
verb="Hold to confirm",
description="Trezor will allow you to approve some actions which might be unsafe.",
action="Are you sure?",
reverse=True,
larger_vspace=True,
br_code=ButtonRequestType.ProtectCall,
)
elif level == SafetyCheckLevel.PromptTemporarily:
await confirm_action(
ctx,
"set_safety_checks",
"Safety override",
hold=True,
verb="Hold to confirm",
description="Trezor will temporarily allow you to approve some actions which might be unsafe.",
action="Are you sure?",
reverse=True,
br_code=ButtonRequestType.ProtectCall,
)
elif level == SafetyCheckLevel.Strict:
from trezor.enums import SafetyCheckLevel
if level == SafetyCheckLevel.Strict:
await confirm_action(
ctx,
"set_safety_checks",
"Safety checks",
description="Do you really want to enforce strict safety checks (recommended)?",
br_code=ButtonRequestType.ProtectCall,
br_code=BRT_PROTECT_CALL,
)
elif level in (SafetyCheckLevel.PromptAlways, SafetyCheckLevel.PromptTemporarily):
# Reusing most stuff for both levels
template = (
"Trezor will{}allow you to approve some actions which might be unsafe."
)
description = template.format(
" temporarily " if level == SafetyCheckLevel.PromptTemporarily else " "
)
await confirm_action(
ctx,
"set_safety_checks",
"Safety override",
"Are you sure?",
description,
hold=True,
verb="Hold to confirm",
reverse=True,
larger_vspace=level == SafetyCheckLevel.PromptAlways,
br_code=BRT_PROTECT_CALL,
)
else:
raise ValueError # enum value out of range
async def require_confirm_experimental_features(
ctx: wire.GenericContext, enable: bool
async def _require_confirm_experimental_features(
ctx: GenericContext, enable: bool
) -> None:
if enable:
await confirm_action(
ctx,
"set_experimental_features",
"Experimental mode",
description="Enable experimental features?",
action="Only for development and beta testing!",
"Only for development and beta testing!",
"Enable experimental features?",
reverse=True,
br_code=ButtonRequestType.ProtectCall,
br_code=BRT_PROTECT_CALL,
)

View File

@ -1,7 +1,12 @@
from typing import TYPE_CHECKING
import storage
import storage.device
if TYPE_CHECKING:
from trezor.messages import BackupDevice, Success
from trezor.wire import Context
async def backup_device(ctx: Context, msg: BackupDevice) -> Success:
import storage.device as storage_device
from trezor import wire
from trezor.messages import Success
@ -9,26 +14,21 @@ from apps.common import mnemonic
from .reset_device import backup_seed, layout
if TYPE_CHECKING:
from trezor.messages import BackupDevice
async def backup_device(ctx: wire.Context, msg: BackupDevice) -> Success:
if not storage.device.is_initialized():
if not storage_device.is_initialized():
raise wire.NotInitialized("Device is not initialized")
if not storage.device.needs_backup():
if not storage_device.needs_backup():
raise wire.ProcessError("Seed already backed up")
mnemonic_secret, mnemonic_type = mnemonic.get()
if mnemonic_secret is None:
raise RuntimeError
storage.device.set_unfinished_backup(True)
storage.device.set_backed_up()
storage_device.set_unfinished_backup(True)
storage_device.set_backed_up()
await backup_seed(ctx, mnemonic_type, mnemonic_secret)
storage.device.set_unfinished_backup(False)
storage_device.set_unfinished_backup(False)
await layout.show_backup_success(ctx)

View File

@ -1,6 +1,10 @@
from trezor.crypto.slip39 import Share
from typing import TYPE_CHECKING
from trezor.enums import BackupType
if TYPE_CHECKING:
from trezor.crypto.slip39 import Share
_BIP39_WORD_COUNTS = (12, 18, 24)
_SLIP39_WORD_COUNTS = (20, 33)

View File

@ -1,9 +1,18 @@
from typing import TYPE_CHECKING
from storage.device import is_initialized
from trezor import config, wire
if TYPE_CHECKING:
from typing import Awaitable
from trezor.messages import ChangePin, Success
from trezor.wire import Context
async def change_pin(ctx: Context, msg: ChangePin) -> Success:
from storage.device import is_initialized
from trezor.messages import Success
from trezor.ui.layouts import confirm_action, show_success
from trezor.ui.layouts import show_success
from apps.common.request_pin import (
error_pin_invalid,
@ -12,18 +21,11 @@ from apps.common.request_pin import (
request_pin_confirm,
)
if TYPE_CHECKING:
from typing import Awaitable
from trezor.messages import ChangePin
async def change_pin(ctx: wire.Context, msg: ChangePin) -> Success:
if not is_initialized():
raise wire.NotInitialized("Device is not initialized")
# confirm that user wants to change the pin
await require_confirm_change_pin(ctx, msg)
await _require_confirm_change_pin(ctx, msg)
# get old pin
curpin, salt = await request_pin_and_sd_salt(ctx, "Enter old PIN")
@ -61,7 +63,9 @@ async def change_pin(ctx: wire.Context, msg: ChangePin) -> Success:
return Success(message=msg_wire)
def require_confirm_change_pin(ctx: wire.Context, msg: ChangePin) -> Awaitable[None]:
def _require_confirm_change_pin(ctx: Context, msg: ChangePin) -> Awaitable[None]:
from trezor.ui.layouts import confirm_action
has_pin = config.has_pin()
if msg.remove and has_pin: # removing pin
@ -69,8 +73,8 @@ def require_confirm_change_pin(ctx: wire.Context, msg: ChangePin) -> Awaitable[N
ctx,
"set_pin",
"Remove PIN",
description="Do you really want to",
action="disable PIN protection?",
"disable PIN protection?",
"Do you really want to",
reverse=True,
)
@ -79,8 +83,8 @@ def require_confirm_change_pin(ctx: wire.Context, msg: ChangePin) -> Awaitable[N
ctx,
"set_pin",
"Change PIN",
description="Do you really want to",
action="change your PIN?",
"change your PIN?",
"Do you really want to",
reverse=True,
)
@ -89,8 +93,8 @@ def require_confirm_change_pin(ctx: wire.Context, msg: ChangePin) -> Awaitable[N
ctx,
"set_pin",
"Enable PIN",
description="Do you really want to",
action="enable PIN protection?",
"enable PIN protection?",
"Do you really want to",
reverse=True,
)

View File

@ -1,25 +1,25 @@
from typing import TYPE_CHECKING
from storage.device import is_initialized
from trezor import config, ui, wire
from trezor.messages import Success
from trezor.ui.layouts import confirm_action, show_popup, show_success
from apps.common.request_pin import (
error_pin_invalid,
request_pin,
request_pin_and_sd_salt,
)
if TYPE_CHECKING:
from typing import Awaitable
from trezor.wire import Context
from trezor.messages import ChangeWipeCode
from trezor.messages import ChangeWipeCode, Success
async def change_wipe_code(ctx: wire.Context, msg: ChangeWipeCode) -> Success:
async def change_wipe_code(ctx: Context, msg: ChangeWipeCode) -> Success:
from storage.device import is_initialized
from trezor.wire import NotInitialized
from trezor.ui.layouts import show_success
from trezor.messages import Success
from trezor import config
from apps.common.request_pin import (
error_pin_invalid,
request_pin_and_sd_salt,
)
if not is_initialized():
raise wire.NotInitialized("Device is not initialized")
raise NotInitialized("Device is not initialized")
# Confirm that user wants to set or remove the wipe code.
has_wipe_code = config.has_wipe_code()
@ -58,15 +58,19 @@ async def change_wipe_code(ctx: wire.Context, msg: ChangeWipeCode) -> Success:
def _require_confirm_action(
ctx: wire.Context, msg: ChangeWipeCode, has_wipe_code: bool
ctx: Context, msg: ChangeWipeCode, has_wipe_code: bool
) -> Awaitable[None]:
from trezor import ui
from trezor.wire import ProcessError
from trezor.ui.layouts import confirm_action
if msg.remove and has_wipe_code:
return confirm_action(
ctx,
"disable_wipe_code",
title="Disable wipe code",
description="Do you really want to",
action="disable wipe code protection?",
"Disable wipe code",
"disable wipe code protection?",
"Do you really want to",
reverse=True,
icon=ui.ICON_CONFIG,
)
@ -75,9 +79,9 @@ def _require_confirm_action(
return confirm_action(
ctx,
"change_wipe_code",
title="Change wipe code",
description="Do you really want to",
action="change the wipe code?",
"Change wipe code",
"change the wipe code?",
"Do you really want to",
reverse=True,
icon=ui.ICON_CONFIG,
)
@ -86,39 +90,36 @@ def _require_confirm_action(
return confirm_action(
ctx,
"set_wipe_code",
title="Set wipe code",
description="Do you really want to",
action="set the wipe code?",
"Set wipe code",
"set the wipe code?",
"Do you really want to",
reverse=True,
icon=ui.ICON_CONFIG,
)
# Removing non-existing wipe code.
raise wire.ProcessError("Wipe code protection is already disabled")
raise ProcessError("Wipe code protection is already disabled")
async def _request_wipe_code_confirm(ctx: wire.Context, pin: str) -> str:
async def _request_wipe_code_confirm(ctx: Context, pin: str) -> str:
from trezor.ui.layouts import show_popup
from apps.common.request_pin import request_pin
while True:
code1 = await request_pin(ctx, "Enter new wipe code")
if code1 == pin:
await _wipe_code_invalid()
# _wipe_code_invalid
await show_popup(
"Invalid wipe code",
"The wipe code must be\ndifferent from your PIN.\n\nPlease try again.",
)
continue
code2 = await request_pin(ctx, "Re-enter new wipe code")
if code1 == code2:
return code1
await _wipe_code_mismatch()
async def _wipe_code_invalid() -> None:
# _wipe_code_mismatch
await show_popup(
title="Invalid wipe code",
description="The wipe code must be\ndifferent from your PIN.\n\nPlease try again.",
)
async def _wipe_code_mismatch() -> None:
await show_popup(
title="Code mismatch",
description="The wipe codes you\nentered do not match.\n\nPlease try again.",
"Code mismatch",
"The wipe codes you\nentered do not match.\n\nPlease try again.",
)

View File

@ -1,28 +1,28 @@
from typing import TYPE_CHECKING
import storage.device
from trezor import ui, wire
if TYPE_CHECKING:
from trezor.messages import GetNextU2FCounter, NextU2FCounter
from trezor.wire import Context
async def get_next_u2f_counter(ctx: Context, msg: GetNextU2FCounter) -> NextU2FCounter:
import storage.device as storage_device
from trezor import ui
from trezor.wire import NotInitialized
from trezor.enums import ButtonRequestType
from trezor.messages import NextU2FCounter
from trezor.ui.layouts import confirm_action
if TYPE_CHECKING:
from trezor.messages import GetNextU2FCounter
async def get_next_u2f_counter(
ctx: wire.Context, msg: GetNextU2FCounter
) -> NextU2FCounter:
if not storage.device.is_initialized():
raise wire.NotInitialized("Device is not initialized")
if not storage_device.is_initialized():
raise NotInitialized("Device is not initialized")
await confirm_action(
ctx,
"get_u2f_counter",
title="Get next U2F counter",
"Get next U2F counter",
description="Do you really want to increase and retrieve\nthe U2F counter?",
icon=ui.ICON_CONFIG,
br_code=ButtonRequestType.ProtectCall,
)
return NextU2FCounter(u2f_counter=storage.device.next_u2f_counter())
return NextU2FCounter(u2f_counter=storage_device.next_u2f_counter())

View File

@ -1,15 +1,15 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from trezor.messages import GetNonce, Nonce
from trezor.wire import Context
async def get_nonce(ctx: Context, msg: GetNonce) -> Nonce:
from storage import cache
from trezor import wire
from trezor.crypto import random
from trezor.messages import Nonce
if TYPE_CHECKING:
from trezor.messages import GetNonce
async def get_nonce(ctx: wire.Context, msg: GetNonce) -> Nonce:
nonce = random.bytes(32)
cache.set(cache.APP_COMMON_NONCE, nonce)
return Nonce(nonce=nonce)

View File

@ -1,16 +1,17 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from trezor.messages import RebootToBootloader
from typing import NoReturn
from trezor.wire import Context
async def reboot_to_bootloader(ctx: Context, msg: RebootToBootloader) -> NoReturn:
import storage.device
from trezor import io, loop, utils, wire
from trezor.messages import Success
from trezor.ui.layouts import confirm_action
if TYPE_CHECKING:
from trezor.messages import RebootToBootloader
from typing import NoReturn
async def reboot_to_bootloader(ctx: wire.Context, msg: RebootToBootloader) -> NoReturn:
if not storage.device.get_experimental_features():
raise wire.UnexpectedMessage("Experimental features are not enabled")

View File

@ -1,24 +1,9 @@
from typing import TYPE_CHECKING
import storage
import storage.device
import storage.recovery
from trezor import config, ui, wire, workflow
from trezor.enums import ButtonRequestType
from trezor.messages import Success
from trezor.ui.layouts import confirm_action, confirm_reset_device
from apps.common.request_pin import (
error_pin_invalid,
request_pin_and_sd_salt,
request_pin_confirm,
)
from .homescreen import recovery_homescreen, recovery_process
if TYPE_CHECKING:
from trezor.messages import RecoveryDevice
from trezor.wire import Context
from trezor.messages import Success
# List of RecoveryDevice fields that can be set when doing dry-run recovery.
# All except `dry_run` are allowed for T1 compatibility, but their values are ignored.
@ -26,69 +11,52 @@ if TYPE_CHECKING:
DRY_RUN_ALLOWED_FIELDS = ("dry_run", "word_count", "enforce_wordlist", "type")
async def recovery_device(ctx: wire.Context, msg: RecoveryDevice) -> Success:
async def recovery_device(ctx: Context, msg: RecoveryDevice) -> Success:
"""
Recover BIP39/SLIP39 seed into empty device.
Recovery is also possible with replugged Trezor. We call this process Persistence.
User starts the process here using the RecoveryDevice msg and then they can unplug
the device anytime and continue without a computer.
"""
_validate(msg)
import storage
import storage.device as storage_device
import storage.recovery as storage_recovery
from trezor import config, ui, wire, workflow
from trezor.enums import ButtonRequestType
from trezor.ui.layouts import confirm_action, confirm_reset_device
from apps.common.request_pin import (
error_pin_invalid,
request_pin_and_sd_salt,
request_pin_confirm,
)
from .homescreen import recovery_homescreen, recovery_process
if storage.recovery.is_in_progress():
return await recovery_process(ctx)
dry_run = msg.dry_run # local_cache_attribute
await _continue_dialog(ctx, msg)
if not msg.dry_run:
# wipe storage to make sure the device is in a clear state
storage.reset()
# for dry run pin needs to be entered
if msg.dry_run:
curpin, salt = await request_pin_and_sd_salt(ctx, "Enter PIN")
if not config.check_pin(curpin, salt):
await error_pin_invalid(ctx)
if not msg.dry_run:
# set up pin if requested
if msg.pin_protection:
newpin = await request_pin_confirm(ctx, allow_cancel=False)
config.change_pin("", newpin, None, None)
storage.device.set_passphrase_enabled(bool(msg.passphrase_protection))
if msg.u2f_counter is not None:
storage.device.set_u2f_counter(msg.u2f_counter)
if msg.label is not None:
storage.device.set_label(msg.label)
storage.recovery.set_in_progress(True)
storage.recovery.set_dry_run(bool(msg.dry_run))
workflow.set_default(recovery_homescreen)
return await recovery_process(ctx)
def _validate(msg: RecoveryDevice) -> None:
if not msg.dry_run and storage.device.is_initialized():
# --------------------------------------------------------
# validate
if not dry_run and storage_device.is_initialized():
raise wire.UnexpectedMessage("Already initialized")
if msg.dry_run and not storage.device.is_initialized():
if dry_run and not storage_device.is_initialized():
raise wire.NotInitialized("Device is not initialized")
if msg.enforce_wordlist is False:
raise wire.ProcessError(
"Value enforce_wordlist must be True, Trezor Core enforces words automatically."
)
if msg.dry_run:
if dry_run:
# check that only allowed fields are set
for key, value in msg.__dict__.items():
if key not in DRY_RUN_ALLOWED_FIELDS and value is not None:
raise wire.ProcessError(f"Forbidden field set in dry-run: {key}")
# END validate
# --------------------------------------------------------
if storage_recovery.is_in_progress():
return await recovery_process(ctx)
async def _continue_dialog(ctx: wire.Context, msg: RecoveryDevice) -> None:
if not msg.dry_run:
# --------------------------------------------------------
# _continue_dialog
if not dry_run:
await confirm_reset_device(
ctx, "Do you really want to\nrecover a wallet?", recovery=True
)
@ -96,8 +64,38 @@ async def _continue_dialog(ctx: wire.Context, msg: RecoveryDevice) -> None:
await confirm_action(
ctx,
"confirm_seedcheck",
title="Seed check",
"Seed check",
description="Do you really want to check the recovery seed?",
icon=ui.ICON_RECOVERY,
br_code=ButtonRequestType.ProtectCall,
)
# END _continue_dialog
# --------------------------------------------------------
if not dry_run:
# wipe storage to make sure the device is in a clear state
storage.reset()
# for dry run pin needs to be entered
if dry_run:
curpin, salt = await request_pin_and_sd_salt(ctx, "Enter PIN")
if not config.check_pin(curpin, salt):
await error_pin_invalid(ctx)
if not dry_run:
# set up pin if requested
if msg.pin_protection:
newpin = await request_pin_confirm(ctx, allow_cancel=False)
config.change_pin("", newpin, None, None)
storage_device.set_passphrase_enabled(bool(msg.passphrase_protection))
if msg.u2f_counter is not None:
storage_device.set_u2f_counter(msg.u2f_counter)
if msg.label is not None:
storage_device.set_label(msg.label)
storage_recovery.set_in_progress(True)
storage_recovery.set_dry_run(bool(dry_run))
workflow.set_default(recovery_homescreen)
return await recovery_process(ctx)

View File

@ -1,24 +1,23 @@
import storage
import storage.device
import storage.recovery
import storage.recovery_shares
from trezor import strings, utils, wire, workflow
from trezor.crypto import slip39
from trezor.crypto.hashlib import sha256
from trezor.enums import BackupType, MessageType
from trezor.errors import MnemonicError
from trezor.messages import Success
from trezor.ui.layouts import show_success
from typing import TYPE_CHECKING
from apps.common import mnemonic
from apps.homescreen.homescreen import homescreen
import storage.device as storage_device
import storage.recovery as storage_recovery
from trezor import wire
from trezor.messages import Success
from .. import backup_types
from . import layout, recover
if TYPE_CHECKING:
from trezor.wire import GenericContext
from trezor.enums import BackupType
async def recovery_homescreen() -> None:
if not storage.recovery.is_in_progress():
from trezor import workflow
from apps.homescreen.homescreen import homescreen
if not storage_recovery.is_in_progress():
workflow.set_default(homescreen)
return
@ -27,22 +26,27 @@ async def recovery_homescreen() -> None:
await recovery_process(ctx)
async def recovery_process(ctx: wire.GenericContext) -> Success:
async def recovery_process(ctx: GenericContext) -> Success:
from trezor.enums import MessageType
import storage
wire.AVOID_RESTARTING_FOR = (MessageType.Initialize, MessageType.GetFeatures)
try:
return await _continue_recovery_process(ctx)
except recover.RecoveryAborted:
dry_run = storage.recovery.is_dry_run()
dry_run = storage_recovery.is_dry_run()
if dry_run:
storage.recovery.end_progress()
storage_recovery.end_progress()
else:
storage.wipe()
raise wire.ActionCancelled
async def _continue_recovery_process(ctx: wire.GenericContext) -> Success:
async def _continue_recovery_process(ctx: GenericContext) -> Success:
from trezor.errors import MnemonicError
# gather the current recovery state from storage
dry_run = storage.recovery.is_dry_run()
dry_run = storage_recovery.is_dry_run()
word_count, backup_type = recover.load_slip39_state()
# Both word_count and backup_type are derived from the same data. Both will be
@ -60,7 +64,10 @@ async def _continue_recovery_process(ctx: wire.GenericContext) -> Success:
while secret is None:
if is_first_step:
# If we are starting recovery, ask for word count first...
word_count = await _request_word_count(ctx, dry_run)
# _request_word_count
await layout.homescreen_dialog(ctx, "Select", "Select number of words")
# ask for the number of words
word_count = await layout.request_word_count(ctx, dry_run)
# ...and only then show the starting screen with word count.
await _request_share_first_screen(ctx, word_count)
assert word_count is not None
@ -91,8 +98,12 @@ async def _continue_recovery_process(ctx: wire.GenericContext) -> Success:
async def _finish_recovery_dry_run(
ctx: wire.GenericContext, secret: bytes, backup_type: BackupType
ctx: GenericContext, secret: bytes, backup_type: BackupType
) -> Success:
from trezor.crypto.hashlib import sha256
from trezor import utils
from apps.common import mnemonic
if backup_type is None:
raise RuntimeError
@ -105,15 +116,15 @@ async def _finish_recovery_dry_run(
# Check that the identifier and iteration exponent match as well
if is_slip39:
result &= (
storage.device.get_slip39_identifier()
== storage.recovery.get_slip39_identifier()
storage_device.get_slip39_identifier()
== storage_recovery.get_slip39_identifier()
)
result &= (
storage.device.get_slip39_iteration_exponent()
== storage.recovery.get_slip39_iteration_exponent()
storage_device.get_slip39_iteration_exponent()
== storage_recovery.get_slip39_iteration_exponent()
)
storage.recovery.end_progress()
storage_recovery.end_progress()
await layout.show_dry_run_result(ctx, result, is_slip39)
@ -124,24 +135,27 @@ async def _finish_recovery_dry_run(
async def _finish_recovery(
ctx: wire.GenericContext, secret: bytes, backup_type: BackupType
ctx: GenericContext, secret: bytes, backup_type: BackupType
) -> Success:
from trezor.ui.layouts import show_success
from trezor.enums import BackupType
if backup_type is None:
raise RuntimeError
storage.device.store_mnemonic_secret(
storage_device.store_mnemonic_secret(
secret, backup_type, needs_backup=False, no_backup=False
)
if backup_type in (BackupType.Slip39_Basic, BackupType.Slip39_Advanced):
identifier = storage.recovery.get_slip39_identifier()
exponent = storage.recovery.get_slip39_iteration_exponent()
identifier = storage_recovery.get_slip39_identifier()
exponent = storage_recovery.get_slip39_iteration_exponent()
if identifier is None or exponent is None:
# Identifier and exponent need to be stored in storage at this point
raise RuntimeError
storage.device.set_slip39_identifier(identifier)
storage.device.set_slip39_iteration_exponent(exponent)
storage_device.set_slip39_identifier(identifier)
storage_device.set_slip39_iteration_exponent(exponent)
storage.recovery.end_progress()
storage_recovery.end_progress()
await show_success(
ctx, "success_recovery", "You have successfully recovered your wallet."
@ -149,15 +163,8 @@ async def _finish_recovery(
return Success(message="Device recovered")
async def _request_word_count(ctx: wire.GenericContext, dry_run: bool) -> int:
await layout.homescreen_dialog(ctx, "Select", "Select number of words")
# ask for the number of words
return await layout.request_word_count(ctx, dry_run)
async def _process_words(
ctx: wire.GenericContext, words: str
ctx: GenericContext, words: str
) -> tuple[bytes | None, BackupType]:
word_count = len(words.split(" "))
is_slip39 = backup_types.is_slip39_word_count(word_count)
@ -178,11 +185,9 @@ async def _process_words(
return secret, backup_type
async def _request_share_first_screen(
ctx: wire.GenericContext, word_count: int
) -> None:
async def _request_share_first_screen(ctx: GenericContext, word_count: int) -> None:
if backup_types.is_slip39_word_count(word_count):
remaining = storage.recovery.fetch_slip39_remaining_shares()
remaining = storage_recovery.fetch_slip39_remaining_shares()
if remaining:
await _request_share_next_screen(ctx)
else:
@ -195,9 +200,11 @@ async def _request_share_first_screen(
)
async def _request_share_next_screen(ctx: wire.GenericContext) -> None:
remaining = storage.recovery.fetch_slip39_remaining_shares()
group_count = storage.recovery.get_slip39_group_count()
async def _request_share_next_screen(ctx: GenericContext) -> None:
from trezor import strings
remaining = storage_recovery.fetch_slip39_remaining_shares()
group_count = storage_recovery.get_slip39_group_count()
if not remaining:
# 'remaining' should be stored at this point
raise RuntimeError
@ -214,11 +221,14 @@ async def _request_share_next_screen(ctx: wire.GenericContext) -> None:
await layout.homescreen_dialog(ctx, "Enter share", text, "needed to enter")
async def _show_remaining_groups_and_shares(ctx: wire.GenericContext) -> None:
async def _show_remaining_groups_and_shares(ctx: GenericContext) -> None:
"""
Show info dialog for Slip39 Advanced - what shares are to be entered.
"""
shares_remaining = storage.recovery.fetch_slip39_remaining_shares()
from trezor.crypto import slip39
import storage.recovery_shares as storage_recovery_shares
shares_remaining = storage_recovery.fetch_slip39_remaining_shares()
# should be stored at this point
assert shares_remaining
@ -231,13 +241,13 @@ async def _show_remaining_groups_and_shares(ctx: wire.GenericContext) -> None:
share = None
for index, remaining in enumerate(shares_remaining):
if 0 <= remaining < slip39.MAX_SHARE_COUNT:
m = storage.recovery_shares.fetch_group(index)[0]
m = storage_recovery_shares.fetch_group(index)[0]
if not share:
share = slip39.decode_mnemonic(m)
identifier = m.split(" ")[0:3]
groups.add((remaining, tuple(identifier)))
elif remaining == slip39.MAX_SHARE_COUNT: # no shares yet
identifier = storage.recovery_shares.fetch_group(first_entered_index)[
identifier = storage_recovery_shares.fetch_group(first_entered_index)[
0
].split(" ")[0:2]
groups.add((remaining, tuple(identifier)))

View File

@ -1,28 +1,25 @@
from typing import TYPE_CHECKING
import storage.recovery
from trezor import ui, wire
from trezor.enums import ButtonRequestType
from trezor.ui.layouts import confirm_action, show_success, show_warning
from trezor.ui.layouts.common import button_request
from trezor.ui.layouts import show_warning
from trezor.ui.layouts.recovery import ( # noqa: F401
continue_recovery,
request_word,
request_word_count,
show_group_share_success,
show_remaining_shares,
)
from .. import backup_types
from . import word_validity
from .recover import RecoveryAborted
if TYPE_CHECKING:
from typing import Callable
from trezor.enums import BackupType
from trezor.wire import GenericContext
async def confirm_abort(ctx: wire.GenericContext, dry_run: bool = False) -> None:
async def _confirm_abort(ctx: GenericContext, dry_run: bool = False) -> None:
from trezor import ui
from trezor.ui.layouts import confirm_action
if dry_run:
await confirm_action(
ctx,
@ -37,8 +34,8 @@ async def confirm_abort(ctx: wire.GenericContext, dry_run: bool = False) -> None
ctx,
"abort_recovery",
"Abort recovery",
description="Do you really want to abort the recovery process?",
action="All progress will be lost.",
"All progress will be lost.",
"Do you really want to abort the recovery process?",
reverse=True,
icon=ui.ICON_WIPE,
br_code=ButtonRequestType.ProtectCall,
@ -46,9 +43,13 @@ async def confirm_abort(ctx: wire.GenericContext, dry_run: bool = False) -> None
async def request_mnemonic(
ctx: wire.GenericContext, word_count: int, backup_type: BackupType | None
ctx: GenericContext, word_count: int, backup_type: BackupType | None
) -> str | None:
await button_request(ctx, "mnemonic", code=ButtonRequestType.MnemonicInput)
from . import word_validity
from trezor.ui.layouts.common import button_request
from trezor.ui.layouts.recovery import request_word
await button_request(ctx, "mnemonic", ButtonRequestType.MnemonicInput)
words: list[str] = []
for i in range(word_count):
@ -60,21 +61,38 @@ async def request_mnemonic(
try:
word_validity.check(backup_type, words)
except word_validity.AlreadyAdded:
await show_share_already_added(ctx)
# show_share_already_added
await show_warning(
ctx,
"warning_known_share",
"Share already entered,\nplease enter\na different share.",
)
return None
except word_validity.IdentifierMismatch:
await show_identifier_mismatch(ctx)
# show_identifier_mismatch
await show_warning(
ctx,
"warning_mismatched_share",
"You have entered\na share from another\nShamir Backup.",
)
return None
except word_validity.ThresholdReached:
await show_group_threshold_reached(ctx)
# show_group_threshold_reached
await show_warning(
ctx,
"warning_group_threshold",
"Threshold of this\ngroup has been reached.\nInput share from\ndifferent group.",
)
return None
return " ".join(words)
async def show_dry_run_result(
ctx: wire.GenericContext, result: bool, is_slip39: bool
ctx: GenericContext, result: bool, is_slip39: bool
) -> None:
from trezor.ui.layouts import show_success
if result:
if is_slip39:
text = "The entered recovery\nshares are valid and\nmatch what is currently\nin the device."
@ -89,7 +107,7 @@ async def show_dry_run_result(
await show_warning(ctx, "warning_dry_recovery", text, button="Continue")
async def show_invalid_mnemonic(ctx: wire.GenericContext, word_count: int) -> None:
async def show_invalid_mnemonic(ctx: GenericContext, word_count: int) -> None:
if backup_types.is_slip39_word_count(word_count):
await show_warning(
ctx,
@ -104,39 +122,20 @@ async def show_invalid_mnemonic(ctx: wire.GenericContext, word_count: int) -> No
)
async def show_share_already_added(ctx: wire.GenericContext) -> None:
await show_warning(
ctx,
"warning_known_share",
"Share already entered,\nplease enter\na different share.",
)
async def show_identifier_mismatch(ctx: wire.GenericContext) -> None:
await show_warning(
ctx,
"warning_mismatched_share",
"You have entered\na share from another\nShamir Backup.",
)
async def show_group_threshold_reached(ctx: wire.GenericContext) -> None:
await show_warning(
ctx,
"warning_group_threshold",
"Threshold of this\ngroup has been reached.\nInput share from\ndifferent group.",
)
async def homescreen_dialog(
ctx: wire.GenericContext,
ctx: GenericContext,
button_label: str,
text: str,
subtext: str | None = None,
info_func: Callable | None = None,
) -> None:
from .recover import RecoveryAborted
import storage.recovery as storage_recovery
from trezor.wire import ActionCancelled
from trezor.ui.layouts.recovery import continue_recovery
while True:
dry_run = storage.recovery.is_dry_run()
dry_run = storage_recovery.is_dry_run()
if await continue_recovery(
ctx, button_label, text, subtext, info_func, dry_run
):
@ -144,8 +143,8 @@ async def homescreen_dialog(
break
# user has chosen to abort, confirm the choice
try:
await confirm_abort(ctx, dry_run)
except wire.ActionCancelled:
await _confirm_abort(ctx, dry_run)
except ActionCancelled:
pass
else:
raise RecoveryAborted

View File

@ -1,11 +1,8 @@
from typing import TYPE_CHECKING
import storage.recovery
import storage.recovery as storage_recovery
import storage.recovery_shares
from trezor.crypto import bip39, slip39
from trezor.errors import MnemonicError
from .. import backup_types
from trezor.crypto import slip39
if TYPE_CHECKING:
from trezor.enums import BackupType
@ -20,6 +17,9 @@ def process_bip39(words: str) -> bytes:
Receives single mnemonic and processes it. Returns what is then stored
in the storage, which is the mnemonic itself for BIP-39.
"""
from trezor.crypto import bip39
from trezor.errors import MnemonicError
if not bip39.check(words):
raise MnemonicError
return words.encode()
@ -32,17 +32,17 @@ def process_slip39(words: str) -> tuple[bytes | None, slip39.Share]:
"""
share = slip39.decode_mnemonic(words)
remaining = storage.recovery.fetch_slip39_remaining_shares()
group_index = share.group_index # local_cache_attribute
remaining = storage_recovery.fetch_slip39_remaining_shares()
# if this is the first share, parse and store metadata
if not remaining:
storage.recovery.set_slip39_group_count(share.group_count)
storage.recovery.set_slip39_iteration_exponent(share.iteration_exponent)
storage.recovery.set_slip39_identifier(share.identifier)
storage.recovery.set_slip39_remaining_shares(
share.threshold - 1, share.group_index
)
storage.recovery_shares.set(share.index, share.group_index, words)
storage_recovery.set_slip39_group_count(share.group_count)
storage_recovery.set_slip39_iteration_exponent(share.iteration_exponent)
storage_recovery.set_slip39_identifier(share.identifier)
storage_recovery.set_slip39_remaining_shares(share.threshold - 1, group_index)
storage.recovery_shares.set(share.index, group_index, words)
# if share threshold and group threshold are 1
# we can calculate the secret right away
@ -54,24 +54,21 @@ def process_slip39(words: str) -> tuple[bytes | None, slip39.Share]:
return None, share
# These should be checked by UI before so it's a Runtime exception otherwise
if share.identifier != storage.recovery.get_slip39_identifier():
if share.identifier != storage_recovery.get_slip39_identifier():
raise RuntimeError("Slip39: Share identifiers do not match")
if share.iteration_exponent != storage.recovery.get_slip39_iteration_exponent():
if share.iteration_exponent != storage_recovery.get_slip39_iteration_exponent():
raise RuntimeError("Slip39: Share exponents do not match")
if storage.recovery_shares.get(share.index, share.group_index):
if storage.recovery_shares.get(share.index, group_index):
raise RuntimeError("Slip39: This mnemonic was already entered")
if share.group_count != storage.recovery.get_slip39_group_count():
if share.group_count != storage_recovery.get_slip39_group_count():
raise RuntimeError("Slip39: Group count does not match")
remaining_for_share = (
storage.recovery.get_slip39_remaining_shares(share.group_index)
or share.threshold
storage_recovery.get_slip39_remaining_shares(group_index) or share.threshold
)
storage.recovery.set_slip39_remaining_shares(
remaining_for_share - 1, share.group_index
)
remaining[share.group_index] = remaining_for_share - 1
storage.recovery_shares.set(share.index, share.group_index, words)
storage_recovery.set_slip39_remaining_shares(remaining_for_share - 1, group_index)
remaining[group_index] = remaining_for_share - 1
storage.recovery_shares.set(share.index, group_index, words)
if remaining.count(0) < share.group_threshold:
# we need more shares
@ -97,6 +94,8 @@ if TYPE_CHECKING:
def load_slip39_state() -> Slip39State:
from .. import backup_types
previous_mnemonics = fetch_previous_mnemonics()
if not previous_mnemonics:
return None, None
@ -109,9 +108,9 @@ def load_slip39_state() -> Slip39State:
def fetch_previous_mnemonics() -> list[list[str]] | None:
mnemonics = []
if not storage.recovery.get_slip39_group_count():
if not storage_recovery.get_slip39_group_count():
return None
for i in range(storage.recovery.get_slip39_group_count()):
for i in range(storage_recovery.get_slip39_group_count()):
mnemonics.append(storage.recovery_shares.fetch_group(i))
if not any(p for p in mnemonics):
return None

View File

@ -1,7 +1,7 @@
import storage.recovery
from trezor.enums import BackupType
from typing import TYPE_CHECKING
from . import recover
if TYPE_CHECKING:
from trezor.enums import BackupType
class WordValidityResult(Exception):
@ -21,6 +21,9 @@ class ThresholdReached(WordValidityResult):
def check(backup_type: BackupType | None, partial_mnemonic: list[str]) -> None:
from trezor.enums import BackupType
from . import recover
# we can't perform any checks if the backup type was not yet decided
if backup_type is None:
return
@ -34,15 +37,15 @@ def check(backup_type: BackupType | None, partial_mnemonic: list[str]) -> None:
raise RuntimeError
if backup_type == BackupType.Slip39_Basic:
check_slip39_basic(partial_mnemonic, previous_mnemonics)
_check_slip39_basic(partial_mnemonic, previous_mnemonics)
elif backup_type == BackupType.Slip39_Advanced:
check_slip39_advanced(partial_mnemonic, previous_mnemonics)
_check_slip39_advanced(partial_mnemonic, previous_mnemonics)
else:
# there are no other backup types
raise RuntimeError
def check_slip39_basic(
def _check_slip39_basic(
partial_mnemonic: list[str], previous_mnemonics: list[list[str]]
) -> None:
# check if first 3 words of mnemonic match
@ -61,9 +64,11 @@ def check_slip39_basic(
raise AlreadyAdded
def check_slip39_advanced(
def _check_slip39_advanced(
partial_mnemonic: list[str], previous_mnemonics: list[list[str]]
) -> None:
import storage.recovery
current_index = len(partial_mnemonic) - 1
current_word = partial_mnemonic[-1]

View File

@ -1,16 +1,11 @@
from typing import TYPE_CHECKING
import storage
import storage.device
from trezor import config, wire
from trezor.crypto import bip39, hashlib, random, slip39
import storage.device as storage_device
from trezor.crypto import slip39
from trezor.enums import BackupType
from trezor.messages import EntropyAck, EntropyRequest, Success
from trezor.ui.layouts import confirm_backup, confirm_reset_device
from trezor.ui.loader import LoadingAnimation
from trezor.wire import ProcessError
from .. import backup_types
from ..change_pin import request_pin_confirm
from . import layout
if __debug__:
@ -18,18 +13,33 @@ if __debug__:
if TYPE_CHECKING:
from trezor.messages import ResetDevice
_DEFAULT_BACKUP_TYPE = BackupType.Bip39
from trezor.wire import Context
from trezor.messages import Success
async def reset_device(ctx: wire.Context, msg: ResetDevice) -> Success:
BAK_T_BIP39 = BackupType.Bip39 # global_import_cache
BAK_T_SLIP39_BASIC = BackupType.Slip39_Basic # global_import_cache
BAK_T_SLIP39_ADVANCED = BackupType.Slip39_Advanced # global_import_cache
_DEFAULT_BACKUP_TYPE = BAK_T_BIP39
async def reset_device(ctx: Context, msg: ResetDevice) -> Success:
from trezor import config
from trezor.ui.loader import LoadingAnimation
from apps.common.request_pin import request_pin_confirm
from trezor.ui.layouts import confirm_backup, confirm_reset_device
from trezor.crypto import bip39, random
from trezor.messages import Success, EntropyAck, EntropyRequest
backup_type = msg.backup_type # local_cache_attribute
# validate parameters and device state
_validate_reset_device(msg)
# make sure user knows they're setting up a new wallet
if msg.backup_type == BackupType.Slip39_Basic:
if backup_type == BAK_T_SLIP39_BASIC:
prompt = "Create a new wallet\nwith Shamir Backup?"
elif msg.backup_type == BackupType.Slip39_Advanced:
elif backup_type == BAK_T_SLIP39_ADVANCED:
prompt = "Create a new wallet\nwith Super Shamir?"
else:
prompt = "Do you want to create\na new wallet?"
@ -43,7 +53,7 @@ async def reset_device(ctx: wire.Context, msg: ResetDevice) -> Success:
if msg.pin_protection:
newpin = await request_pin_confirm(ctx)
if not config.change_pin("", newpin, None, None):
raise wire.ProcessError("Failed to set PIN")
raise ProcessError("Failed to set PIN")
# generate and display internal entropy
int_entropy = random.bytes(32)
@ -59,13 +69,13 @@ async def reset_device(ctx: wire.Context, msg: ResetDevice) -> Success:
secret = _compute_secret_from_entropy(int_entropy, ext_entropy, msg.strength)
# Check backup type, perform type-specific handling
if msg.backup_type == BackupType.Bip39:
if backup_type == BAK_T_BIP39:
# in BIP-39 we store mnemonic string instead of the secret
secret = bip39.from_data(secret).encode()
elif msg.backup_type in (BackupType.Slip39_Basic, BackupType.Slip39_Advanced):
elif backup_type in (BAK_T_SLIP39_BASIC, BAK_T_SLIP39_ADVANCED):
# generate and set SLIP39 parameters
storage.device.set_slip39_identifier(slip39.generate_random_identifier())
storage.device.set_slip39_iteration_exponent(slip39.DEFAULT_ITERATION_EXPONENT)
storage_device.set_slip39_identifier(slip39.generate_random_identifier())
storage_device.set_slip39_iteration_exponent(slip39.DEFAULT_ITERATION_EXPONENT)
else:
# Unknown backup type.
raise RuntimeError
@ -80,15 +90,15 @@ async def reset_device(ctx: wire.Context, msg: ResetDevice) -> Success:
# generate and display backup information for the master secret
if perform_backup:
await backup_seed(ctx, msg.backup_type, secret)
await backup_seed(ctx, backup_type, secret)
# write settings and master secret into storage
if msg.label is not None:
storage.device.set_label(msg.label)
storage.device.set_passphrase_enabled(bool(msg.passphrase_protection))
storage.device.store_mnemonic_secret(
storage_device.set_label(msg.label)
storage_device.set_passphrase_enabled(bool(msg.passphrase_protection))
storage_device.store_mnemonic_secret(
secret, # for SLIP-39, this is the EMS
msg.backup_type,
backup_type,
needs_backup=not perform_backup,
no_backup=bool(msg.no_backup),
)
@ -100,19 +110,17 @@ async def reset_device(ctx: wire.Context, msg: ResetDevice) -> Success:
return Success(message="Initialized")
async def backup_slip39_basic(
ctx: wire.Context, encrypted_master_secret: bytes
) -> None:
async def _backup_slip39_basic(ctx: Context, encrypted_master_secret: bytes) -> None:
# get number of shares
await layout.slip39_show_checklist(ctx, 0, BackupType.Slip39_Basic)
await layout.slip39_show_checklist(ctx, 0, BAK_T_SLIP39_BASIC)
shares_count = await layout.slip39_prompt_number_of_shares(ctx)
# get threshold
await layout.slip39_show_checklist(ctx, 1, BackupType.Slip39_Basic)
await layout.slip39_show_checklist(ctx, 1, BAK_T_SLIP39_BASIC)
threshold = await layout.slip39_prompt_threshold(ctx, shares_count)
identifier = storage.device.get_slip39_identifier()
iteration_exponent = storage.device.get_slip39_iteration_exponent()
identifier = storage_device.get_slip39_identifier()
iteration_exponent = storage_device.get_slip39_iteration_exponent()
if identifier is None or iteration_exponent is None:
raise ValueError
@ -126,43 +134,41 @@ async def backup_slip39_basic(
)[0]
# show and confirm individual shares
await layout.slip39_show_checklist(ctx, 2, BackupType.Slip39_Basic)
await layout.slip39_show_checklist(ctx, 2, BAK_T_SLIP39_BASIC)
await layout.slip39_basic_show_and_confirm_shares(ctx, mnemonics)
async def backup_slip39_advanced(
ctx: wire.Context, encrypted_master_secret: bytes
) -> None:
async def _backup_slip39_advanced(ctx: Context, encrypted_master_secret: bytes) -> None:
# get number of groups
await layout.slip39_show_checklist(ctx, 0, BackupType.Slip39_Advanced)
await layout.slip39_show_checklist(ctx, 0, BAK_T_SLIP39_ADVANCED)
groups_count = await layout.slip39_advanced_prompt_number_of_groups(ctx)
# get group threshold
await layout.slip39_show_checklist(ctx, 1, BackupType.Slip39_Advanced)
await layout.slip39_show_checklist(ctx, 1, BAK_T_SLIP39_ADVANCED)
group_threshold = await layout.slip39_advanced_prompt_group_threshold(
ctx, groups_count
)
# get shares and thresholds
await layout.slip39_show_checklist(ctx, 2, BackupType.Slip39_Advanced)
await layout.slip39_show_checklist(ctx, 2, BAK_T_SLIP39_ADVANCED)
groups = []
for i in range(groups_count):
share_count = await layout.slip39_prompt_number_of_shares(ctx, i)
share_threshold = await layout.slip39_prompt_threshold(ctx, share_count, i)
groups.append((share_threshold, share_count))
identifier = storage.device.get_slip39_identifier()
iteration_exponent = storage.device.get_slip39_iteration_exponent()
identifier = storage_device.get_slip39_identifier()
iteration_exponent = storage_device.get_slip39_iteration_exponent()
if identifier is None or iteration_exponent is None:
raise ValueError
# generate the mnemonics
mnemonics = slip39.split_ems(
group_threshold=group_threshold,
groups=groups,
identifier=identifier,
iteration_exponent=iteration_exponent,
encrypted_master_secret=encrypted_master_secret,
group_threshold,
groups,
identifier,
iteration_exponent,
encrypted_master_secret,
)
# show and confirm individual shares
@ -170,28 +176,33 @@ async def backup_slip39_advanced(
def _validate_reset_device(msg: ResetDevice) -> None:
msg.backup_type = msg.backup_type or _DEFAULT_BACKUP_TYPE
if msg.backup_type not in (
BackupType.Bip39,
BackupType.Slip39_Basic,
BackupType.Slip39_Advanced,
from .. import backup_types
from trezor.wire import UnexpectedMessage
backup_type = msg.backup_type or _DEFAULT_BACKUP_TYPE
if backup_type not in (
BAK_T_BIP39,
BAK_T_SLIP39_BASIC,
BAK_T_SLIP39_ADVANCED,
):
raise wire.ProcessError("Backup type not implemented.")
if backup_types.is_slip39_backup_type(msg.backup_type):
raise ProcessError("Backup type not implemented.")
if backup_types.is_slip39_backup_type(backup_type):
if msg.strength not in (128, 256):
raise wire.ProcessError("Invalid strength (has to be 128 or 256 bits)")
raise ProcessError("Invalid strength (has to be 128 or 256 bits)")
else: # BIP-39
if msg.strength not in (128, 192, 256):
raise wire.ProcessError("Invalid strength (has to be 128, 192 or 256 bits)")
raise ProcessError("Invalid strength (has to be 128, 192 or 256 bits)")
if msg.display_random and (msg.skip_backup or msg.no_backup):
raise wire.ProcessError("Can't show internal entropy when backup is skipped")
if storage.device.is_initialized():
raise wire.UnexpectedMessage("Already initialized")
raise ProcessError("Can't show internal entropy when backup is skipped")
if storage_device.is_initialized():
raise UnexpectedMessage("Already initialized")
def _compute_secret_from_entropy(
int_entropy: bytes, ext_entropy: bytes, strength_in_bytes: int
) -> bytes:
from trezor.crypto import hashlib
# combine internal and external entropy
ehash = hashlib.sha256()
ehash.update(int_entropy)
@ -204,11 +215,11 @@ def _compute_secret_from_entropy(
async def backup_seed(
ctx: wire.Context, backup_type: BackupType, mnemonic_secret: bytes
ctx: Context, backup_type: BackupType, mnemonic_secret: bytes
) -> None:
if backup_type == BackupType.Slip39_Basic:
await backup_slip39_basic(ctx, mnemonic_secret)
elif backup_type == BackupType.Slip39_Advanced:
await backup_slip39_advanced(ctx, mnemonic_secret)
if backup_type == BAK_T_SLIP39_BASIC:
await _backup_slip39_basic(ctx, mnemonic_secret)
elif backup_type == BAK_T_SLIP39_ADVANCED:
await _backup_slip39_advanced(ctx, mnemonic_secret)
else:
await layout.bip39_show_and_confirm_mnemonic(ctx, mnemonic_secret.decode())

View File

@ -1,14 +1,10 @@
from micropython import const
from typing import Sequence
from typing import TYPE_CHECKING
from trezor import ui, utils, wire
from trezor.crypto import random
from trezor.enums import ButtonRequestType
from trezor.ui.layouts import confirm_blob, show_success, show_warning
from trezor.ui.layouts import show_success
from trezor.ui.layouts.reset import ( # noqa: F401
select_word,
show_share_words,
show_warning_backup,
slip39_advanced_prompt_group_threshold,
slip39_advanced_prompt_number_of_groups,
slip39_prompt_number_of_shares,
@ -16,18 +12,25 @@ from trezor.ui.layouts.reset import ( # noqa: F401
slip39_show_checklist,
)
if TYPE_CHECKING:
from typing import Sequence
from trezor.wire import GenericContext
if __debug__:
from apps import debug
_NUM_OF_CHOICES = const(3)
async def show_internal_entropy(ctx: wire.GenericContext, entropy: bytes) -> None:
async def show_internal_entropy(ctx: GenericContext, entropy: bytes) -> None:
from trezor import ui
from trezor.ui.layouts import confirm_blob
await confirm_blob(
ctx,
"entropy",
"Internal entropy",
data=entropy,
entropy,
icon=ui.ICON_RESET,
icon_color=ui.ORANGE_ICON,
br_code=ButtonRequestType.ResetDevice,
@ -35,13 +38,16 @@ async def show_internal_entropy(ctx: wire.GenericContext, entropy: bytes) -> Non
async def _confirm_word(
ctx: wire.GenericContext,
ctx: GenericContext,
share_index: int | None,
share_words: Sequence[str],
offset: int,
count: int,
group_index: int | None = None,
) -> bool:
from trezor.crypto import random
from trezor.ui.layouts.reset import select_word
# remove duplicates
non_duplicates = list(set(share_words))
# shuffle list
@ -67,11 +73,13 @@ async def _confirm_word(
async def _confirm_share_words(
ctx: wire.GenericContext,
ctx: GenericContext,
share_index: int | None,
share_words: Sequence[str],
group_index: int | None = None,
) -> bool:
from trezor import utils
# divide list into thirds, rounding up, so that chunking by `third` always yields
# three parts (the last one might be shorter)
third = (len(share_words) + 2) // 3
@ -87,7 +95,7 @@ async def _confirm_share_words(
async def _show_confirmation_success(
ctx: wire.GenericContext,
ctx: GenericContext,
share_index: int | None = None,
num_of_shares: int | None = None,
group_index: int | None = None,
@ -111,12 +119,14 @@ async def _show_confirmation_success(
subheader = f"Group {group_index + 1} - Share {share_index + 1}\nchecked successfully."
text = "Continue with the next\nshare."
return await show_success(ctx, "success_recovery", text, subheader=subheader)
return await show_success(ctx, "success_recovery", text, subheader)
async def _show_confirmation_failure(
ctx: wire.GenericContext, share_index: int | None
ctx: GenericContext, share_index: int | None
) -> None:
from trezor.ui.layouts import show_warning
if share_index is None:
header = "Recovery seed"
else:
@ -124,30 +134,30 @@ async def _show_confirmation_failure(
await show_warning(
ctx,
"warning_backup_check",
header=header,
subheader="That is the wrong word.",
content="Please check again.",
button="Check again",
br_code=ButtonRequestType.ResetDevice,
"Please check again.",
header,
"That is the wrong word.",
"Check again",
ButtonRequestType.ResetDevice,
)
async def show_backup_warning(ctx: wire.GenericContext, slip39: bool = False) -> None:
async def show_backup_warning(ctx: GenericContext, slip39: bool = False) -> None:
from trezor.ui.layouts.reset import show_warning_backup
await show_warning_backup(ctx, slip39)
async def show_backup_success(ctx: wire.GenericContext) -> None:
async def show_backup_success(ctx: GenericContext) -> None:
text = "Use your backup\nwhen you need to\nrecover your wallet."
await show_success(ctx, "success_backup", text, subheader="Your backup is done.")
await show_success(ctx, "success_backup", text, "Your backup is done.")
# BIP39
# ===
async def bip39_show_and_confirm_mnemonic(
ctx: wire.GenericContext, mnemonic: str
) -> None:
async def bip39_show_and_confirm_mnemonic(ctx: GenericContext, mnemonic: str) -> None:
# warn user about mnemonic safety
await show_backup_warning(ctx)
@ -155,7 +165,7 @@ async def bip39_show_and_confirm_mnemonic(
while True:
# display paginated mnemonic on the screen
await show_share_words(ctx, share_words=words)
await show_share_words(ctx, words)
# make the user confirm some words from the mnemonic
if await _confirm_share_words(ctx, None, words):
@ -170,10 +180,10 @@ async def bip39_show_and_confirm_mnemonic(
async def slip39_basic_show_and_confirm_shares(
ctx: wire.GenericContext, shares: Sequence[str]
ctx: GenericContext, shares: Sequence[str]
) -> None:
# warn user about mnemonic safety
await show_backup_warning(ctx, slip39=True)
await show_backup_warning(ctx, True)
for index, share in enumerate(shares):
share_words = share.split(" ")
@ -183,19 +193,17 @@ async def slip39_basic_show_and_confirm_shares(
# make the user confirm words from the share
if await _confirm_share_words(ctx, index, share_words):
await _show_confirmation_success(
ctx, share_index=index, num_of_shares=len(shares)
)
await _show_confirmation_success(ctx, index, len(shares))
break # this share is confirmed, go to next one
else:
await _show_confirmation_failure(ctx, index)
async def slip39_advanced_show_and_confirm_shares(
ctx: wire.GenericContext, shares: Sequence[Sequence[str]]
ctx: GenericContext, shares: Sequence[Sequence[str]]
) -> None:
# warn user about mnemonic safety
await show_backup_warning(ctx, slip39=True)
await show_backup_warning(ctx, True)
for group_index, group in enumerate(shares):
for share_index, share in enumerate(group):
@ -210,9 +218,9 @@ async def slip39_advanced_show_and_confirm_shares(
):
await _show_confirmation_success(
ctx,
share_index=share_index,
num_of_shares=len(group),
group_index=group_index,
share_index,
len(group),
group_index,
)
break # this share is confirmed, go to next one
else:

View File

@ -1,60 +1,65 @@
from typing import TYPE_CHECKING
import storage.device
import storage.sd_salt
from trezor import config, wire
from trezor.crypto import random
import storage.device as storage_device
import storage.sd_salt as storage_sd_salt
from trezor import config
from trezor.enums import SdProtectOperationType
from trezor.messages import Success
from trezor.ui.layouts import confirm_action, show_success
from trezor.ui.layouts import show_success
from trezor.wire import ProcessError
from apps.common.request_pin import (
error_pin_invalid,
request_pin,
request_pin_and_sd_salt,
)
from apps.common.sdcard import confirm_retry_sd, ensure_sdcard
from apps.common.request_pin import error_pin_invalid, request_pin_and_sd_salt
from apps.common.sdcard import ensure_sdcard
if TYPE_CHECKING:
from typing import Awaitable
from trezor.messages import SdProtect
from trezor.wire import Context
def _make_salt() -> tuple[bytes, bytes, bytes]:
salt = random.bytes(storage.sd_salt.SD_SALT_LEN_BYTES)
auth_key = random.bytes(storage.device.SD_SALT_AUTH_KEY_LEN_BYTES)
tag = storage.sd_salt.compute_auth_tag(salt, auth_key)
from trezor.crypto import random
salt = random.bytes(storage_sd_salt.SD_SALT_LEN_BYTES)
auth_key = random.bytes(storage_device.SD_SALT_AUTH_KEY_LEN_BYTES)
tag = storage_sd_salt.compute_auth_tag(salt, auth_key)
return salt, auth_key, tag
async def _set_salt(
ctx: wire.Context, salt: bytes, salt_tag: bytes, stage: bool = False
ctx: Context, salt: bytes, salt_tag: bytes, stage: bool = False
) -> None:
from apps.common.sdcard import confirm_retry_sd
while True:
await ensure_sdcard(ctx)
try:
return storage.sd_salt.set_sd_salt(salt, salt_tag, stage)
return storage_sd_salt.set_sd_salt(salt, salt_tag, stage)
except OSError:
await confirm_retry_sd(ctx, exc=wire.ProcessError("SD card I/O error."))
await confirm_retry_sd(ctx, ProcessError("SD card I/O error."))
async def sd_protect(ctx: wire.Context, msg: SdProtect) -> Success:
if not storage.device.is_initialized():
raise wire.NotInitialized("Device is not initialized")
async def sd_protect(ctx: Context, msg: SdProtect) -> Success:
from trezor.wire import NotInitialized
if not storage_device.is_initialized():
raise NotInitialized("Device is not initialized")
if msg.operation == SdProtectOperationType.ENABLE:
return await sd_protect_enable(ctx, msg)
return await _sd_protect_enable(ctx, msg)
elif msg.operation == SdProtectOperationType.DISABLE:
return await sd_protect_disable(ctx, msg)
return await _sd_protect_disable(ctx, msg)
elif msg.operation == SdProtectOperationType.REFRESH:
return await sd_protect_refresh(ctx, msg)
return await _sd_protect_refresh(ctx, msg)
else:
raise wire.ProcessError("Unknown operation")
raise ProcessError("Unknown operation")
async def sd_protect_enable(ctx: wire.Context, msg: SdProtect) -> Success:
if storage.sd_salt.is_enabled():
raise wire.ProcessError("SD card protection already enabled")
async def _sd_protect_enable(ctx: Context, msg: SdProtect) -> Success:
from apps.common.request_pin import request_pin
if storage_sd_salt.is_enabled():
raise ProcessError("SD card protection already enabled")
# Confirm that user wants to proceed with the operation.
await require_confirm_sd_protect(ctx, msg)
@ -75,7 +80,7 @@ async def sd_protect_enable(ctx: wire.Context, msg: SdProtect) -> Success:
if not config.change_pin(pin, pin, None, salt):
# Wrong PIN. Clean up the prepared salt file.
try:
storage.sd_salt.remove_sd_salt()
storage_sd_salt.remove_sd_salt()
except Exception:
# The cleanup is not necessary for the correct functioning of
# SD-protection. If it fails for any reason, we suppress the
@ -83,7 +88,7 @@ async def sd_protect_enable(ctx: wire.Context, msg: SdProtect) -> Success:
pass
await error_pin_invalid(ctx)
storage.device.set_sd_salt_auth_key(salt_auth_key)
storage_device.set_sd_salt_auth_key(salt_auth_key)
await show_success(
ctx, "success_sd", "You have successfully enabled SD protection."
@ -91,9 +96,9 @@ async def sd_protect_enable(ctx: wire.Context, msg: SdProtect) -> Success:
return Success(message="SD card protection enabled")
async def sd_protect_disable(ctx: wire.Context, msg: SdProtect) -> Success:
if not storage.sd_salt.is_enabled():
raise wire.ProcessError("SD card protection not enabled")
async def _sd_protect_disable(ctx: Context, msg: SdProtect) -> Success:
if not storage_sd_salt.is_enabled():
raise ProcessError("SD card protection not enabled")
# Note that the SD card doesn't need to be present in order to disable SD
# protection. The cleanup will not happen in such case, but that does not matter.
@ -108,11 +113,11 @@ async def sd_protect_disable(ctx: wire.Context, msg: SdProtect) -> Success:
if not config.change_pin(pin, pin, salt, None):
await error_pin_invalid(ctx)
storage.device.set_sd_salt_auth_key(None)
storage_device.set_sd_salt_auth_key(None)
try:
# Clean up.
storage.sd_salt.remove_sd_salt()
storage_sd_salt.remove_sd_salt()
except Exception:
# The cleanup is not necessary for the correct functioning of
# SD-protection. If it fails for any reason, we suppress the exception,
@ -125,9 +130,9 @@ async def sd_protect_disable(ctx: wire.Context, msg: SdProtect) -> Success:
return Success(message="SD card protection disabled")
async def sd_protect_refresh(ctx: wire.Context, msg: SdProtect) -> Success:
if not storage.sd_salt.is_enabled():
raise wire.ProcessError("SD card protection not enabled")
async def _sd_protect_refresh(ctx: Context, msg: SdProtect) -> Success:
if not storage_sd_salt.is_enabled():
raise ProcessError("SD card protection not enabled")
# Confirm that user wants to proceed with the operation.
await require_confirm_sd_protect(ctx, msg)
@ -145,11 +150,11 @@ async def sd_protect_refresh(ctx: wire.Context, msg: SdProtect) -> Success:
if not config.change_pin(pin, pin, old_salt, new_salt):
await error_pin_invalid(ctx)
storage.device.set_sd_salt_auth_key(new_auth_key)
storage_device.set_sd_salt_auth_key(new_auth_key)
try:
# Clean up.
storage.sd_salt.commit_sd_salt()
storage_sd_salt.commit_sd_salt()
except Exception:
# If the cleanup fails, then request_sd_salt() will bring the SD card
# into a consistent state. We suppress the exception, because overall
@ -162,7 +167,9 @@ async def sd_protect_refresh(ctx: wire.Context, msg: SdProtect) -> Success:
return Success(message="SD card protection refreshed")
def require_confirm_sd_protect(ctx: wire.Context, msg: SdProtect) -> Awaitable[None]:
def require_confirm_sd_protect(ctx: Context, msg: SdProtect) -> Awaitable[None]:
from trezor.ui.layouts import confirm_action
if msg.operation == SdProtectOperationType.ENABLE:
text = "Do you really want to secure your device with SD card protection?"
elif msg.operation == SdProtectOperationType.DISABLE:
@ -170,6 +177,6 @@ def require_confirm_sd_protect(ctx: wire.Context, msg: SdProtect) -> Awaitable[N
elif msg.operation == SdProtectOperationType.REFRESH:
text = "Do you really want to replace the current\nSD card secret with a newly generated one?"
else:
raise wire.ProcessError("Unknown operation")
raise ProcessError("Unknown operation")
return confirm_action(ctx, "set_sd", "SD card protection", description=text)

View File

@ -1,17 +1,18 @@
from typing import TYPE_CHECKING
import storage.device
if TYPE_CHECKING:
from trezor.messages import SetU2FCounter, Success
from trezor.wire import Context
async def set_u2f_counter(ctx: Context, msg: SetU2FCounter) -> Success:
import storage.device as storage_device
from trezor import ui, wire
from trezor.enums import ButtonRequestType
from trezor.messages import Success
from trezor.ui.layouts import confirm_action
if TYPE_CHECKING:
from trezor.messages import SetU2FCounter
async def set_u2f_counter(ctx: wire.Context, msg: SetU2FCounter) -> Success:
if not storage.device.is_initialized():
if not storage_device.is_initialized():
raise wire.NotInitialized("Device is not initialized")
if msg.u2f_counter is None:
raise wire.ProcessError("No value provided")
@ -19,13 +20,13 @@ async def set_u2f_counter(ctx: wire.Context, msg: SetU2FCounter) -> Success:
await confirm_action(
ctx,
"set_u2f_counter",
title="Set U2F counter",
"Set U2F counter",
description="Do you really want to\nset the U2F counter\nto {}?",
description_param=str(msg.u2f_counter),
icon=ui.ICON_CONFIG,
br_code=ButtonRequestType.ProtectCall,
)
storage.device.set_u2f_counter(msg.u2f_counter)
storage_device.set_u2f_counter(msg.u2f_counter)
return Success(message="U2F counter set")

View File

@ -1,25 +1,25 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from trezor.wire import GenericContext
from trezor.messages import WipeDevice, Success
async def wipe_device(ctx: GenericContext, msg: WipeDevice) -> Success:
import storage
from trezor import ui
from trezor.enums import ButtonRequestType
from trezor.messages import Success
from trezor.ui.layouts import confirm_action
from .apply_settings import reload_settings_from_storage
from apps.base import reload_settings_from_storage
if TYPE_CHECKING:
from trezor import wire
from trezor.messages import WipeDevice
async def wipe_device(ctx: wire.GenericContext, msg: WipeDevice) -> Success:
await confirm_action(
ctx,
"confirm_wipe",
title="Wipe device",
description="Do you really want to\nwipe the device?\n",
action="All data will be lost.",
"Wipe device",
"All data will be lost.",
"Do you really want to\nwipe the device?\n",
reverse=True,
verb="Hold to confirm",
hold=True,