mirror of
https://github.com/trezor/trezor-firmware.git
synced 2024-11-15 20:19:23 +00:00
core: some mypy in recovery-related modules
This commit is contained in:
parent
50a240dbc0
commit
4030874c7e
@ -13,7 +13,7 @@ from apps.common import HARDENED
|
||||
from apps.common.confirm import confirm, require_confirm
|
||||
|
||||
if False:
|
||||
from typing import Iterable, List
|
||||
from typing import Iterable
|
||||
from trezor import wire
|
||||
|
||||
|
||||
@ -79,8 +79,8 @@ def address_n_to_str(address_n: list) -> str:
|
||||
|
||||
async def show_warning(
|
||||
ctx: wire.Context,
|
||||
content: List[str],
|
||||
subheader: List[str] = [],
|
||||
content: Iterable[str],
|
||||
subheader: Iterable[str] = [],
|
||||
button: str = "Try again",
|
||||
) -> None:
|
||||
text = Text("Warning", ui.ICON_WRONG, ui.RED)
|
||||
@ -97,8 +97,8 @@ async def show_warning(
|
||||
|
||||
async def show_success(
|
||||
ctx: wire.Context,
|
||||
content: List[str] = [],
|
||||
subheader: List[str] = [],
|
||||
content: Iterable[str] = [],
|
||||
subheader: Iterable[str] = [],
|
||||
button: str = "Continue",
|
||||
) -> None:
|
||||
text = Text("Success", ui.ICON_CONFIRM, ui.GREEN)
|
||||
|
@ -20,15 +20,15 @@ if False:
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def set_in_progress(val: bool):
|
||||
def set_in_progress(val: bool) -> None:
|
||||
common._set_bool(_NAMESPACE, _IN_PROGRESS, val)
|
||||
|
||||
|
||||
def is_in_progress():
|
||||
def is_in_progress() -> bool:
|
||||
return common._get_bool(_NAMESPACE, _IN_PROGRESS)
|
||||
|
||||
|
||||
def set_dry_run(val: bool):
|
||||
def set_dry_run(val: bool) -> None:
|
||||
common._set_bool(_NAMESPACE, _DRY_RUN, val)
|
||||
|
||||
|
||||
@ -36,11 +36,11 @@ def is_dry_run() -> bool:
|
||||
return common._get_bool(_NAMESPACE, _DRY_RUN)
|
||||
|
||||
|
||||
def set_word_count(count: int):
|
||||
def set_word_count(count: int) -> None:
|
||||
common._set_uint8(_NAMESPACE, _WORD_COUNT, count)
|
||||
|
||||
|
||||
def get_word_count() -> int:
|
||||
def get_word_count() -> Optional[int]:
|
||||
return common._get_uint8(_NAMESPACE, _WORD_COUNT)
|
||||
|
||||
|
||||
@ -76,7 +76,7 @@ def get_slip39_iteration_exponent() -> Optional[int]:
|
||||
return common._get_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT)
|
||||
|
||||
|
||||
def end_progress():
|
||||
def end_progress() -> None:
|
||||
common._delete(_NAMESPACE, _IN_PROGRESS)
|
||||
common._delete(_NAMESPACE, _DRY_RUN)
|
||||
common._delete(_NAMESPACE, _WORD_COUNT)
|
||||
|
@ -64,7 +64,7 @@ def require_confirm_change_pin(ctx, msg):
|
||||
return require_confirm(ctx, text)
|
||||
|
||||
|
||||
async def request_pin_confirm(ctx, *args, **kwargs):
|
||||
async def request_pin_confirm(ctx: wire.Context, *args, **kwargs) -> str:
|
||||
while True:
|
||||
pin1 = await request_pin_ack(ctx, "Enter new PIN", *args, **kwargs)
|
||||
pin2 = await request_pin_ack(ctx, "Re-enter new PIN", *args, **kwargs)
|
||||
@ -73,7 +73,7 @@ async def request_pin_confirm(ctx, *args, **kwargs):
|
||||
await pin_mismatch()
|
||||
|
||||
|
||||
async def request_pin_ack(ctx, *args, **kwargs):
|
||||
async def request_pin_ack(ctx: wire.Context, *args, **kwargs) -> str:
|
||||
try:
|
||||
await ctx.call(ButtonRequest(code=ButtonRequestType.Other), ButtonAck)
|
||||
return await ctx.wait(request_pin(*args, **kwargs))
|
||||
|
@ -48,19 +48,21 @@ async def recovery_device(ctx: wire.Context, msg: RecoveryDevice) -> Success:
|
||||
newpin = await request_pin_confirm(ctx, allow_cancel=False)
|
||||
config.change_pin(pin_to_int(""), pin_to_int(newpin))
|
||||
|
||||
storage.device.set_u2f_counter(msg.u2f_counter)
|
||||
if msg.u2f_counter:
|
||||
storage.device.set_u2f_counter(msg.u2f_counter)
|
||||
storage.device.load_settings(
|
||||
label=msg.label, use_passphrase=msg.passphrase_protection
|
||||
)
|
||||
storage.recovery.set_in_progress(True)
|
||||
storage.recovery.set_dry_run(msg.dry_run)
|
||||
if msg.dry_run:
|
||||
storage.recovery.set_dry_run(msg.dry_run)
|
||||
|
||||
result = await recovery_process(ctx)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _check_state(msg: RecoveryDevice):
|
||||
def _check_state(msg: RecoveryDevice) -> None:
|
||||
if not msg.dry_run and storage.is_initialized():
|
||||
raise wire.UnexpectedMessage("Already initialized")
|
||||
if msg.dry_run and not storage.is_initialized():
|
||||
|
@ -93,10 +93,13 @@ async def _finish_recovery(
|
||||
secret, mnemonic_type, needs_backup=False, no_backup=False
|
||||
)
|
||||
if mnemonic_type == mnemonic.TYPE_SLIP39:
|
||||
storage.device.set_slip39_identifier(storage.recovery.get_slip39_identifier())
|
||||
storage.device.set_slip39_iteration_exponent(
|
||||
storage.recovery.get_slip39_iteration_exponent()
|
||||
)
|
||||
identifier = storage.recovery.get_slip39_identifier()
|
||||
exponent = storage.recovery.get_slip39_iteration_exponent()
|
||||
if identifier is None or exponent is None:
|
||||
# Identifier and exponent need to be stored in storage at this point
|
||||
raise RuntimeError
|
||||
storage.device.set_slip39_identifier(identifier)
|
||||
storage.device.set_slip39_iteration_exponent(exponent)
|
||||
storage.recovery.end_progress()
|
||||
|
||||
await show_success(ctx, ("You have successfully", "recovered your wallet."))
|
||||
@ -117,7 +120,9 @@ async def _request_and_store_word_count(ctx: wire.Context, dry_run: bool) -> int
|
||||
return word_count
|
||||
|
||||
|
||||
async def _request_secret(ctx: wire.Context, word_count: int, mnemonic_type: int):
|
||||
async def _request_secret(
|
||||
ctx: wire.Context, word_count: int, mnemonic_type: int
|
||||
) -> bytes:
|
||||
await _request_share_first_screen(ctx, word_count, mnemonic_type)
|
||||
|
||||
secret = None
|
||||
@ -148,7 +153,7 @@ async def _request_secret(ctx: wire.Context, word_count: int, mnemonic_type: int
|
||||
|
||||
async def _request_share_first_screen(
|
||||
ctx: wire.Context, word_count: int, mnemonic_type: int
|
||||
):
|
||||
) -> None:
|
||||
if mnemonic_type == mnemonic.TYPE_BIP39:
|
||||
content = layout.RecoveryHomescreen(
|
||||
"Enter recovery seed", "(%d words)" % word_count
|
||||
@ -167,9 +172,12 @@ async def _request_share_first_screen(
|
||||
raise RuntimeError
|
||||
|
||||
|
||||
async def _request_share_next_screen(ctx: wire.Context, mnemonic_type: int):
|
||||
async def _request_share_next_screen(ctx: wire.Context, mnemonic_type: int) -> None:
|
||||
if mnemonic_type == mnemonic.TYPE_SLIP39:
|
||||
remaining = storage.recovery.get_remaining()
|
||||
if not remaining:
|
||||
# 'remaining' should be stored at this point
|
||||
raise RuntimeError
|
||||
if remaining == 1:
|
||||
text = "1 more share"
|
||||
else:
|
||||
|
@ -150,21 +150,21 @@ async def show_keyboard_info(ctx: wire.Context) -> None:
|
||||
await ctx.wait(info)
|
||||
|
||||
|
||||
async def show_invalid_mnemonic(ctx, mnemonic_type: int):
|
||||
async def show_invalid_mnemonic(ctx: wire.Context, mnemonic_type: int) -> None:
|
||||
if mnemonic_type == mnemonic.TYPE_SLIP39:
|
||||
await show_warning(ctx, ("You have entered", "an invalid recovery", "share."))
|
||||
else:
|
||||
await show_warning(ctx, ("You have entered", "an invalid recovery", "seed."))
|
||||
|
||||
|
||||
async def show_share_already_added(ctx):
|
||||
return await show_warning(
|
||||
async def show_share_already_added(ctx: wire.Context) -> None:
|
||||
await show_warning(
|
||||
ctx, ("Share already entered,", "please enter", "a different share.")
|
||||
)
|
||||
|
||||
|
||||
async def show_identifier_mismatch(ctx):
|
||||
return await show_warning(
|
||||
async def show_identifier_mismatch(ctx: wire.Context) -> None:
|
||||
await show_warning(
|
||||
ctx, ("You have entered", "a share from another", "Shamir Backup.")
|
||||
)
|
||||
|
||||
@ -176,7 +176,7 @@ class RecoveryHomescreen(ui.Control):
|
||||
self.dry_run = storage.recovery.is_dry_run()
|
||||
self.repaint = True
|
||||
|
||||
def on_render(self):
|
||||
def on_render(self) -> None:
|
||||
if not self.repaint:
|
||||
return
|
||||
|
||||
|
@ -11,7 +11,7 @@ class RecoveryAborted(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def process_share(words: str, mnemonic_type: int):
|
||||
def process_share(words: str, mnemonic_type: int) -> Optional[bytes]:
|
||||
if mnemonic_type == mnemonic.TYPE_BIP39:
|
||||
return _process_bip39(words)
|
||||
else:
|
||||
|
@ -126,7 +126,7 @@ def run() -> None:
|
||||
# rationale: We use untyped lists here, because that is what the C API supports.
|
||||
|
||||
|
||||
def clear():
|
||||
def clear() -> None:
|
||||
"""Clear all queue state. Any scheduled or paused tasks will be forgotten."""
|
||||
_ = [0, 0, 0]
|
||||
while _queue:
|
||||
|
Loading…
Reference in New Issue
Block a user