diff --git a/core/src/apps/management/apply_flags.py b/core/src/apps/management/apply_flags.py index 04f7c65bba..77e457a71b 100644 --- a/core/src/apps/management/apply_flags.py +++ b/core/src/apps/management/apply_flags.py @@ -1,16 +1,18 @@ from typing import TYPE_CHECKING -import storage.device -from storage.device import set_flags -from trezor import wire -from trezor.messages import Success - if TYPE_CHECKING: + from trezor.messages import Success from trezor.messages import ApplyFlags + from trezor.wire import GenericContext -async def apply_flags(ctx: wire.GenericContext, msg: ApplyFlags) -> Success: +async def apply_flags(ctx: GenericContext, msg: ApplyFlags) -> Success: + import storage.device + from storage.device import set_flags + from trezor.wire import NotInitialized + from trezor.messages import Success + if not storage.device.is_initialized(): - raise wire.NotInitialized("Device is not initialized") + raise NotInitialized("Device is not initialized") set_flags(msg.flags) return Success(message="Flags applied") diff --git a/core/src/apps/management/apply_settings.py b/core/src/apps/management/apply_settings.py index fdf8bfd34d..a57bdc6d85 100644 --- a/core/src/apps/management/apply_settings.py +++ b/core/src/apps/management/apply_settings.py @@ -1,159 +1,175 @@ from typing import TYPE_CHECKING -import storage.device -from trezor import ui, wire -from trezor.enums import ButtonRequestType, SafetyCheckLevel -from trezor.messages import Success -from trezor.strings import format_duration_ms +from trezor.enums import ButtonRequestType from trezor.ui.layouts import confirm_action - -from apps.base import reload_settings_from_storage -from apps.common import safety_checks +from trezor.wire import DataError if TYPE_CHECKING: - from trezor.messages import ApplySettings + from trezor.messages import ApplySettings, Success + from trezor.wire import Context, GenericContext + from trezor.enums import SafetyCheckLevel -def validate_homescreen(homescreen: bytes) -> None: +BRT_PROTECT_CALL = ButtonRequestType.ProtectCall # CACHE + + +def _validate_homescreen(homescreen: bytes) -> None: + from trezor import ui + import storage.device as storage_device + if homescreen == b"": return - if len(homescreen) > storage.device.HOMESCREEN_MAXSIZE: - raise wire.DataError( - f"Homescreen is too large, maximum size is {storage.device.HOMESCREEN_MAXSIZE} bytes" + if len(homescreen) > storage_device.HOMESCREEN_MAXSIZE: + raise DataError( + f"Homescreen is too large, maximum size is {storage_device.HOMESCREEN_MAXSIZE} bytes" ) try: w, h, toif_format = ui.display.toif_info(homescreen) except ValueError: - raise wire.DataError("Invalid homescreen") + raise DataError("Invalid homescreen") if w != 144 or h != 144: - raise wire.DataError("Homescreen must be 144x144 pixel large") + raise DataError("Homescreen must be 144x144 pixel large") if toif_format != ui.display.TOIF_FULL_COLOR_BE: - raise wire.DataError("Homescreen must be full-color TOIF image") + raise DataError("Homescreen must be full-color TOIF image") -async def apply_settings(ctx: wire.Context, msg: ApplySettings) -> Success: - if not storage.device.is_initialized(): - raise wire.NotInitialized("Device is not initialized") +async def apply_settings(ctx: Context, msg: ApplySettings) -> Success: + import storage.device as storage_device + from apps.common import safety_checks + from trezor.messages import Success + from trezor.wire import ProcessError, NotInitialized + from apps.base import reload_settings_from_storage + + if not storage_device.is_initialized(): + raise NotInitialized("Device is not initialized") + homescreen = msg.homescreen # local_cache_attribute + label = msg.label # local_cache_attribute + auto_lock_delay_ms = msg.auto_lock_delay_ms # local_cache_attribute + use_passphrase = msg.use_passphrase # local_cache_attribute + passphrase_always_on_device = ( + msg.passphrase_always_on_device + ) # local_cache_attribute + display_rotation = msg.display_rotation # local_cache_attribute + msg_safety_checks = msg.safety_checks # local_cache_attribute + experimental_features = msg.experimental_features # local_cache_attribute + if ( - msg.homescreen is None - and msg.label is None - and msg.use_passphrase is None - and msg.passphrase_always_on_device is None - and msg.display_rotation is None - and msg.auto_lock_delay_ms is None - and msg.safety_checks is None - and msg.experimental_features is None + homescreen is None + and label is None + and use_passphrase is None + and passphrase_always_on_device is None + and display_rotation is None + and auto_lock_delay_ms is None + and msg_safety_checks is None + and experimental_features is None ): - raise wire.ProcessError("No setting provided") + raise ProcessError("No setting provided") - if msg.homescreen is not None: - validate_homescreen(msg.homescreen) - await require_confirm_change_homescreen(ctx) + if homescreen is not None: + _validate_homescreen(homescreen) + await _require_confirm_change_homescreen(ctx) try: - storage.device.set_homescreen(msg.homescreen) + storage_device.set_homescreen(homescreen) except ValueError: - raise wire.DataError("Invalid homescreen") + raise DataError("Invalid homescreen") - if msg.label is not None: - if len(msg.label) > storage.device.LABEL_MAXLENGTH: - raise wire.DataError("Label too long") - await require_confirm_change_label(ctx, msg.label) - storage.device.set_label(msg.label) + if label is not None: + if len(label) > storage_device.LABEL_MAXLENGTH: + raise DataError("Label too long") + await _require_confirm_change_label(ctx, label) + storage_device.set_label(label) - if msg.use_passphrase is not None: - await require_confirm_change_passphrase(ctx, msg.use_passphrase) - storage.device.set_passphrase_enabled(msg.use_passphrase) + if use_passphrase is not None: + await _require_confirm_change_passphrase(ctx, use_passphrase) + storage_device.set_passphrase_enabled(use_passphrase) - if msg.passphrase_always_on_device is not None: - if not storage.device.is_passphrase_enabled(): - raise wire.DataError("Passphrase is not enabled") - await require_confirm_change_passphrase_source( - ctx, msg.passphrase_always_on_device + if passphrase_always_on_device is not None: + if not storage_device.is_passphrase_enabled(): + raise DataError("Passphrase is not enabled") + await _require_confirm_change_passphrase_source( + ctx, passphrase_always_on_device ) - storage.device.set_passphrase_always_on_device(msg.passphrase_always_on_device) + storage_device.set_passphrase_always_on_device(passphrase_always_on_device) - if msg.auto_lock_delay_ms is not None: - if msg.auto_lock_delay_ms < storage.device.AUTOLOCK_DELAY_MINIMUM: - raise wire.ProcessError("Auto-lock delay too short") - if msg.auto_lock_delay_ms > storage.device.AUTOLOCK_DELAY_MAXIMUM: - raise wire.ProcessError("Auto-lock delay too long") - await require_confirm_change_autolock_delay(ctx, msg.auto_lock_delay_ms) - storage.device.set_autolock_delay_ms(msg.auto_lock_delay_ms) + if auto_lock_delay_ms is not None: + if auto_lock_delay_ms < storage_device.AUTOLOCK_DELAY_MINIMUM: + raise ProcessError("Auto-lock delay too short") + if auto_lock_delay_ms > storage_device.AUTOLOCK_DELAY_MAXIMUM: + raise ProcessError("Auto-lock delay too long") + await _require_confirm_change_autolock_delay(ctx, auto_lock_delay_ms) + storage_device.set_autolock_delay_ms(auto_lock_delay_ms) - if msg.safety_checks is not None: - await require_confirm_safety_checks(ctx, msg.safety_checks) - safety_checks.apply_setting(msg.safety_checks) + if msg_safety_checks is not None: + await _require_confirm_safety_checks(ctx, msg_safety_checks) + safety_checks.apply_setting(msg_safety_checks) - if msg.display_rotation is not None: - await require_confirm_change_display_rotation(ctx, msg.display_rotation) - storage.device.set_rotation(msg.display_rotation) + if display_rotation is not None: + await _require_confirm_change_display_rotation(ctx, display_rotation) + storage_device.set_rotation(display_rotation) - if msg.experimental_features is not None: - await require_confirm_experimental_features(ctx, msg.experimental_features) - storage.device.set_experimental_features(msg.experimental_features) + if experimental_features is not None: + await _require_confirm_experimental_features(ctx, experimental_features) + storage_device.set_experimental_features(experimental_features) reload_settings_from_storage() return Success(message="Settings applied") -async def require_confirm_change_homescreen(ctx: wire.GenericContext) -> None: +async def _require_confirm_change_homescreen(ctx: GenericContext) -> None: await confirm_action( ctx, "set_homescreen", "Set homescreen", description="Do you really want to change the homescreen image?", - br_code=ButtonRequestType.ProtectCall, + br_code=BRT_PROTECT_CALL, ) -async def require_confirm_change_label(ctx: wire.GenericContext, label: str) -> None: +async def _require_confirm_change_label(ctx: GenericContext, label: str) -> None: await confirm_action( ctx, "set_label", "Change label", description="Do you really want to change the label to {}?", description_param=label, - br_code=ButtonRequestType.ProtectCall, + br_code=BRT_PROTECT_CALL, ) -async def require_confirm_change_passphrase( - ctx: wire.GenericContext, use: bool -) -> None: - if use: - description = "Do you really want to enable passphrase encryption?" - else: - description = "Do you really want to disable passphrase encryption?" +async def _require_confirm_change_passphrase(ctx: GenericContext, use: bool) -> None: + template = "Do you really want to {} passphrase encryption?" + description = template.format("enable" if use else "disable") await confirm_action( ctx, "set_passphrase", "Enable passphrase" if use else "Disable passphrase", description=description, - br_code=ButtonRequestType.ProtectCall, + br_code=BRT_PROTECT_CALL, ) -async def require_confirm_change_passphrase_source( - ctx: wire.GenericContext, passphrase_always_on_device: bool +async def _require_confirm_change_passphrase_source( + ctx: GenericContext, passphrase_always_on_device: bool ) -> None: - if passphrase_always_on_device: - description = "Do you really want to enter passphrase always on the device?" - else: - description = "Do you want to revoke the passphrase on device setting?" + description = ( + "Do you really want to enter passphrase always on the device?" + if passphrase_always_on_device + else "Do you want to revoke the passphrase on device setting?" + ) await confirm_action( ctx, "set_passphrase_source", "Passphrase source", description=description, - br_code=ButtonRequestType.ProtectCall, + br_code=BRT_PROTECT_CALL, ) -async def require_confirm_change_display_rotation( - ctx: wire.GenericContext, rotation: int +async def _require_confirm_change_display_rotation( + ctx: GenericContext, rotation: int ) -> None: if rotation == 0: label = "north" @@ -164,80 +180,81 @@ async def require_confirm_change_display_rotation( elif rotation == 270: label = "west" else: - raise wire.DataError("Unsupported display rotation") + raise DataError("Unsupported display rotation") + await confirm_action( ctx, "set_rotation", "Change rotation", description="Do you really want to change display rotation to {}?", description_param=label, - br_code=ButtonRequestType.ProtectCall, + br_code=BRT_PROTECT_CALL, ) -async def require_confirm_change_autolock_delay( - ctx: wire.GenericContext, delay_ms: int +async def _require_confirm_change_autolock_delay( + ctx: GenericContext, delay_ms: int ) -> None: + from trezor.strings import format_duration_ms + await confirm_action( ctx, "set_autolock_delay", "Auto-lock delay", description="Do you really want to auto-lock your device after {}?", description_param=format_duration_ms(delay_ms), - br_code=ButtonRequestType.ProtectCall, + br_code=BRT_PROTECT_CALL, ) -async def require_confirm_safety_checks( - ctx: wire.GenericContext, level: SafetyCheckLevel +async def _require_confirm_safety_checks( + ctx: GenericContext, level: SafetyCheckLevel ) -> None: - if level == SafetyCheckLevel.PromptAlways: - await confirm_action( - ctx, - "set_safety_checks", - "Safety override", - hold=True, - verb="Hold to confirm", - description="Trezor will allow you to approve some actions which might be unsafe.", - action="Are you sure?", - reverse=True, - larger_vspace=True, - br_code=ButtonRequestType.ProtectCall, - ) - elif level == SafetyCheckLevel.PromptTemporarily: - await confirm_action( - ctx, - "set_safety_checks", - "Safety override", - hold=True, - verb="Hold to confirm", - description="Trezor will temporarily allow you to approve some actions which might be unsafe.", - action="Are you sure?", - reverse=True, - br_code=ButtonRequestType.ProtectCall, - ) - elif level == SafetyCheckLevel.Strict: + from trezor.enums import SafetyCheckLevel + + if level == SafetyCheckLevel.Strict: await confirm_action( ctx, "set_safety_checks", "Safety checks", description="Do you really want to enforce strict safety checks (recommended)?", - br_code=ButtonRequestType.ProtectCall, + br_code=BRT_PROTECT_CALL, + ) + elif level in (SafetyCheckLevel.PromptAlways, SafetyCheckLevel.PromptTemporarily): + # Reusing most stuff for both levels + template = ( + "Trezor will{}allow you to approve some actions which might be unsafe." + ) + description = template.format( + " temporarily " if level == SafetyCheckLevel.PromptTemporarily else " " + ) + + await confirm_action( + ctx, + "set_safety_checks", + "Safety override", + "Are you sure?", + description, + hold=True, + verb="Hold to confirm", + reverse=True, + larger_vspace=level == SafetyCheckLevel.PromptAlways, + br_code=BRT_PROTECT_CALL, ) else: raise ValueError # enum value out of range -async def require_confirm_experimental_features( - ctx: wire.GenericContext, enable: bool +async def _require_confirm_experimental_features( + ctx: GenericContext, enable: bool ) -> None: if enable: await confirm_action( ctx, "set_experimental_features", "Experimental mode", - description="Enable experimental features?", - action="Only for development and beta testing!", + "Only for development and beta testing!", + "Enable experimental features?", reverse=True, - br_code=ButtonRequestType.ProtectCall, + br_code=BRT_PROTECT_CALL, ) diff --git a/core/src/apps/management/backup_device.py b/core/src/apps/management/backup_device.py index 6715684842..83425c1057 100644 --- a/core/src/apps/management/backup_device.py +++ b/core/src/apps/management/backup_device.py @@ -1,34 +1,34 @@ from typing import TYPE_CHECKING -import storage -import storage.device -from trezor import wire -from trezor.messages import Success - -from apps.common import mnemonic - -from .reset_device import backup_seed, layout - if TYPE_CHECKING: - from trezor.messages import BackupDevice + from trezor.messages import BackupDevice, Success + from trezor.wire import Context -async def backup_device(ctx: wire.Context, msg: BackupDevice) -> Success: - if not storage.device.is_initialized(): +async def backup_device(ctx: Context, msg: BackupDevice) -> Success: + import storage.device as storage_device + from trezor import wire + from trezor.messages import Success + + from apps.common import mnemonic + + from .reset_device import backup_seed, layout + + if not storage_device.is_initialized(): raise wire.NotInitialized("Device is not initialized") - if not storage.device.needs_backup(): + if not storage_device.needs_backup(): raise wire.ProcessError("Seed already backed up") mnemonic_secret, mnemonic_type = mnemonic.get() if mnemonic_secret is None: raise RuntimeError - storage.device.set_unfinished_backup(True) - storage.device.set_backed_up() + storage_device.set_unfinished_backup(True) + storage_device.set_backed_up() await backup_seed(ctx, mnemonic_type, mnemonic_secret) - storage.device.set_unfinished_backup(False) + storage_device.set_unfinished_backup(False) await layout.show_backup_success(ctx) diff --git a/core/src/apps/management/backup_types.py b/core/src/apps/management/backup_types.py index 442c83f399..ba901a2ea9 100644 --- a/core/src/apps/management/backup_types.py +++ b/core/src/apps/management/backup_types.py @@ -1,6 +1,10 @@ -from trezor.crypto.slip39 import Share +from typing import TYPE_CHECKING + from trezor.enums import BackupType +if TYPE_CHECKING: + from trezor.crypto.slip39 import Share + _BIP39_WORD_COUNTS = (12, 18, 24) _SLIP39_WORD_COUNTS = (20, 33) diff --git a/core/src/apps/management/change_pin.py b/core/src/apps/management/change_pin.py index a947608031..45826dd0d3 100644 --- a/core/src/apps/management/change_pin.py +++ b/core/src/apps/management/change_pin.py @@ -1,29 +1,31 @@ from typing import TYPE_CHECKING -from storage.device import is_initialized from trezor import config, wire -from trezor.messages import Success -from trezor.ui.layouts import confirm_action, show_success - -from apps.common.request_pin import ( - error_pin_invalid, - error_pin_matches_wipe_code, - request_pin_and_sd_salt, - request_pin_confirm, -) if TYPE_CHECKING: from typing import Awaitable - from trezor.messages import ChangePin + from trezor.messages import ChangePin, Success + from trezor.wire import Context -async def change_pin(ctx: wire.Context, msg: ChangePin) -> Success: +async def change_pin(ctx: Context, msg: ChangePin) -> Success: + from storage.device import is_initialized + from trezor.messages import Success + from trezor.ui.layouts import show_success + + from apps.common.request_pin import ( + error_pin_invalid, + error_pin_matches_wipe_code, + request_pin_and_sd_salt, + request_pin_confirm, + ) + if not is_initialized(): raise wire.NotInitialized("Device is not initialized") # confirm that user wants to change the pin - await require_confirm_change_pin(ctx, msg) + await _require_confirm_change_pin(ctx, msg) # get old pin curpin, salt = await request_pin_and_sd_salt(ctx, "Enter old PIN") @@ -61,7 +63,9 @@ async def change_pin(ctx: wire.Context, msg: ChangePin) -> Success: return Success(message=msg_wire) -def require_confirm_change_pin(ctx: wire.Context, msg: ChangePin) -> Awaitable[None]: +def _require_confirm_change_pin(ctx: Context, msg: ChangePin) -> Awaitable[None]: + from trezor.ui.layouts import confirm_action + has_pin = config.has_pin() if msg.remove and has_pin: # removing pin @@ -69,8 +73,8 @@ def require_confirm_change_pin(ctx: wire.Context, msg: ChangePin) -> Awaitable[N ctx, "set_pin", "Remove PIN", - description="Do you really want to", - action="disable PIN protection?", + "disable PIN protection?", + "Do you really want to", reverse=True, ) @@ -79,8 +83,8 @@ def require_confirm_change_pin(ctx: wire.Context, msg: ChangePin) -> Awaitable[N ctx, "set_pin", "Change PIN", - description="Do you really want to", - action="change your PIN?", + "change your PIN?", + "Do you really want to", reverse=True, ) @@ -89,8 +93,8 @@ def require_confirm_change_pin(ctx: wire.Context, msg: ChangePin) -> Awaitable[N ctx, "set_pin", "Enable PIN", - description="Do you really want to", - action="enable PIN protection?", + "enable PIN protection?", + "Do you really want to", reverse=True, ) diff --git a/core/src/apps/management/change_wipe_code.py b/core/src/apps/management/change_wipe_code.py index b9e5ecbcb1..67f3355b67 100644 --- a/core/src/apps/management/change_wipe_code.py +++ b/core/src/apps/management/change_wipe_code.py @@ -1,25 +1,25 @@ from typing import TYPE_CHECKING -from storage.device import is_initialized -from trezor import config, ui, wire -from trezor.messages import Success -from trezor.ui.layouts import confirm_action, show_popup, show_success - -from apps.common.request_pin import ( - error_pin_invalid, - request_pin, - request_pin_and_sd_salt, -) - if TYPE_CHECKING: from typing import Awaitable + from trezor.wire import Context - from trezor.messages import ChangeWipeCode + from trezor.messages import ChangeWipeCode, Success -async def change_wipe_code(ctx: wire.Context, msg: ChangeWipeCode) -> Success: +async def change_wipe_code(ctx: Context, msg: ChangeWipeCode) -> Success: + from storage.device import is_initialized + from trezor.wire import NotInitialized + from trezor.ui.layouts import show_success + from trezor.messages import Success + from trezor import config + from apps.common.request_pin import ( + error_pin_invalid, + request_pin_and_sd_salt, + ) + if not is_initialized(): - raise wire.NotInitialized("Device is not initialized") + raise NotInitialized("Device is not initialized") # Confirm that user wants to set or remove the wipe code. has_wipe_code = config.has_wipe_code() @@ -58,15 +58,19 @@ async def change_wipe_code(ctx: wire.Context, msg: ChangeWipeCode) -> Success: def _require_confirm_action( - ctx: wire.Context, msg: ChangeWipeCode, has_wipe_code: bool + ctx: Context, msg: ChangeWipeCode, has_wipe_code: bool ) -> Awaitable[None]: + from trezor import ui + from trezor.wire import ProcessError + from trezor.ui.layouts import confirm_action + if msg.remove and has_wipe_code: return confirm_action( ctx, "disable_wipe_code", - title="Disable wipe code", - description="Do you really want to", - action="disable wipe code protection?", + "Disable wipe code", + "disable wipe code protection?", + "Do you really want to", reverse=True, icon=ui.ICON_CONFIG, ) @@ -75,9 +79,9 @@ def _require_confirm_action( return confirm_action( ctx, "change_wipe_code", - title="Change wipe code", - description="Do you really want to", - action="change the wipe code?", + "Change wipe code", + "change the wipe code?", + "Do you really want to", reverse=True, icon=ui.ICON_CONFIG, ) @@ -86,39 +90,36 @@ def _require_confirm_action( return confirm_action( ctx, "set_wipe_code", - title="Set wipe code", - description="Do you really want to", - action="set the wipe code?", + "Set wipe code", + "set the wipe code?", + "Do you really want to", reverse=True, icon=ui.ICON_CONFIG, ) # Removing non-existing wipe code. - raise wire.ProcessError("Wipe code protection is already disabled") + raise ProcessError("Wipe code protection is already disabled") -async def _request_wipe_code_confirm(ctx: wire.Context, pin: str) -> str: +async def _request_wipe_code_confirm(ctx: Context, pin: str) -> str: + from trezor.ui.layouts import show_popup + from apps.common.request_pin import request_pin + while True: code1 = await request_pin(ctx, "Enter new wipe code") if code1 == pin: - await _wipe_code_invalid() + # _wipe_code_invalid + await show_popup( + "Invalid wipe code", + "The wipe code must be\ndifferent from your PIN.\n\nPlease try again.", + ) continue code2 = await request_pin(ctx, "Re-enter new wipe code") if code1 == code2: return code1 - await _wipe_code_mismatch() - - -async def _wipe_code_invalid() -> None: - await show_popup( - title="Invalid wipe code", - description="The wipe code must be\ndifferent from your PIN.\n\nPlease try again.", - ) - - -async def _wipe_code_mismatch() -> None: - await show_popup( - title="Code mismatch", - description="The wipe codes you\nentered do not match.\n\nPlease try again.", - ) + # _wipe_code_mismatch + await show_popup( + "Code mismatch", + "The wipe codes you\nentered do not match.\n\nPlease try again.", + ) diff --git a/core/src/apps/management/get_next_u2f_counter.py b/core/src/apps/management/get_next_u2f_counter.py index fba46429bb..cacaaa1aa1 100644 --- a/core/src/apps/management/get_next_u2f_counter.py +++ b/core/src/apps/management/get_next_u2f_counter.py @@ -1,28 +1,28 @@ from typing import TYPE_CHECKING -import storage.device -from trezor import ui, wire -from trezor.enums import ButtonRequestType -from trezor.messages import NextU2FCounter -from trezor.ui.layouts import confirm_action - if TYPE_CHECKING: - from trezor.messages import GetNextU2FCounter + from trezor.messages import GetNextU2FCounter, NextU2FCounter + from trezor.wire import Context -async def get_next_u2f_counter( - ctx: wire.Context, msg: GetNextU2FCounter -) -> NextU2FCounter: - if not storage.device.is_initialized(): - raise wire.NotInitialized("Device is not initialized") +async def get_next_u2f_counter(ctx: Context, msg: GetNextU2FCounter) -> NextU2FCounter: + import storage.device as storage_device + from trezor import ui + from trezor.wire import NotInitialized + from trezor.enums import ButtonRequestType + from trezor.messages import NextU2FCounter + from trezor.ui.layouts import confirm_action + + if not storage_device.is_initialized(): + raise NotInitialized("Device is not initialized") await confirm_action( ctx, "get_u2f_counter", - title="Get next U2F counter", + "Get next U2F counter", description="Do you really want to increase and retrieve\nthe U2F counter?", icon=ui.ICON_CONFIG, br_code=ButtonRequestType.ProtectCall, ) - return NextU2FCounter(u2f_counter=storage.device.next_u2f_counter()) + return NextU2FCounter(u2f_counter=storage_device.next_u2f_counter()) diff --git a/core/src/apps/management/get_nonce.py b/core/src/apps/management/get_nonce.py index 30ff20463c..5671427910 100644 --- a/core/src/apps/management/get_nonce.py +++ b/core/src/apps/management/get_nonce.py @@ -1,15 +1,15 @@ from typing import TYPE_CHECKING -from storage import cache -from trezor import wire -from trezor.crypto import random -from trezor.messages import Nonce - if TYPE_CHECKING: - from trezor.messages import GetNonce + from trezor.messages import GetNonce, Nonce + from trezor.wire import Context -async def get_nonce(ctx: wire.Context, msg: GetNonce) -> Nonce: +async def get_nonce(ctx: Context, msg: GetNonce) -> Nonce: + from storage import cache + from trezor.crypto import random + from trezor.messages import Nonce + nonce = random.bytes(32) cache.set(cache.APP_COMMON_NONCE, nonce) return Nonce(nonce=nonce) diff --git a/core/src/apps/management/reboot_to_bootloader.py b/core/src/apps/management/reboot_to_bootloader.py index 0e4bbe3be4..2ab516ac1c 100644 --- a/core/src/apps/management/reboot_to_bootloader.py +++ b/core/src/apps/management/reboot_to_bootloader.py @@ -1,16 +1,17 @@ from typing import TYPE_CHECKING -import storage.device -from trezor import io, loop, utils, wire -from trezor.messages import Success -from trezor.ui.layouts import confirm_action - if TYPE_CHECKING: from trezor.messages import RebootToBootloader from typing import NoReturn + from trezor.wire import Context -async def reboot_to_bootloader(ctx: wire.Context, msg: RebootToBootloader) -> NoReturn: +async def reboot_to_bootloader(ctx: Context, msg: RebootToBootloader) -> NoReturn: + import storage.device + from trezor import io, loop, utils, wire + from trezor.messages import Success + from trezor.ui.layouts import confirm_action + if not storage.device.get_experimental_features(): raise wire.UnexpectedMessage("Experimental features are not enabled") diff --git a/core/src/apps/management/recovery_device/__init__.py b/core/src/apps/management/recovery_device/__init__.py index 5c55e9942c..deb71b46ac 100644 --- a/core/src/apps/management/recovery_device/__init__.py +++ b/core/src/apps/management/recovery_device/__init__.py @@ -1,24 +1,9 @@ from typing import TYPE_CHECKING -import storage -import storage.device -import storage.recovery -from trezor import config, ui, wire, workflow -from trezor.enums import ButtonRequestType -from trezor.messages import Success -from trezor.ui.layouts import confirm_action, confirm_reset_device - -from apps.common.request_pin import ( - error_pin_invalid, - request_pin_and_sd_salt, - request_pin_confirm, -) - -from .homescreen import recovery_homescreen, recovery_process - if TYPE_CHECKING: from trezor.messages import RecoveryDevice - + from trezor.wire import Context + from trezor.messages import Success # List of RecoveryDevice fields that can be set when doing dry-run recovery. # All except `dry_run` are allowed for T1 compatibility, but their values are ignored. @@ -26,69 +11,52 @@ if TYPE_CHECKING: DRY_RUN_ALLOWED_FIELDS = ("dry_run", "word_count", "enforce_wordlist", "type") -async def recovery_device(ctx: wire.Context, msg: RecoveryDevice) -> Success: +async def recovery_device(ctx: Context, msg: RecoveryDevice) -> Success: """ Recover BIP39/SLIP39 seed into empty device. Recovery is also possible with replugged Trezor. We call this process Persistence. User starts the process here using the RecoveryDevice msg and then they can unplug the device anytime and continue without a computer. """ - _validate(msg) + import storage + import storage.device as storage_device + import storage.recovery as storage_recovery + from trezor import config, ui, wire, workflow + from trezor.enums import ButtonRequestType + from trezor.ui.layouts import confirm_action, confirm_reset_device + from apps.common.request_pin import ( + error_pin_invalid, + request_pin_and_sd_salt, + request_pin_confirm, + ) + from .homescreen import recovery_homescreen, recovery_process - if storage.recovery.is_in_progress(): - return await recovery_process(ctx) + dry_run = msg.dry_run # local_cache_attribute - await _continue_dialog(ctx, msg) - - if not msg.dry_run: - # wipe storage to make sure the device is in a clear state - storage.reset() - - # for dry run pin needs to be entered - if msg.dry_run: - curpin, salt = await request_pin_and_sd_salt(ctx, "Enter PIN") - if not config.check_pin(curpin, salt): - await error_pin_invalid(ctx) - - if not msg.dry_run: - # set up pin if requested - if msg.pin_protection: - newpin = await request_pin_confirm(ctx, allow_cancel=False) - config.change_pin("", newpin, None, None) - - storage.device.set_passphrase_enabled(bool(msg.passphrase_protection)) - if msg.u2f_counter is not None: - storage.device.set_u2f_counter(msg.u2f_counter) - if msg.label is not None: - storage.device.set_label(msg.label) - - storage.recovery.set_in_progress(True) - storage.recovery.set_dry_run(bool(msg.dry_run)) - - workflow.set_default(recovery_homescreen) - return await recovery_process(ctx) - - -def _validate(msg: RecoveryDevice) -> None: - if not msg.dry_run and storage.device.is_initialized(): + # -------------------------------------------------------- + # validate + if not dry_run and storage_device.is_initialized(): raise wire.UnexpectedMessage("Already initialized") - if msg.dry_run and not storage.device.is_initialized(): + if dry_run and not storage_device.is_initialized(): raise wire.NotInitialized("Device is not initialized") - if msg.enforce_wordlist is False: raise wire.ProcessError( "Value enforce_wordlist must be True, Trezor Core enforces words automatically." ) - - if msg.dry_run: + if dry_run: # check that only allowed fields are set for key, value in msg.__dict__.items(): if key not in DRY_RUN_ALLOWED_FIELDS and value is not None: raise wire.ProcessError(f"Forbidden field set in dry-run: {key}") + # END validate + # -------------------------------------------------------- + if storage_recovery.is_in_progress(): + return await recovery_process(ctx) -async def _continue_dialog(ctx: wire.Context, msg: RecoveryDevice) -> None: - if not msg.dry_run: + # -------------------------------------------------------- + # _continue_dialog + if not dry_run: await confirm_reset_device( ctx, "Do you really want to\nrecover a wallet?", recovery=True ) @@ -96,8 +64,38 @@ async def _continue_dialog(ctx: wire.Context, msg: RecoveryDevice) -> None: await confirm_action( ctx, "confirm_seedcheck", - title="Seed check", + "Seed check", description="Do you really want to check the recovery seed?", icon=ui.ICON_RECOVERY, br_code=ButtonRequestType.ProtectCall, ) + # END _continue_dialog + # -------------------------------------------------------- + + if not dry_run: + # wipe storage to make sure the device is in a clear state + storage.reset() + + # for dry run pin needs to be entered + if dry_run: + curpin, salt = await request_pin_and_sd_salt(ctx, "Enter PIN") + if not config.check_pin(curpin, salt): + await error_pin_invalid(ctx) + + if not dry_run: + # set up pin if requested + if msg.pin_protection: + newpin = await request_pin_confirm(ctx, allow_cancel=False) + config.change_pin("", newpin, None, None) + + storage_device.set_passphrase_enabled(bool(msg.passphrase_protection)) + if msg.u2f_counter is not None: + storage_device.set_u2f_counter(msg.u2f_counter) + if msg.label is not None: + storage_device.set_label(msg.label) + + storage_recovery.set_in_progress(True) + storage_recovery.set_dry_run(bool(dry_run)) + + workflow.set_default(recovery_homescreen) + return await recovery_process(ctx) diff --git a/core/src/apps/management/recovery_device/homescreen.py b/core/src/apps/management/recovery_device/homescreen.py index 0f3806758e..01a5f92879 100644 --- a/core/src/apps/management/recovery_device/homescreen.py +++ b/core/src/apps/management/recovery_device/homescreen.py @@ -1,24 +1,23 @@ -import storage -import storage.device -import storage.recovery -import storage.recovery_shares -from trezor import strings, utils, wire, workflow -from trezor.crypto import slip39 -from trezor.crypto.hashlib import sha256 -from trezor.enums import BackupType, MessageType -from trezor.errors import MnemonicError -from trezor.messages import Success -from trezor.ui.layouts import show_success +from typing import TYPE_CHECKING -from apps.common import mnemonic -from apps.homescreen.homescreen import homescreen +import storage.device as storage_device +import storage.recovery as storage_recovery +from trezor import wire +from trezor.messages import Success from .. import backup_types from . import layout, recover +if TYPE_CHECKING: + from trezor.wire import GenericContext + from trezor.enums import BackupType + async def recovery_homescreen() -> None: - if not storage.recovery.is_in_progress(): + from trezor import workflow + from apps.homescreen.homescreen import homescreen + + if not storage_recovery.is_in_progress(): workflow.set_default(homescreen) return @@ -27,22 +26,27 @@ async def recovery_homescreen() -> None: await recovery_process(ctx) -async def recovery_process(ctx: wire.GenericContext) -> Success: +async def recovery_process(ctx: GenericContext) -> Success: + from trezor.enums import MessageType + import storage + wire.AVOID_RESTARTING_FOR = (MessageType.Initialize, MessageType.GetFeatures) try: return await _continue_recovery_process(ctx) except recover.RecoveryAborted: - dry_run = storage.recovery.is_dry_run() + dry_run = storage_recovery.is_dry_run() if dry_run: - storage.recovery.end_progress() + storage_recovery.end_progress() else: storage.wipe() raise wire.ActionCancelled -async def _continue_recovery_process(ctx: wire.GenericContext) -> Success: +async def _continue_recovery_process(ctx: GenericContext) -> Success: + from trezor.errors import MnemonicError + # gather the current recovery state from storage - dry_run = storage.recovery.is_dry_run() + dry_run = storage_recovery.is_dry_run() word_count, backup_type = recover.load_slip39_state() # Both word_count and backup_type are derived from the same data. Both will be @@ -60,7 +64,10 @@ async def _continue_recovery_process(ctx: wire.GenericContext) -> Success: while secret is None: if is_first_step: # If we are starting recovery, ask for word count first... - word_count = await _request_word_count(ctx, dry_run) + # _request_word_count + await layout.homescreen_dialog(ctx, "Select", "Select number of words") + # ask for the number of words + word_count = await layout.request_word_count(ctx, dry_run) # ...and only then show the starting screen with word count. await _request_share_first_screen(ctx, word_count) assert word_count is not None @@ -91,8 +98,12 @@ async def _continue_recovery_process(ctx: wire.GenericContext) -> Success: async def _finish_recovery_dry_run( - ctx: wire.GenericContext, secret: bytes, backup_type: BackupType + ctx: GenericContext, secret: bytes, backup_type: BackupType ) -> Success: + from trezor.crypto.hashlib import sha256 + from trezor import utils + from apps.common import mnemonic + if backup_type is None: raise RuntimeError @@ -105,15 +116,15 @@ async def _finish_recovery_dry_run( # Check that the identifier and iteration exponent match as well if is_slip39: result &= ( - storage.device.get_slip39_identifier() - == storage.recovery.get_slip39_identifier() + storage_device.get_slip39_identifier() + == storage_recovery.get_slip39_identifier() ) result &= ( - storage.device.get_slip39_iteration_exponent() - == storage.recovery.get_slip39_iteration_exponent() + storage_device.get_slip39_iteration_exponent() + == storage_recovery.get_slip39_iteration_exponent() ) - storage.recovery.end_progress() + storage_recovery.end_progress() await layout.show_dry_run_result(ctx, result, is_slip39) @@ -124,24 +135,27 @@ async def _finish_recovery_dry_run( async def _finish_recovery( - ctx: wire.GenericContext, secret: bytes, backup_type: BackupType + ctx: GenericContext, secret: bytes, backup_type: BackupType ) -> Success: + from trezor.ui.layouts import show_success + from trezor.enums import BackupType + if backup_type is None: raise RuntimeError - storage.device.store_mnemonic_secret( + storage_device.store_mnemonic_secret( secret, backup_type, needs_backup=False, no_backup=False ) if backup_type in (BackupType.Slip39_Basic, BackupType.Slip39_Advanced): - identifier = storage.recovery.get_slip39_identifier() - exponent = storage.recovery.get_slip39_iteration_exponent() + identifier = storage_recovery.get_slip39_identifier() + exponent = storage_recovery.get_slip39_iteration_exponent() if identifier is None or exponent is None: # Identifier and exponent need to be stored in storage at this point raise RuntimeError - storage.device.set_slip39_identifier(identifier) - storage.device.set_slip39_iteration_exponent(exponent) + storage_device.set_slip39_identifier(identifier) + storage_device.set_slip39_iteration_exponent(exponent) - storage.recovery.end_progress() + storage_recovery.end_progress() await show_success( ctx, "success_recovery", "You have successfully recovered your wallet." @@ -149,15 +163,8 @@ async def _finish_recovery( return Success(message="Device recovered") -async def _request_word_count(ctx: wire.GenericContext, dry_run: bool) -> int: - await layout.homescreen_dialog(ctx, "Select", "Select number of words") - - # ask for the number of words - return await layout.request_word_count(ctx, dry_run) - - async def _process_words( - ctx: wire.GenericContext, words: str + ctx: GenericContext, words: str ) -> tuple[bytes | None, BackupType]: word_count = len(words.split(" ")) is_slip39 = backup_types.is_slip39_word_count(word_count) @@ -178,11 +185,9 @@ async def _process_words( return secret, backup_type -async def _request_share_first_screen( - ctx: wire.GenericContext, word_count: int -) -> None: +async def _request_share_first_screen(ctx: GenericContext, word_count: int) -> None: if backup_types.is_slip39_word_count(word_count): - remaining = storage.recovery.fetch_slip39_remaining_shares() + remaining = storage_recovery.fetch_slip39_remaining_shares() if remaining: await _request_share_next_screen(ctx) else: @@ -195,9 +200,11 @@ async def _request_share_first_screen( ) -async def _request_share_next_screen(ctx: wire.GenericContext) -> None: - remaining = storage.recovery.fetch_slip39_remaining_shares() - group_count = storage.recovery.get_slip39_group_count() +async def _request_share_next_screen(ctx: GenericContext) -> None: + from trezor import strings + + remaining = storage_recovery.fetch_slip39_remaining_shares() + group_count = storage_recovery.get_slip39_group_count() if not remaining: # 'remaining' should be stored at this point raise RuntimeError @@ -214,11 +221,14 @@ async def _request_share_next_screen(ctx: wire.GenericContext) -> None: await layout.homescreen_dialog(ctx, "Enter share", text, "needed to enter") -async def _show_remaining_groups_and_shares(ctx: wire.GenericContext) -> None: +async def _show_remaining_groups_and_shares(ctx: GenericContext) -> None: """ Show info dialog for Slip39 Advanced - what shares are to be entered. """ - shares_remaining = storage.recovery.fetch_slip39_remaining_shares() + from trezor.crypto import slip39 + import storage.recovery_shares as storage_recovery_shares + + shares_remaining = storage_recovery.fetch_slip39_remaining_shares() # should be stored at this point assert shares_remaining @@ -231,13 +241,13 @@ async def _show_remaining_groups_and_shares(ctx: wire.GenericContext) -> None: share = None for index, remaining in enumerate(shares_remaining): if 0 <= remaining < slip39.MAX_SHARE_COUNT: - m = storage.recovery_shares.fetch_group(index)[0] + m = storage_recovery_shares.fetch_group(index)[0] if not share: share = slip39.decode_mnemonic(m) identifier = m.split(" ")[0:3] groups.add((remaining, tuple(identifier))) elif remaining == slip39.MAX_SHARE_COUNT: # no shares yet - identifier = storage.recovery_shares.fetch_group(first_entered_index)[ + identifier = storage_recovery_shares.fetch_group(first_entered_index)[ 0 ].split(" ")[0:2] groups.add((remaining, tuple(identifier))) diff --git a/core/src/apps/management/recovery_device/layout.py b/core/src/apps/management/recovery_device/layout.py index 76175340d0..d734da13a8 100644 --- a/core/src/apps/management/recovery_device/layout.py +++ b/core/src/apps/management/recovery_device/layout.py @@ -1,28 +1,25 @@ from typing import TYPE_CHECKING -import storage.recovery -from trezor import ui, wire from trezor.enums import ButtonRequestType -from trezor.ui.layouts import confirm_action, show_success, show_warning -from trezor.ui.layouts.common import button_request +from trezor.ui.layouts import show_warning from trezor.ui.layouts.recovery import ( # noqa: F401 - continue_recovery, - request_word, request_word_count, show_group_share_success, show_remaining_shares, ) from .. import backup_types -from . import word_validity -from .recover import RecoveryAborted if TYPE_CHECKING: from typing import Callable from trezor.enums import BackupType + from trezor.wire import GenericContext -async def confirm_abort(ctx: wire.GenericContext, dry_run: bool = False) -> None: +async def _confirm_abort(ctx: GenericContext, dry_run: bool = False) -> None: + from trezor import ui + from trezor.ui.layouts import confirm_action + if dry_run: await confirm_action( ctx, @@ -37,8 +34,8 @@ async def confirm_abort(ctx: wire.GenericContext, dry_run: bool = False) -> None ctx, "abort_recovery", "Abort recovery", - description="Do you really want to abort the recovery process?", - action="All progress will be lost.", + "All progress will be lost.", + "Do you really want to abort the recovery process?", reverse=True, icon=ui.ICON_WIPE, br_code=ButtonRequestType.ProtectCall, @@ -46,9 +43,13 @@ async def confirm_abort(ctx: wire.GenericContext, dry_run: bool = False) -> None async def request_mnemonic( - ctx: wire.GenericContext, word_count: int, backup_type: BackupType | None + ctx: GenericContext, word_count: int, backup_type: BackupType | None ) -> str | None: - await button_request(ctx, "mnemonic", code=ButtonRequestType.MnemonicInput) + from . import word_validity + from trezor.ui.layouts.common import button_request + from trezor.ui.layouts.recovery import request_word + + await button_request(ctx, "mnemonic", ButtonRequestType.MnemonicInput) words: list[str] = [] for i in range(word_count): @@ -60,21 +61,38 @@ async def request_mnemonic( try: word_validity.check(backup_type, words) except word_validity.AlreadyAdded: - await show_share_already_added(ctx) + # show_share_already_added + await show_warning( + ctx, + "warning_known_share", + "Share already entered,\nplease enter\na different share.", + ) return None except word_validity.IdentifierMismatch: - await show_identifier_mismatch(ctx) + # show_identifier_mismatch + await show_warning( + ctx, + "warning_mismatched_share", + "You have entered\na share from another\nShamir Backup.", + ) return None except word_validity.ThresholdReached: - await show_group_threshold_reached(ctx) + # show_group_threshold_reached + await show_warning( + ctx, + "warning_group_threshold", + "Threshold of this\ngroup has been reached.\nInput share from\ndifferent group.", + ) return None return " ".join(words) async def show_dry_run_result( - ctx: wire.GenericContext, result: bool, is_slip39: bool + ctx: GenericContext, result: bool, is_slip39: bool ) -> None: + from trezor.ui.layouts import show_success + if result: if is_slip39: text = "The entered recovery\nshares are valid and\nmatch what is currently\nin the device." @@ -89,7 +107,7 @@ async def show_dry_run_result( await show_warning(ctx, "warning_dry_recovery", text, button="Continue") -async def show_invalid_mnemonic(ctx: wire.GenericContext, word_count: int) -> None: +async def show_invalid_mnemonic(ctx: GenericContext, word_count: int) -> None: if backup_types.is_slip39_word_count(word_count): await show_warning( ctx, @@ -104,39 +122,20 @@ async def show_invalid_mnemonic(ctx: wire.GenericContext, word_count: int) -> No ) -async def show_share_already_added(ctx: wire.GenericContext) -> None: - await show_warning( - ctx, - "warning_known_share", - "Share already entered,\nplease enter\na different share.", - ) - - -async def show_identifier_mismatch(ctx: wire.GenericContext) -> None: - await show_warning( - ctx, - "warning_mismatched_share", - "You have entered\na share from another\nShamir Backup.", - ) - - -async def show_group_threshold_reached(ctx: wire.GenericContext) -> None: - await show_warning( - ctx, - "warning_group_threshold", - "Threshold of this\ngroup has been reached.\nInput share from\ndifferent group.", - ) - - async def homescreen_dialog( - ctx: wire.GenericContext, + ctx: GenericContext, button_label: str, text: str, subtext: str | None = None, info_func: Callable | None = None, ) -> None: + from .recover import RecoveryAborted + import storage.recovery as storage_recovery + from trezor.wire import ActionCancelled + from trezor.ui.layouts.recovery import continue_recovery + while True: - dry_run = storage.recovery.is_dry_run() + dry_run = storage_recovery.is_dry_run() if await continue_recovery( ctx, button_label, text, subtext, info_func, dry_run ): @@ -144,8 +143,8 @@ async def homescreen_dialog( break # user has chosen to abort, confirm the choice try: - await confirm_abort(ctx, dry_run) - except wire.ActionCancelled: + await _confirm_abort(ctx, dry_run) + except ActionCancelled: pass else: raise RecoveryAborted diff --git a/core/src/apps/management/recovery_device/recover.py b/core/src/apps/management/recovery_device/recover.py index e44b1b5357..0f3461bd24 100644 --- a/core/src/apps/management/recovery_device/recover.py +++ b/core/src/apps/management/recovery_device/recover.py @@ -1,11 +1,8 @@ from typing import TYPE_CHECKING -import storage.recovery +import storage.recovery as storage_recovery import storage.recovery_shares -from trezor.crypto import bip39, slip39 -from trezor.errors import MnemonicError - -from .. import backup_types +from trezor.crypto import slip39 if TYPE_CHECKING: from trezor.enums import BackupType @@ -20,6 +17,9 @@ def process_bip39(words: str) -> bytes: Receives single mnemonic and processes it. Returns what is then stored in the storage, which is the mnemonic itself for BIP-39. """ + from trezor.crypto import bip39 + from trezor.errors import MnemonicError + if not bip39.check(words): raise MnemonicError return words.encode() @@ -32,17 +32,17 @@ def process_slip39(words: str) -> tuple[bytes | None, slip39.Share]: """ share = slip39.decode_mnemonic(words) - remaining = storage.recovery.fetch_slip39_remaining_shares() + group_index = share.group_index # local_cache_attribute + + remaining = storage_recovery.fetch_slip39_remaining_shares() # if this is the first share, parse and store metadata if not remaining: - storage.recovery.set_slip39_group_count(share.group_count) - storage.recovery.set_slip39_iteration_exponent(share.iteration_exponent) - storage.recovery.set_slip39_identifier(share.identifier) - storage.recovery.set_slip39_remaining_shares( - share.threshold - 1, share.group_index - ) - storage.recovery_shares.set(share.index, share.group_index, words) + storage_recovery.set_slip39_group_count(share.group_count) + storage_recovery.set_slip39_iteration_exponent(share.iteration_exponent) + storage_recovery.set_slip39_identifier(share.identifier) + storage_recovery.set_slip39_remaining_shares(share.threshold - 1, group_index) + storage.recovery_shares.set(share.index, group_index, words) # if share threshold and group threshold are 1 # we can calculate the secret right away @@ -54,24 +54,21 @@ def process_slip39(words: str) -> tuple[bytes | None, slip39.Share]: return None, share # These should be checked by UI before so it's a Runtime exception otherwise - if share.identifier != storage.recovery.get_slip39_identifier(): + if share.identifier != storage_recovery.get_slip39_identifier(): raise RuntimeError("Slip39: Share identifiers do not match") - if share.iteration_exponent != storage.recovery.get_slip39_iteration_exponent(): + if share.iteration_exponent != storage_recovery.get_slip39_iteration_exponent(): raise RuntimeError("Slip39: Share exponents do not match") - if storage.recovery_shares.get(share.index, share.group_index): + if storage.recovery_shares.get(share.index, group_index): raise RuntimeError("Slip39: This mnemonic was already entered") - if share.group_count != storage.recovery.get_slip39_group_count(): + if share.group_count != storage_recovery.get_slip39_group_count(): raise RuntimeError("Slip39: Group count does not match") remaining_for_share = ( - storage.recovery.get_slip39_remaining_shares(share.group_index) - or share.threshold + storage_recovery.get_slip39_remaining_shares(group_index) or share.threshold ) - storage.recovery.set_slip39_remaining_shares( - remaining_for_share - 1, share.group_index - ) - remaining[share.group_index] = remaining_for_share - 1 - storage.recovery_shares.set(share.index, share.group_index, words) + storage_recovery.set_slip39_remaining_shares(remaining_for_share - 1, group_index) + remaining[group_index] = remaining_for_share - 1 + storage.recovery_shares.set(share.index, group_index, words) if remaining.count(0) < share.group_threshold: # we need more shares @@ -97,6 +94,8 @@ if TYPE_CHECKING: def load_slip39_state() -> Slip39State: + from .. import backup_types + previous_mnemonics = fetch_previous_mnemonics() if not previous_mnemonics: return None, None @@ -109,9 +108,9 @@ def load_slip39_state() -> Slip39State: def fetch_previous_mnemonics() -> list[list[str]] | None: mnemonics = [] - if not storage.recovery.get_slip39_group_count(): + if not storage_recovery.get_slip39_group_count(): return None - for i in range(storage.recovery.get_slip39_group_count()): + for i in range(storage_recovery.get_slip39_group_count()): mnemonics.append(storage.recovery_shares.fetch_group(i)) if not any(p for p in mnemonics): return None diff --git a/core/src/apps/management/recovery_device/word_validity.py b/core/src/apps/management/recovery_device/word_validity.py index 8058bc61d7..cf5cb1030e 100644 --- a/core/src/apps/management/recovery_device/word_validity.py +++ b/core/src/apps/management/recovery_device/word_validity.py @@ -1,7 +1,7 @@ -import storage.recovery -from trezor.enums import BackupType +from typing import TYPE_CHECKING -from . import recover +if TYPE_CHECKING: + from trezor.enums import BackupType class WordValidityResult(Exception): @@ -21,6 +21,9 @@ class ThresholdReached(WordValidityResult): def check(backup_type: BackupType | None, partial_mnemonic: list[str]) -> None: + from trezor.enums import BackupType + from . import recover + # we can't perform any checks if the backup type was not yet decided if backup_type is None: return @@ -34,15 +37,15 @@ def check(backup_type: BackupType | None, partial_mnemonic: list[str]) -> None: raise RuntimeError if backup_type == BackupType.Slip39_Basic: - check_slip39_basic(partial_mnemonic, previous_mnemonics) + _check_slip39_basic(partial_mnemonic, previous_mnemonics) elif backup_type == BackupType.Slip39_Advanced: - check_slip39_advanced(partial_mnemonic, previous_mnemonics) + _check_slip39_advanced(partial_mnemonic, previous_mnemonics) else: # there are no other backup types raise RuntimeError -def check_slip39_basic( +def _check_slip39_basic( partial_mnemonic: list[str], previous_mnemonics: list[list[str]] ) -> None: # check if first 3 words of mnemonic match @@ -61,9 +64,11 @@ def check_slip39_basic( raise AlreadyAdded -def check_slip39_advanced( +def _check_slip39_advanced( partial_mnemonic: list[str], previous_mnemonics: list[list[str]] ) -> None: + import storage.recovery + current_index = len(partial_mnemonic) - 1 current_word = partial_mnemonic[-1] diff --git a/core/src/apps/management/reset_device/__init__.py b/core/src/apps/management/reset_device/__init__.py index adbd808b5c..471ff0a52d 100644 --- a/core/src/apps/management/reset_device/__init__.py +++ b/core/src/apps/management/reset_device/__init__.py @@ -1,16 +1,11 @@ from typing import TYPE_CHECKING import storage -import storage.device -from trezor import config, wire -from trezor.crypto import bip39, hashlib, random, slip39 +import storage.device as storage_device +from trezor.crypto import slip39 from trezor.enums import BackupType -from trezor.messages import EntropyAck, EntropyRequest, Success -from trezor.ui.layouts import confirm_backup, confirm_reset_device -from trezor.ui.loader import LoadingAnimation +from trezor.wire import ProcessError -from .. import backup_types -from ..change_pin import request_pin_confirm from . import layout if __debug__: @@ -18,18 +13,33 @@ if __debug__: if TYPE_CHECKING: from trezor.messages import ResetDevice - -_DEFAULT_BACKUP_TYPE = BackupType.Bip39 + from trezor.wire import Context + from trezor.messages import Success -async def reset_device(ctx: wire.Context, msg: ResetDevice) -> Success: +BAK_T_BIP39 = BackupType.Bip39 # global_import_cache +BAK_T_SLIP39_BASIC = BackupType.Slip39_Basic # global_import_cache +BAK_T_SLIP39_ADVANCED = BackupType.Slip39_Advanced # global_import_cache +_DEFAULT_BACKUP_TYPE = BAK_T_BIP39 + + +async def reset_device(ctx: Context, msg: ResetDevice) -> Success: + from trezor import config + from trezor.ui.loader import LoadingAnimation + from apps.common.request_pin import request_pin_confirm + from trezor.ui.layouts import confirm_backup, confirm_reset_device + from trezor.crypto import bip39, random + from trezor.messages import Success, EntropyAck, EntropyRequest + + backup_type = msg.backup_type # local_cache_attribute + # validate parameters and device state _validate_reset_device(msg) # make sure user knows they're setting up a new wallet - if msg.backup_type == BackupType.Slip39_Basic: + if backup_type == BAK_T_SLIP39_BASIC: prompt = "Create a new wallet\nwith Shamir Backup?" - elif msg.backup_type == BackupType.Slip39_Advanced: + elif backup_type == BAK_T_SLIP39_ADVANCED: prompt = "Create a new wallet\nwith Super Shamir?" else: prompt = "Do you want to create\na new wallet?" @@ -43,7 +53,7 @@ async def reset_device(ctx: wire.Context, msg: ResetDevice) -> Success: if msg.pin_protection: newpin = await request_pin_confirm(ctx) if not config.change_pin("", newpin, None, None): - raise wire.ProcessError("Failed to set PIN") + raise ProcessError("Failed to set PIN") # generate and display internal entropy int_entropy = random.bytes(32) @@ -59,13 +69,13 @@ async def reset_device(ctx: wire.Context, msg: ResetDevice) -> Success: secret = _compute_secret_from_entropy(int_entropy, ext_entropy, msg.strength) # Check backup type, perform type-specific handling - if msg.backup_type == BackupType.Bip39: + if backup_type == BAK_T_BIP39: # in BIP-39 we store mnemonic string instead of the secret secret = bip39.from_data(secret).encode() - elif msg.backup_type in (BackupType.Slip39_Basic, BackupType.Slip39_Advanced): + elif backup_type in (BAK_T_SLIP39_BASIC, BAK_T_SLIP39_ADVANCED): # generate and set SLIP39 parameters - storage.device.set_slip39_identifier(slip39.generate_random_identifier()) - storage.device.set_slip39_iteration_exponent(slip39.DEFAULT_ITERATION_EXPONENT) + storage_device.set_slip39_identifier(slip39.generate_random_identifier()) + storage_device.set_slip39_iteration_exponent(slip39.DEFAULT_ITERATION_EXPONENT) else: # Unknown backup type. raise RuntimeError @@ -80,15 +90,15 @@ async def reset_device(ctx: wire.Context, msg: ResetDevice) -> Success: # generate and display backup information for the master secret if perform_backup: - await backup_seed(ctx, msg.backup_type, secret) + await backup_seed(ctx, backup_type, secret) # write settings and master secret into storage if msg.label is not None: - storage.device.set_label(msg.label) - storage.device.set_passphrase_enabled(bool(msg.passphrase_protection)) - storage.device.store_mnemonic_secret( + storage_device.set_label(msg.label) + storage_device.set_passphrase_enabled(bool(msg.passphrase_protection)) + storage_device.store_mnemonic_secret( secret, # for SLIP-39, this is the EMS - msg.backup_type, + backup_type, needs_backup=not perform_backup, no_backup=bool(msg.no_backup), ) @@ -100,19 +110,17 @@ async def reset_device(ctx: wire.Context, msg: ResetDevice) -> Success: return Success(message="Initialized") -async def backup_slip39_basic( - ctx: wire.Context, encrypted_master_secret: bytes -) -> None: +async def _backup_slip39_basic(ctx: Context, encrypted_master_secret: bytes) -> None: # get number of shares - await layout.slip39_show_checklist(ctx, 0, BackupType.Slip39_Basic) + await layout.slip39_show_checklist(ctx, 0, BAK_T_SLIP39_BASIC) shares_count = await layout.slip39_prompt_number_of_shares(ctx) # get threshold - await layout.slip39_show_checklist(ctx, 1, BackupType.Slip39_Basic) + await layout.slip39_show_checklist(ctx, 1, BAK_T_SLIP39_BASIC) threshold = await layout.slip39_prompt_threshold(ctx, shares_count) - identifier = storage.device.get_slip39_identifier() - iteration_exponent = storage.device.get_slip39_iteration_exponent() + identifier = storage_device.get_slip39_identifier() + iteration_exponent = storage_device.get_slip39_iteration_exponent() if identifier is None or iteration_exponent is None: raise ValueError @@ -126,43 +134,41 @@ async def backup_slip39_basic( )[0] # show and confirm individual shares - await layout.slip39_show_checklist(ctx, 2, BackupType.Slip39_Basic) + await layout.slip39_show_checklist(ctx, 2, BAK_T_SLIP39_BASIC) await layout.slip39_basic_show_and_confirm_shares(ctx, mnemonics) -async def backup_slip39_advanced( - ctx: wire.Context, encrypted_master_secret: bytes -) -> None: +async def _backup_slip39_advanced(ctx: Context, encrypted_master_secret: bytes) -> None: # get number of groups - await layout.slip39_show_checklist(ctx, 0, BackupType.Slip39_Advanced) + await layout.slip39_show_checklist(ctx, 0, BAK_T_SLIP39_ADVANCED) groups_count = await layout.slip39_advanced_prompt_number_of_groups(ctx) # get group threshold - await layout.slip39_show_checklist(ctx, 1, BackupType.Slip39_Advanced) + await layout.slip39_show_checklist(ctx, 1, BAK_T_SLIP39_ADVANCED) group_threshold = await layout.slip39_advanced_prompt_group_threshold( ctx, groups_count ) # get shares and thresholds - await layout.slip39_show_checklist(ctx, 2, BackupType.Slip39_Advanced) + await layout.slip39_show_checklist(ctx, 2, BAK_T_SLIP39_ADVANCED) groups = [] for i in range(groups_count): share_count = await layout.slip39_prompt_number_of_shares(ctx, i) share_threshold = await layout.slip39_prompt_threshold(ctx, share_count, i) groups.append((share_threshold, share_count)) - identifier = storage.device.get_slip39_identifier() - iteration_exponent = storage.device.get_slip39_iteration_exponent() + identifier = storage_device.get_slip39_identifier() + iteration_exponent = storage_device.get_slip39_iteration_exponent() if identifier is None or iteration_exponent is None: raise ValueError # generate the mnemonics mnemonics = slip39.split_ems( - group_threshold=group_threshold, - groups=groups, - identifier=identifier, - iteration_exponent=iteration_exponent, - encrypted_master_secret=encrypted_master_secret, + group_threshold, + groups, + identifier, + iteration_exponent, + encrypted_master_secret, ) # show and confirm individual shares @@ -170,28 +176,33 @@ async def backup_slip39_advanced( def _validate_reset_device(msg: ResetDevice) -> None: - msg.backup_type = msg.backup_type or _DEFAULT_BACKUP_TYPE - if msg.backup_type not in ( - BackupType.Bip39, - BackupType.Slip39_Basic, - BackupType.Slip39_Advanced, + from .. import backup_types + from trezor.wire import UnexpectedMessage + + backup_type = msg.backup_type or _DEFAULT_BACKUP_TYPE + if backup_type not in ( + BAK_T_BIP39, + BAK_T_SLIP39_BASIC, + BAK_T_SLIP39_ADVANCED, ): - raise wire.ProcessError("Backup type not implemented.") - if backup_types.is_slip39_backup_type(msg.backup_type): + raise ProcessError("Backup type not implemented.") + if backup_types.is_slip39_backup_type(backup_type): if msg.strength not in (128, 256): - raise wire.ProcessError("Invalid strength (has to be 128 or 256 bits)") + raise ProcessError("Invalid strength (has to be 128 or 256 bits)") else: # BIP-39 if msg.strength not in (128, 192, 256): - raise wire.ProcessError("Invalid strength (has to be 128, 192 or 256 bits)") + raise ProcessError("Invalid strength (has to be 128, 192 or 256 bits)") if msg.display_random and (msg.skip_backup or msg.no_backup): - raise wire.ProcessError("Can't show internal entropy when backup is skipped") - if storage.device.is_initialized(): - raise wire.UnexpectedMessage("Already initialized") + raise ProcessError("Can't show internal entropy when backup is skipped") + if storage_device.is_initialized(): + raise UnexpectedMessage("Already initialized") def _compute_secret_from_entropy( int_entropy: bytes, ext_entropy: bytes, strength_in_bytes: int ) -> bytes: + from trezor.crypto import hashlib + # combine internal and external entropy ehash = hashlib.sha256() ehash.update(int_entropy) @@ -204,11 +215,11 @@ def _compute_secret_from_entropy( async def backup_seed( - ctx: wire.Context, backup_type: BackupType, mnemonic_secret: bytes + ctx: Context, backup_type: BackupType, mnemonic_secret: bytes ) -> None: - if backup_type == BackupType.Slip39_Basic: - await backup_slip39_basic(ctx, mnemonic_secret) - elif backup_type == BackupType.Slip39_Advanced: - await backup_slip39_advanced(ctx, mnemonic_secret) + if backup_type == BAK_T_SLIP39_BASIC: + await _backup_slip39_basic(ctx, mnemonic_secret) + elif backup_type == BAK_T_SLIP39_ADVANCED: + await _backup_slip39_advanced(ctx, mnemonic_secret) else: await layout.bip39_show_and_confirm_mnemonic(ctx, mnemonic_secret.decode()) diff --git a/core/src/apps/management/reset_device/layout.py b/core/src/apps/management/reset_device/layout.py index 9a09520183..5a894c45b6 100644 --- a/core/src/apps/management/reset_device/layout.py +++ b/core/src/apps/management/reset_device/layout.py @@ -1,14 +1,10 @@ from micropython import const -from typing import Sequence +from typing import TYPE_CHECKING -from trezor import ui, utils, wire -from trezor.crypto import random from trezor.enums import ButtonRequestType -from trezor.ui.layouts import confirm_blob, show_success, show_warning +from trezor.ui.layouts import show_success from trezor.ui.layouts.reset import ( # noqa: F401 - select_word, show_share_words, - show_warning_backup, slip39_advanced_prompt_group_threshold, slip39_advanced_prompt_number_of_groups, slip39_prompt_number_of_shares, @@ -16,18 +12,25 @@ from trezor.ui.layouts.reset import ( # noqa: F401 slip39_show_checklist, ) +if TYPE_CHECKING: + from typing import Sequence + from trezor.wire import GenericContext + if __debug__: from apps import debug _NUM_OF_CHOICES = const(3) -async def show_internal_entropy(ctx: wire.GenericContext, entropy: bytes) -> None: +async def show_internal_entropy(ctx: GenericContext, entropy: bytes) -> None: + from trezor import ui + from trezor.ui.layouts import confirm_blob + await confirm_blob( ctx, "entropy", "Internal entropy", - data=entropy, + entropy, icon=ui.ICON_RESET, icon_color=ui.ORANGE_ICON, br_code=ButtonRequestType.ResetDevice, @@ -35,13 +38,16 @@ async def show_internal_entropy(ctx: wire.GenericContext, entropy: bytes) -> Non async def _confirm_word( - ctx: wire.GenericContext, + ctx: GenericContext, share_index: int | None, share_words: Sequence[str], offset: int, count: int, group_index: int | None = None, ) -> bool: + from trezor.crypto import random + from trezor.ui.layouts.reset import select_word + # remove duplicates non_duplicates = list(set(share_words)) # shuffle list @@ -67,11 +73,13 @@ async def _confirm_word( async def _confirm_share_words( - ctx: wire.GenericContext, + ctx: GenericContext, share_index: int | None, share_words: Sequence[str], group_index: int | None = None, ) -> bool: + from trezor import utils + # divide list into thirds, rounding up, so that chunking by `third` always yields # three parts (the last one might be shorter) third = (len(share_words) + 2) // 3 @@ -87,7 +95,7 @@ async def _confirm_share_words( async def _show_confirmation_success( - ctx: wire.GenericContext, + ctx: GenericContext, share_index: int | None = None, num_of_shares: int | None = None, group_index: int | None = None, @@ -111,12 +119,14 @@ async def _show_confirmation_success( subheader = f"Group {group_index + 1} - Share {share_index + 1}\nchecked successfully." text = "Continue with the next\nshare." - return await show_success(ctx, "success_recovery", text, subheader=subheader) + return await show_success(ctx, "success_recovery", text, subheader) async def _show_confirmation_failure( - ctx: wire.GenericContext, share_index: int | None + ctx: GenericContext, share_index: int | None ) -> None: + from trezor.ui.layouts import show_warning + if share_index is None: header = "Recovery seed" else: @@ -124,30 +134,30 @@ async def _show_confirmation_failure( await show_warning( ctx, "warning_backup_check", - header=header, - subheader="That is the wrong word.", - content="Please check again.", - button="Check again", - br_code=ButtonRequestType.ResetDevice, + "Please check again.", + header, + "That is the wrong word.", + "Check again", + ButtonRequestType.ResetDevice, ) -async def show_backup_warning(ctx: wire.GenericContext, slip39: bool = False) -> None: +async def show_backup_warning(ctx: GenericContext, slip39: bool = False) -> None: + from trezor.ui.layouts.reset import show_warning_backup + await show_warning_backup(ctx, slip39) -async def show_backup_success(ctx: wire.GenericContext) -> None: +async def show_backup_success(ctx: GenericContext) -> None: text = "Use your backup\nwhen you need to\nrecover your wallet." - await show_success(ctx, "success_backup", text, subheader="Your backup is done.") + await show_success(ctx, "success_backup", text, "Your backup is done.") # BIP39 # === -async def bip39_show_and_confirm_mnemonic( - ctx: wire.GenericContext, mnemonic: str -) -> None: +async def bip39_show_and_confirm_mnemonic(ctx: GenericContext, mnemonic: str) -> None: # warn user about mnemonic safety await show_backup_warning(ctx) @@ -155,7 +165,7 @@ async def bip39_show_and_confirm_mnemonic( while True: # display paginated mnemonic on the screen - await show_share_words(ctx, share_words=words) + await show_share_words(ctx, words) # make the user confirm some words from the mnemonic if await _confirm_share_words(ctx, None, words): @@ -170,10 +180,10 @@ async def bip39_show_and_confirm_mnemonic( async def slip39_basic_show_and_confirm_shares( - ctx: wire.GenericContext, shares: Sequence[str] + ctx: GenericContext, shares: Sequence[str] ) -> None: # warn user about mnemonic safety - await show_backup_warning(ctx, slip39=True) + await show_backup_warning(ctx, True) for index, share in enumerate(shares): share_words = share.split(" ") @@ -183,19 +193,17 @@ async def slip39_basic_show_and_confirm_shares( # make the user confirm words from the share if await _confirm_share_words(ctx, index, share_words): - await _show_confirmation_success( - ctx, share_index=index, num_of_shares=len(shares) - ) + await _show_confirmation_success(ctx, index, len(shares)) break # this share is confirmed, go to next one else: await _show_confirmation_failure(ctx, index) async def slip39_advanced_show_and_confirm_shares( - ctx: wire.GenericContext, shares: Sequence[Sequence[str]] + ctx: GenericContext, shares: Sequence[Sequence[str]] ) -> None: # warn user about mnemonic safety - await show_backup_warning(ctx, slip39=True) + await show_backup_warning(ctx, True) for group_index, group in enumerate(shares): for share_index, share in enumerate(group): @@ -210,9 +218,9 @@ async def slip39_advanced_show_and_confirm_shares( ): await _show_confirmation_success( ctx, - share_index=share_index, - num_of_shares=len(group), - group_index=group_index, + share_index, + len(group), + group_index, ) break # this share is confirmed, go to next one else: diff --git a/core/src/apps/management/sd_protect.py b/core/src/apps/management/sd_protect.py index 267d2b99ff..699ea16d5d 100644 --- a/core/src/apps/management/sd_protect.py +++ b/core/src/apps/management/sd_protect.py @@ -1,60 +1,65 @@ from typing import TYPE_CHECKING -import storage.device -import storage.sd_salt -from trezor import config, wire -from trezor.crypto import random +import storage.device as storage_device +import storage.sd_salt as storage_sd_salt +from trezor import config from trezor.enums import SdProtectOperationType from trezor.messages import Success -from trezor.ui.layouts import confirm_action, show_success +from trezor.ui.layouts import show_success +from trezor.wire import ProcessError -from apps.common.request_pin import ( - error_pin_invalid, - request_pin, - request_pin_and_sd_salt, -) -from apps.common.sdcard import confirm_retry_sd, ensure_sdcard +from apps.common.request_pin import error_pin_invalid, request_pin_and_sd_salt +from apps.common.sdcard import ensure_sdcard if TYPE_CHECKING: from typing import Awaitable from trezor.messages import SdProtect + from trezor.wire import Context def _make_salt() -> tuple[bytes, bytes, bytes]: - salt = random.bytes(storage.sd_salt.SD_SALT_LEN_BYTES) - auth_key = random.bytes(storage.device.SD_SALT_AUTH_KEY_LEN_BYTES) - tag = storage.sd_salt.compute_auth_tag(salt, auth_key) + from trezor.crypto import random + + salt = random.bytes(storage_sd_salt.SD_SALT_LEN_BYTES) + auth_key = random.bytes(storage_device.SD_SALT_AUTH_KEY_LEN_BYTES) + tag = storage_sd_salt.compute_auth_tag(salt, auth_key) return salt, auth_key, tag async def _set_salt( - ctx: wire.Context, salt: bytes, salt_tag: bytes, stage: bool = False + ctx: Context, salt: bytes, salt_tag: bytes, stage: bool = False ) -> None: + from apps.common.sdcard import confirm_retry_sd + while True: await ensure_sdcard(ctx) try: - return storage.sd_salt.set_sd_salt(salt, salt_tag, stage) + return storage_sd_salt.set_sd_salt(salt, salt_tag, stage) except OSError: - await confirm_retry_sd(ctx, exc=wire.ProcessError("SD card I/O error.")) + await confirm_retry_sd(ctx, ProcessError("SD card I/O error.")) -async def sd_protect(ctx: wire.Context, msg: SdProtect) -> Success: - if not storage.device.is_initialized(): - raise wire.NotInitialized("Device is not initialized") +async def sd_protect(ctx: Context, msg: SdProtect) -> Success: + from trezor.wire import NotInitialized + + if not storage_device.is_initialized(): + raise NotInitialized("Device is not initialized") if msg.operation == SdProtectOperationType.ENABLE: - return await sd_protect_enable(ctx, msg) + return await _sd_protect_enable(ctx, msg) elif msg.operation == SdProtectOperationType.DISABLE: - return await sd_protect_disable(ctx, msg) + return await _sd_protect_disable(ctx, msg) elif msg.operation == SdProtectOperationType.REFRESH: - return await sd_protect_refresh(ctx, msg) + return await _sd_protect_refresh(ctx, msg) else: - raise wire.ProcessError("Unknown operation") + raise ProcessError("Unknown operation") -async def sd_protect_enable(ctx: wire.Context, msg: SdProtect) -> Success: - if storage.sd_salt.is_enabled(): - raise wire.ProcessError("SD card protection already enabled") +async def _sd_protect_enable(ctx: Context, msg: SdProtect) -> Success: + from apps.common.request_pin import request_pin + + if storage_sd_salt.is_enabled(): + raise ProcessError("SD card protection already enabled") # Confirm that user wants to proceed with the operation. await require_confirm_sd_protect(ctx, msg) @@ -75,7 +80,7 @@ async def sd_protect_enable(ctx: wire.Context, msg: SdProtect) -> Success: if not config.change_pin(pin, pin, None, salt): # Wrong PIN. Clean up the prepared salt file. try: - storage.sd_salt.remove_sd_salt() + storage_sd_salt.remove_sd_salt() except Exception: # The cleanup is not necessary for the correct functioning of # SD-protection. If it fails for any reason, we suppress the @@ -83,7 +88,7 @@ async def sd_protect_enable(ctx: wire.Context, msg: SdProtect) -> Success: pass await error_pin_invalid(ctx) - storage.device.set_sd_salt_auth_key(salt_auth_key) + storage_device.set_sd_salt_auth_key(salt_auth_key) await show_success( ctx, "success_sd", "You have successfully enabled SD protection." @@ -91,9 +96,9 @@ async def sd_protect_enable(ctx: wire.Context, msg: SdProtect) -> Success: return Success(message="SD card protection enabled") -async def sd_protect_disable(ctx: wire.Context, msg: SdProtect) -> Success: - if not storage.sd_salt.is_enabled(): - raise wire.ProcessError("SD card protection not enabled") +async def _sd_protect_disable(ctx: Context, msg: SdProtect) -> Success: + if not storage_sd_salt.is_enabled(): + raise ProcessError("SD card protection not enabled") # Note that the SD card doesn't need to be present in order to disable SD # protection. The cleanup will not happen in such case, but that does not matter. @@ -108,11 +113,11 @@ async def sd_protect_disable(ctx: wire.Context, msg: SdProtect) -> Success: if not config.change_pin(pin, pin, salt, None): await error_pin_invalid(ctx) - storage.device.set_sd_salt_auth_key(None) + storage_device.set_sd_salt_auth_key(None) try: # Clean up. - storage.sd_salt.remove_sd_salt() + storage_sd_salt.remove_sd_salt() except Exception: # The cleanup is not necessary for the correct functioning of # SD-protection. If it fails for any reason, we suppress the exception, @@ -125,9 +130,9 @@ async def sd_protect_disable(ctx: wire.Context, msg: SdProtect) -> Success: return Success(message="SD card protection disabled") -async def sd_protect_refresh(ctx: wire.Context, msg: SdProtect) -> Success: - if not storage.sd_salt.is_enabled(): - raise wire.ProcessError("SD card protection not enabled") +async def _sd_protect_refresh(ctx: Context, msg: SdProtect) -> Success: + if not storage_sd_salt.is_enabled(): + raise ProcessError("SD card protection not enabled") # Confirm that user wants to proceed with the operation. await require_confirm_sd_protect(ctx, msg) @@ -145,11 +150,11 @@ async def sd_protect_refresh(ctx: wire.Context, msg: SdProtect) -> Success: if not config.change_pin(pin, pin, old_salt, new_salt): await error_pin_invalid(ctx) - storage.device.set_sd_salt_auth_key(new_auth_key) + storage_device.set_sd_salt_auth_key(new_auth_key) try: # Clean up. - storage.sd_salt.commit_sd_salt() + storage_sd_salt.commit_sd_salt() except Exception: # If the cleanup fails, then request_sd_salt() will bring the SD card # into a consistent state. We suppress the exception, because overall @@ -162,7 +167,9 @@ async def sd_protect_refresh(ctx: wire.Context, msg: SdProtect) -> Success: return Success(message="SD card protection refreshed") -def require_confirm_sd_protect(ctx: wire.Context, msg: SdProtect) -> Awaitable[None]: +def require_confirm_sd_protect(ctx: Context, msg: SdProtect) -> Awaitable[None]: + from trezor.ui.layouts import confirm_action + if msg.operation == SdProtectOperationType.ENABLE: text = "Do you really want to secure your device with SD card protection?" elif msg.operation == SdProtectOperationType.DISABLE: @@ -170,6 +177,6 @@ def require_confirm_sd_protect(ctx: wire.Context, msg: SdProtect) -> Awaitable[N elif msg.operation == SdProtectOperationType.REFRESH: text = "Do you really want to replace the current\nSD card secret with a newly generated one?" else: - raise wire.ProcessError("Unknown operation") + raise ProcessError("Unknown operation") return confirm_action(ctx, "set_sd", "SD card protection", description=text) diff --git a/core/src/apps/management/set_u2f_counter.py b/core/src/apps/management/set_u2f_counter.py index 76da2a2a7d..f0c9694622 100644 --- a/core/src/apps/management/set_u2f_counter.py +++ b/core/src/apps/management/set_u2f_counter.py @@ -1,17 +1,18 @@ from typing import TYPE_CHECKING -import storage.device -from trezor import ui, wire -from trezor.enums import ButtonRequestType -from trezor.messages import Success -from trezor.ui.layouts import confirm_action - if TYPE_CHECKING: - from trezor.messages import SetU2FCounter + from trezor.messages import SetU2FCounter, Success + from trezor.wire import Context -async def set_u2f_counter(ctx: wire.Context, msg: SetU2FCounter) -> Success: - if not storage.device.is_initialized(): +async def set_u2f_counter(ctx: Context, msg: SetU2FCounter) -> Success: + import storage.device as storage_device + from trezor import ui, wire + from trezor.enums import ButtonRequestType + from trezor.messages import Success + from trezor.ui.layouts import confirm_action + + if not storage_device.is_initialized(): raise wire.NotInitialized("Device is not initialized") if msg.u2f_counter is None: raise wire.ProcessError("No value provided") @@ -19,13 +20,13 @@ async def set_u2f_counter(ctx: wire.Context, msg: SetU2FCounter) -> Success: await confirm_action( ctx, "set_u2f_counter", - title="Set U2F counter", + "Set U2F counter", description="Do you really want to\nset the U2F counter\nto {}?", description_param=str(msg.u2f_counter), icon=ui.ICON_CONFIG, br_code=ButtonRequestType.ProtectCall, ) - storage.device.set_u2f_counter(msg.u2f_counter) + storage_device.set_u2f_counter(msg.u2f_counter) return Success(message="U2F counter set") diff --git a/core/src/apps/management/wipe_device.py b/core/src/apps/management/wipe_device.py index 9ad4f633fa..deee460b50 100644 --- a/core/src/apps/management/wipe_device.py +++ b/core/src/apps/management/wipe_device.py @@ -1,25 +1,25 @@ from typing import TYPE_CHECKING -import storage -from trezor import ui -from trezor.enums import ButtonRequestType -from trezor.messages import Success -from trezor.ui.layouts import confirm_action - -from .apply_settings import reload_settings_from_storage - if TYPE_CHECKING: - from trezor import wire - from trezor.messages import WipeDevice + from trezor.wire import GenericContext + from trezor.messages import WipeDevice, Success -async def wipe_device(ctx: wire.GenericContext, msg: WipeDevice) -> Success: +async def wipe_device(ctx: GenericContext, msg: WipeDevice) -> Success: + import storage + from trezor import ui + from trezor.enums import ButtonRequestType + from trezor.messages import Success + from trezor.ui.layouts import confirm_action + + from apps.base import reload_settings_from_storage + await confirm_action( ctx, "confirm_wipe", - title="Wipe device", - description="Do you really want to\nwipe the device?\n", - action="All data will be lost.", + "Wipe device", + "All data will be lost.", + "Do you really want to\nwipe the device?\n", reverse=True, verb="Hold to confirm", hold=True,