parent
8f4bbb8825
commit
d2597d54c1
@ -1,47 +0,0 @@
|
||||
from trezor.crypto import bip39
|
||||
|
||||
from apps.common import mnemonic, storage
|
||||
|
||||
|
||||
def get_type() -> int:
|
||||
return mnemonic.TYPE_BIP39
|
||||
|
||||
|
||||
def process_single(mnemonic: str) -> bytes:
|
||||
"""
|
||||
Receives single mnemonic and processes it. Returns what is then stored in storage or
|
||||
None if more shares are needed.
|
||||
"""
|
||||
return mnemonic.encode()
|
||||
|
||||
|
||||
def process_all(mnemonics: list) -> bytes:
|
||||
"""
|
||||
Receives all mnemonics (just one in case of BIP-39) and processes it into a master
|
||||
secret which is usually then stored in storage.
|
||||
"""
|
||||
return mnemonics[0].encode()
|
||||
|
||||
|
||||
def store(secret: bytes, needs_backup: bool, no_backup: bool) -> None:
|
||||
storage.device.store_mnemonic_secret(
|
||||
secret, mnemonic.TYPE_BIP39, needs_backup, no_backup
|
||||
)
|
||||
|
||||
|
||||
def get_seed(secret: bytes, passphrase: str, progress_bar: bool = True) -> bytes:
|
||||
if progress_bar:
|
||||
mnemonic._start_progress()
|
||||
seed = bip39.seed(secret.decode(), passphrase, mnemonic._render_progress)
|
||||
mnemonic._stop_progress()
|
||||
else:
|
||||
seed = bip39.seed(secret.decode(), passphrase)
|
||||
return seed
|
||||
|
||||
|
||||
def get_mnemonic_threshold(mnemonic: str) -> int:
|
||||
return 1
|
||||
|
||||
|
||||
def check(secret: bytes) -> bool:
|
||||
return bip39.check(secret)
|
@ -1,103 +0,0 @@
|
||||
from trezor.crypto import slip39
|
||||
|
||||
from apps.common import mnemonic, storage
|
||||
|
||||
if False:
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def generate_from_secret(master_secret: bytes, count: int, threshold: int) -> list:
|
||||
"""
|
||||
Generates new Shamir backup for 'master_secret'. Multiple groups are not yet supported.
|
||||
"""
|
||||
return slip39.generate_single_group_mnemonics_from_data(
|
||||
master_secret, storage.slip39.get_identifier(), threshold, count
|
||||
)
|
||||
|
||||
|
||||
def get_type() -> int:
|
||||
return mnemonic.TYPE_SLIP39
|
||||
|
||||
|
||||
def process_single(mnemonic: str) -> Optional[bytes]:
|
||||
"""
|
||||
Receives single mnemonic and processes it. Returns what is then stored in storage or
|
||||
None if more shares are needed.
|
||||
"""
|
||||
identifier, iteration_exponent, _, _, _, index, threshold, value = slip39.decode_mnemonic(
|
||||
mnemonic
|
||||
) # TODO: use better data structure for this
|
||||
if threshold == 1:
|
||||
raise ValueError("Threshold equal to 1 is not allowed.")
|
||||
|
||||
# if recovery is not in progress already, start it and wait for more mnemonics
|
||||
if not storage.slip39.is_in_progress():
|
||||
storage.slip39.set_in_progress(True)
|
||||
storage.slip39.set_iteration_exponent(iteration_exponent)
|
||||
storage.slip39.set_identifier(identifier)
|
||||
storage.slip39.set_threshold(threshold)
|
||||
storage.slip39.set_remaining(threshold - 1)
|
||||
storage.slip39.set_words_count(len(mnemonic.split()))
|
||||
storage.slip39_mnemonics.set(index, mnemonic)
|
||||
return None # we need more shares
|
||||
|
||||
# check identifier and member index of this share against stored values
|
||||
if identifier != storage.slip39.get_identifier():
|
||||
# TODO: improve UX (tell user)
|
||||
raise ValueError("Share identifiers do not match")
|
||||
if storage.slip39_mnemonics.get(index):
|
||||
# TODO: improve UX (tell user)
|
||||
raise ValueError("This mnemonic was already entered")
|
||||
|
||||
# append to storage
|
||||
remaining = storage.slip39.get_remaining() - 1
|
||||
storage.slip39.set_remaining(remaining)
|
||||
storage.slip39.set(index, mnemonic)
|
||||
if remaining != 0:
|
||||
return None # we need more shares
|
||||
|
||||
# combine shares and return the master secret
|
||||
mnemonics = storage.slip39_mnemonics.fetch()
|
||||
if len(mnemonics) != threshold:
|
||||
raise ValueError("Some mnemonics are still missing.")
|
||||
_, _, secret = slip39.combine_mnemonics(mnemonics)
|
||||
return secret
|
||||
|
||||
|
||||
def process_all(mnemonics: list) -> bytes:
|
||||
"""
|
||||
Receives all mnemonics and processes it into pre-master secret which is usually then
|
||||
stored in the storage.
|
||||
"""
|
||||
identifier, iteration_exponent, secret = slip39.combine_mnemonics(mnemonics)
|
||||
storage.slip39.set_iteration_exponent(iteration_exponent)
|
||||
storage.slip39.set_identifier(identifier)
|
||||
return secret
|
||||
|
||||
|
||||
def store(secret: bytes, needs_backup: bool, no_backup: bool) -> None:
|
||||
storage.device.store_mnemonic_secret(
|
||||
secret, mnemonic.TYPE_SLIP39, needs_backup, no_backup
|
||||
)
|
||||
storage.slip39.delete_progress()
|
||||
|
||||
|
||||
def get_seed(
|
||||
encrypted_master_secret: bytes, passphrase: str, progress_bar: bool = True
|
||||
) -> bytes:
|
||||
if progress_bar:
|
||||
mnemonic._start_progress()
|
||||
identifier = storage.slip39.get_identifier()
|
||||
iteration_exponent = storage.slip39.get_iteration_exponent()
|
||||
|
||||
master_secret = slip39.decrypt(
|
||||
identifier, iteration_exponent, encrypted_master_secret, passphrase
|
||||
)
|
||||
if progress_bar:
|
||||
mnemonic._stop_progress()
|
||||
return master_secret
|
||||
|
||||
|
||||
def get_mnemonic_threshold(mnemonic: str) -> int:
|
||||
_, _, _, _, _, _, threshold, _ = slip39.decode_mnemonic(mnemonic)
|
||||
return threshold
|
@ -0,0 +1,87 @@
|
||||
from micropython import const
|
||||
|
||||
from apps.common.storage import common, recovery_shares
|
||||
|
||||
# Namespace:
|
||||
_NAMESPACE = common._APP_RECOVERY
|
||||
|
||||
# fmt: off
|
||||
# Keys:
|
||||
_IN_PROGRESS = const(0x00) # bool
|
||||
_DRY_RUN = const(0x01) # bool
|
||||
_WORD_COUNT = const(0x02) # int
|
||||
_REMAINING = const(0x05) # int
|
||||
_SLIP39_IDENTIFIER = const(0x03) # bytes
|
||||
_SLIP39_THRESHOLD = const(0x04) # int
|
||||
_SLIP39_ITERATION_EXPONENT = const(0x06) # int
|
||||
# fmt: on
|
||||
|
||||
if False:
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def set_in_progress(val: bool):
|
||||
common._set_bool(_NAMESPACE, _IN_PROGRESS, val)
|
||||
|
||||
|
||||
def is_in_progress():
|
||||
return common._get_bool(_NAMESPACE, _IN_PROGRESS)
|
||||
|
||||
|
||||
def set_dry_run(val: bool):
|
||||
common._set_bool(_NAMESPACE, _DRY_RUN, val)
|
||||
|
||||
|
||||
def is_dry_run() -> bool:
|
||||
return common._get_bool(_NAMESPACE, _DRY_RUN)
|
||||
|
||||
|
||||
def set_word_count(count: int):
|
||||
common._set_uint8(_NAMESPACE, _WORD_COUNT, count)
|
||||
|
||||
|
||||
def get_word_count() -> int:
|
||||
return common._get_uint8(_NAMESPACE, _WORD_COUNT)
|
||||
|
||||
|
||||
def set_slip39_identifier(identifier: int) -> None:
|
||||
common._set_uint16(_NAMESPACE, _SLIP39_IDENTIFIER, identifier)
|
||||
|
||||
|
||||
def get_slip39_identifier() -> Optional[int]:
|
||||
return common._get_uint16(_NAMESPACE, _SLIP39_IDENTIFIER)
|
||||
|
||||
|
||||
def set_slip39_threshold(threshold: int) -> None:
|
||||
common._set_uint8(_NAMESPACE, _SLIP39_THRESHOLD, threshold)
|
||||
|
||||
|
||||
def get_slip39_threshold() -> Optional[int]:
|
||||
return common._get_uint8(_NAMESPACE, _SLIP39_THRESHOLD)
|
||||
|
||||
|
||||
def set_remaining(remaining: int) -> None:
|
||||
common._set_uint8(_NAMESPACE, _REMAINING, remaining)
|
||||
|
||||
|
||||
def get_remaining() -> Optional[int]:
|
||||
return common._get_uint8(_NAMESPACE, _REMAINING)
|
||||
|
||||
|
||||
def set_slip39_iteration_exponent(exponent: int) -> None:
|
||||
common._set_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT, exponent)
|
||||
|
||||
|
||||
def get_slip39_iteration_exponent() -> Optional[int]:
|
||||
return common._get_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT)
|
||||
|
||||
|
||||
def end_progress():
|
||||
common._delete(_NAMESPACE, _IN_PROGRESS)
|
||||
common._delete(_NAMESPACE, _DRY_RUN)
|
||||
common._delete(_NAMESPACE, _WORD_COUNT)
|
||||
common._delete(_NAMESPACE, _SLIP39_IDENTIFIER)
|
||||
common._delete(_NAMESPACE, _SLIP39_THRESHOLD)
|
||||
common._delete(_NAMESPACE, _REMAINING)
|
||||
common._delete(_NAMESPACE, _SLIP39_ITERATION_EXPONENT)
|
||||
recovery_shares.delete()
|
@ -1,75 +0,0 @@
|
||||
from micropython import const
|
||||
|
||||
from apps.common.storage import common, slip39_mnemonics
|
||||
|
||||
if False:
|
||||
from typing import Optional
|
||||
|
||||
# Namespace:
|
||||
_NAMESPACE = common._APP_SLIP39
|
||||
|
||||
# fmt: off
|
||||
# Keys:
|
||||
_SLIP39_IN_PROGRESS = const(0x00) # bool
|
||||
_SLIP39_IDENTIFIER = const(0x01) # bytes
|
||||
_SLIP39_THRESHOLD = const(0x02) # int
|
||||
_SLIP39_REMAINING = const(0x03) # int
|
||||
_SLIP39_WORDS_COUNT = const(0x04) # int
|
||||
_SLIP39_ITERATION_EXPONENT = const(0x05) # int
|
||||
# fmt: on
|
||||
|
||||
|
||||
def set_in_progress(val: bool) -> None:
|
||||
common._set_bool(_NAMESPACE, _SLIP39_IN_PROGRESS, val)
|
||||
|
||||
|
||||
def is_in_progress() -> bool:
|
||||
return common._get_bool(_NAMESPACE, _SLIP39_IN_PROGRESS)
|
||||
|
||||
|
||||
def set_identifier(identifier: int) -> None:
|
||||
common._set_uint16(_NAMESPACE, _SLIP39_IDENTIFIER, identifier)
|
||||
|
||||
|
||||
def get_identifier() -> Optional[int]:
|
||||
return common._get_uint16(_NAMESPACE, _SLIP39_IDENTIFIER)
|
||||
|
||||
|
||||
def set_threshold(threshold: int) -> None:
|
||||
common._set_uint8(_NAMESPACE, _SLIP39_THRESHOLD, threshold)
|
||||
|
||||
|
||||
def get_threshold() -> Optional[int]:
|
||||
return common._get_uint8(_NAMESPACE, _SLIP39_THRESHOLD)
|
||||
|
||||
|
||||
def set_remaining(remaining: int) -> None:
|
||||
common._set_uint8(_NAMESPACE, _SLIP39_REMAINING, remaining)
|
||||
|
||||
|
||||
def get_remaining() -> Optional[int]:
|
||||
return common._get_uint8(_NAMESPACE, _SLIP39_REMAINING)
|
||||
|
||||
|
||||
def set_words_count(count: int) -> None:
|
||||
common._set_uint8(_NAMESPACE, _SLIP39_WORDS_COUNT, count)
|
||||
|
||||
|
||||
def get_words_count() -> Optional[int]:
|
||||
return common._get_uint8(_NAMESPACE, _SLIP39_WORDS_COUNT)
|
||||
|
||||
|
||||
def set_iteration_exponent(exponent: int) -> None:
|
||||
common._set_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT, exponent)
|
||||
|
||||
|
||||
def get_iteration_exponent() -> Optional[int]:
|
||||
return common._get_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT)
|
||||
|
||||
|
||||
def delete_progress() -> None:
|
||||
common._delete(_NAMESPACE, _SLIP39_IN_PROGRESS)
|
||||
common._delete(_NAMESPACE, _SLIP39_REMAINING)
|
||||
common._delete(_NAMESPACE, _SLIP39_THRESHOLD)
|
||||
common._delete(_NAMESPACE, _SLIP39_WORDS_COUNT)
|
||||
slip39_mnemonics.delete()
|
@ -1,191 +0,0 @@
|
||||
from trezor import config, ui, wire
|
||||
from trezor.crypto import slip39
|
||||
from trezor.messages import ButtonRequestType
|
||||
from trezor.messages.ButtonAck import ButtonAck
|
||||
from trezor.messages.ButtonRequest import ButtonRequest
|
||||
from trezor.messages.Success import Success
|
||||
from trezor.pin import pin_to_int
|
||||
from trezor.ui.info import InfoConfirm
|
||||
from trezor.ui.mnemonic_bip39 import Bip39Keyboard
|
||||
from trezor.ui.mnemonic_slip39 import Slip39Keyboard
|
||||
from trezor.ui.text import Text
|
||||
from trezor.ui.word_select import WordSelector
|
||||
|
||||
from apps.common import mnemonic, storage
|
||||
from apps.common.confirm import require_confirm
|
||||
from apps.homescreen.homescreen import display_homescreen
|
||||
from apps.management.change_pin import request_pin_ack, request_pin_confirm
|
||||
|
||||
if __debug__:
|
||||
from apps.debug import confirm_signal, input_signal
|
||||
|
||||
if False:
|
||||
from trezor.messages.RecoveryDevice import RecoveryDevice
|
||||
|
||||
|
||||
async def recovery_device(ctx: wire.Context, msg: RecoveryDevice) -> Success:
|
||||
"""
|
||||
Recover BIP39/SLIP39 seed into empty device.
|
||||
|
||||
1. Ask for the number of words in recovered seed.
|
||||
2. Let user type in the mnemonic words one by one.
|
||||
3. Optionally check the seed validity.
|
||||
4. Optionally ask for the PIN, with confirmation.
|
||||
5. Save into storage.
|
||||
"""
|
||||
if not msg.dry_run and storage.is_initialized():
|
||||
raise wire.UnexpectedMessage("Already initialized")
|
||||
|
||||
if not storage.slip39.is_in_progress():
|
||||
if not msg.dry_run:
|
||||
title = "Wallet recovery"
|
||||
text = Text(title, ui.ICON_RECOVERY)
|
||||
text.normal("Do you really want to", "recover the wallet?", "")
|
||||
else:
|
||||
title = "Simulated recovery"
|
||||
text = Text(title, ui.ICON_RECOVERY)
|
||||
text.normal("Do you really want to", "check the recovery", "seed?")
|
||||
await require_confirm(ctx, text, code=ButtonRequestType.ProtectCall)
|
||||
|
||||
if msg.dry_run:
|
||||
if config.has_pin():
|
||||
curpin = await request_pin_ack(ctx, "Enter PIN", config.get_pin_rem())
|
||||
else:
|
||||
curpin = ""
|
||||
if not config.check_pin(pin_to_int(curpin)):
|
||||
raise wire.PinInvalid("PIN invalid")
|
||||
|
||||
# ask for the number of words
|
||||
wordcount = await request_wordcount(ctx, title)
|
||||
mnemonic_module = mnemonic.module_from_words_count(wordcount)
|
||||
else:
|
||||
wordcount = storage.slip39.get_words_count()
|
||||
mnemonic_module = mnemonic.slip39
|
||||
|
||||
mnemonic_threshold = None
|
||||
mnemonics = []
|
||||
|
||||
secret = None
|
||||
while secret is None:
|
||||
# ask for mnemonic words one by one
|
||||
words = await request_mnemonic(
|
||||
ctx, wordcount, mnemonic_module == mnemonic.slip39
|
||||
)
|
||||
if mnemonic_threshold is None:
|
||||
try:
|
||||
mnemonic_threshold = mnemonic_module.get_mnemonic_threshold(words)
|
||||
except slip39.MnemonicError:
|
||||
raise wire.ProcessError("Mnemonic is not valid")
|
||||
mnemonics.append(words)
|
||||
remaining = mnemonic_threshold - len(mnemonics)
|
||||
if remaining == 0:
|
||||
try:
|
||||
secret = mnemonic_module.process_all(mnemonics)
|
||||
except slip39.MnemonicError:
|
||||
raise wire.ProcessError("Mnemonic is not valid")
|
||||
# show a number of remaining mnemonics for SLIP39
|
||||
if secret is None and mnemonic_module == mnemonic.slip39:
|
||||
await show_remaining_slip39_mnemonics(ctx, title, remaining)
|
||||
|
||||
# check mnemonic validity
|
||||
# it is checked automatically in SLIP-39
|
||||
if mnemonic_module == mnemonic.bip39 and (msg.enforce_wordlist or msg.dry_run):
|
||||
if not mnemonic_module.check(secret):
|
||||
raise wire.ProcessError("Mnemonic is not valid")
|
||||
|
||||
# ask for pin repeatedly
|
||||
if msg.pin_protection:
|
||||
newpin = await request_pin_confirm(ctx, allow_cancel=False)
|
||||
else:
|
||||
newpin = ""
|
||||
|
||||
# dry run
|
||||
if msg.dry_run:
|
||||
return mnemonic.dry_run(secret)
|
||||
|
||||
# save into storage
|
||||
if msg.pin_protection:
|
||||
config.change_pin(pin_to_int(""), pin_to_int(newpin))
|
||||
storage.device.set_u2f_counter(msg.u2f_counter)
|
||||
storage.device.load_settings(
|
||||
label=msg.label, use_passphrase=msg.passphrase_protection
|
||||
)
|
||||
mnemonic_module.store(secret=secret, needs_backup=False, no_backup=False)
|
||||
|
||||
await show_success(ctx)
|
||||
display_homescreen()
|
||||
|
||||
return Success(message="Device recovered")
|
||||
|
||||
|
||||
async def request_wordcount(ctx: wire.Context, title: str) -> int:
|
||||
await ctx.call(ButtonRequest(code=ButtonRequestType.MnemonicWordCount), ButtonAck)
|
||||
|
||||
text = Text(title, ui.ICON_RECOVERY)
|
||||
text.normal("Number of words?")
|
||||
|
||||
if __debug__:
|
||||
count = await ctx.wait(WordSelector(text), input_signal)
|
||||
count = int(count) # if input_signal was triggered, count is a string
|
||||
else:
|
||||
count = await ctx.wait(WordSelector(text))
|
||||
|
||||
return count
|
||||
|
||||
|
||||
async def request_mnemonic(ctx: wire.Context, count: int, slip39: bool) -> str:
|
||||
await ctx.call(ButtonRequest(code=ButtonRequestType.MnemonicInput), ButtonAck)
|
||||
|
||||
words = []
|
||||
for i in range(count):
|
||||
if slip39:
|
||||
keyboard = Slip39Keyboard("Type word %s of %s:" % (i + 1, count))
|
||||
else:
|
||||
keyboard = Bip39Keyboard("Type word %s of %s:" % (i + 1, count))
|
||||
if __debug__:
|
||||
word = await ctx.wait(keyboard, input_signal)
|
||||
else:
|
||||
word = await ctx.wait(keyboard)
|
||||
words.append(word)
|
||||
|
||||
return " ".join(words)
|
||||
|
||||
|
||||
async def show_keyboard_info(ctx: wire.Context) -> None:
|
||||
await ctx.call(ButtonRequest(code=ButtonRequestType.Other), ButtonAck)
|
||||
|
||||
info = InfoConfirm(
|
||||
"Did you know? "
|
||||
"You can type the letters "
|
||||
"one by one or use it like "
|
||||
"a T9 keyboard.",
|
||||
"Great!",
|
||||
)
|
||||
if __debug__:
|
||||
await ctx.wait(info, confirm_signal)
|
||||
else:
|
||||
await ctx.wait(info)
|
||||
|
||||
|
||||
async def show_success(ctx):
|
||||
text = Text("Recovery success", ui.ICON_RECOVERY)
|
||||
text.normal("You have successfully")
|
||||
text.normal("recovered your wallet.")
|
||||
await require_confirm(
|
||||
ctx, text, ButtonRequestType.ProtectCall, cancel=None, confirm="Continue"
|
||||
)
|
||||
|
||||
|
||||
async def show_remaining_slip39_mnemonics(
|
||||
ctx: wire.Context, title: str, remaining: int
|
||||
) -> None:
|
||||
text = Text(title, ui.ICON_RECOVERY)
|
||||
text.bold("Good job!")
|
||||
text.normal("Enter %s more recovery " % remaining)
|
||||
if remaining > 1:
|
||||
text.normal("shares.")
|
||||
else:
|
||||
text.normal("share.")
|
||||
await require_confirm(
|
||||
ctx, text, ButtonRequestType.ProtectCall, cancel=None, confirm="Continue"
|
||||
)
|
@ -0,0 +1,77 @@
|
||||
from trezor import config, ui, wire
|
||||
from trezor.messages import ButtonRequestType
|
||||
from trezor.messages.Success import Success
|
||||
from trezor.pin import pin_to_int
|
||||
from trezor.ui.text import Text
|
||||
|
||||
from apps.common import storage
|
||||
from apps.common.confirm import require_confirm
|
||||
from apps.management.change_pin import request_pin_ack, request_pin_confirm
|
||||
from apps.management.recovery_device.homescreen import recovery_process
|
||||
|
||||
if False:
|
||||
from trezor.messages.RecoveryDevice import RecoveryDevice
|
||||
|
||||
|
||||
async def recovery_device(ctx: wire.Context, msg: RecoveryDevice) -> Success:
|
||||
"""
|
||||
Recover BIP39/SLIP39 seed into empty device.
|
||||
Recovery is also possible with replugged Trezor. We call this process Persistance.
|
||||
User starts the process here using the RecoveryDevice msg and then they can unplug
|
||||
the device anytime and continue without a computer.
|
||||
"""
|
||||
_check_state(msg)
|
||||
|
||||
if not msg.dry_run:
|
||||
title = "Wallet recovery"
|
||||
text = Text(title, ui.ICON_RECOVERY)
|
||||
text.normal("Do you really want to", "recover the wallet?", "")
|
||||
else:
|
||||
title = "Seed check"
|
||||
text = Text(title, ui.ICON_RECOVERY)
|
||||
text.normal("Do you really want to", "check the recovery", "seed?")
|
||||
await require_confirm(ctx, text, code=ButtonRequestType.ProtectCall)
|
||||
|
||||
# for dry run pin needs to entered
|
||||
if msg.dry_run:
|
||||
if config.has_pin():
|
||||
curpin = await request_pin_ack(ctx, "Enter PIN", config.get_pin_rem())
|
||||
else:
|
||||
curpin = ""
|
||||
if not config.check_pin(pin_to_int(curpin)):
|
||||
raise wire.PinInvalid("PIN invalid")
|
||||
|
||||
# set up pin if requested
|
||||
if msg.pin_protection:
|
||||
if msg.dry_run:
|
||||
raise wire.ProcessError("Can't setup PIN during dry_run recovery.")
|
||||
newpin = await request_pin_confirm(ctx, allow_cancel=False)
|
||||
config.change_pin(pin_to_int(""), pin_to_int(newpin))
|
||||
|
||||
storage.device.set_u2f_counter(msg.u2f_counter)
|
||||
storage.device.load_settings(
|
||||
label=msg.label, use_passphrase=msg.passphrase_protection
|
||||
)
|
||||
storage.recovery.set_in_progress(True)
|
||||
storage.recovery.set_dry_run(msg.dry_run)
|
||||
|
||||
result = await recovery_process(ctx)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _check_state(msg: RecoveryDevice):
|
||||
if not msg.dry_run and storage.is_initialized():
|
||||
raise wire.UnexpectedMessage("Already initialized")
|
||||
if msg.dry_run and not storage.is_initialized():
|
||||
raise wire.UnexpectedMessage("Device is not initialized")
|
||||
|
||||
if storage.recovery.is_in_progress():
|
||||
raise RuntimeError(
|
||||
"Function recovery_device should not be invoked when recovery is already in progress"
|
||||
)
|
||||
|
||||
if msg.enforce_wordlist is False:
|
||||
raise wire.ProcessError(
|
||||
"Value enforce_wordlist must be True, Trezor Core enforces words automatically."
|
||||
)
|
@ -0,0 +1,179 @@
|
||||
from trezor import loop, utils, wire
|
||||
from trezor.crypto.hashlib import sha256
|
||||
from trezor.errors import IdentifierMismatchError, MnemonicError, ShareAlreadyAddedError
|
||||
from trezor.messages.Success import Success
|
||||
|
||||
from . import recover
|
||||
|
||||
from apps.common import mnemonic, storage
|
||||
from apps.common.layout import show_success
|
||||
from apps.management.recovery_device import layout
|
||||
|
||||
|
||||
async def recovery_homescreen() -> None:
|
||||
# recovery process does not communicate on the wire
|
||||
ctx = wire.DummyContext()
|
||||
try:
|
||||
await recovery_process(ctx)
|
||||
except Exception:
|
||||
# clear the loop state, so loop.run will exit and the device is soft-rebooted
|
||||
loop.clear()
|
||||
raise
|
||||
|
||||
|
||||
async def recovery_process(ctx: wire.Context) -> Success:
|
||||
try:
|
||||
result = await _continue_recovery_process(ctx)
|
||||
except recover.RecoveryAborted:
|
||||
dry_run = storage.recovery.is_dry_run()
|
||||
if dry_run:
|
||||
storage.recovery.end_progress()
|
||||
else:
|
||||
storage.wipe()
|
||||
raise wire.ActionCancelled("Cancelled")
|
||||
return result
|
||||
|
||||
|
||||
async def _continue_recovery_process(ctx: wire.Context) -> Success:
|
||||
# gather the current recovery state from storage
|
||||
in_progress = storage.recovery.is_in_progress()
|
||||
word_count = storage.recovery.get_word_count()
|
||||
dry_run = storage.recovery.is_dry_run()
|
||||
|
||||
if not in_progress: # invalid and inconsistent state
|
||||
raise RuntimeError
|
||||
if not word_count: # the first run, prompt word count from the user
|
||||
word_count = await _request_and_store_word_count(ctx, dry_run)
|
||||
|
||||
mnemonic_type = mnemonic.type_from_word_count(word_count)
|
||||
|
||||
secret = await _request_secret(ctx, word_count, mnemonic_type)
|
||||
|
||||
if dry_run:
|
||||
result = await _finish_recovery_dry_run(ctx, secret, mnemonic_type)
|
||||
else:
|
||||
result = await _finish_recovery(ctx, secret, mnemonic_type)
|
||||
return result
|
||||
|
||||
|
||||
async def _finish_recovery_dry_run(
|
||||
ctx: wire.Context, secret: bytes, mnemonic_type: int
|
||||
) -> Success:
|
||||
digest_input = sha256(secret).digest()
|
||||
stored, _ = mnemonic.get()
|
||||
digest_stored = sha256(stored).digest()
|
||||
result = utils.consteq(digest_stored, digest_input)
|
||||
|
||||
# Check that the identifier and iteration exponent match as well
|
||||
if mnemonic_type == mnemonic.TYPE_SLIP39:
|
||||
result &= (
|
||||
storage.device.get_slip39_identifier()
|
||||
== storage.recovery.get_slip39_identifier()
|
||||
)
|
||||
result &= (
|
||||
storage.device.get_slip39_iteration_exponent()
|
||||
== storage.recovery.get_slip39_iteration_exponent()
|
||||
)
|
||||
|
||||
await layout.show_dry_run_result(ctx, result)
|
||||
|
||||
storage.recovery.end_progress()
|
||||
|
||||
if result:
|
||||
return Success("The seed is valid and matches the one in the device")
|
||||
else:
|
||||
raise wire.ProcessError("The seed does not match the one in the device")
|
||||
|
||||
|
||||
async def _finish_recovery(
|
||||
ctx: wire.Context, secret: bytes, mnemonic_type: int
|
||||
) -> Success:
|
||||
storage.device.store_mnemonic_secret(
|
||||
secret, mnemonic_type, needs_backup=False, no_backup=False
|
||||
)
|
||||
if mnemonic_type == mnemonic.TYPE_SLIP39:
|
||||
storage.device.set_slip39_identifier(storage.recovery.get_slip39_identifier())
|
||||
storage.device.set_slip39_iteration_exponent(
|
||||
storage.recovery.get_slip39_iteration_exponent()
|
||||
)
|
||||
storage.recovery.end_progress()
|
||||
|
||||
await show_success(ctx, ("You have successfully", "recovered your wallet."))
|
||||
|
||||
return Success(message="Device recovered")
|
||||
|
||||
|
||||
async def _request_and_store_word_count(ctx: wire.Context, dry_run: bool) -> int:
|
||||
homepage = layout.RecoveryHomescreen("Select number of words")
|
||||
await layout.homescreen_dialog(ctx, homepage, "Select")
|
||||
|
||||
# ask for the number of words
|
||||
word_count = await layout.request_word_count(ctx, dry_run)
|
||||
|
||||
# save them into storage
|
||||
storage.recovery.set_word_count(word_count)
|
||||
|
||||
return word_count
|
||||
|
||||
|
||||
async def _request_secret(ctx: wire.Context, word_count: int, mnemonic_type: int):
|
||||
await _request_share_first_screen(ctx, word_count, mnemonic_type)
|
||||
|
||||
secret = None
|
||||
while secret is None:
|
||||
# ask for mnemonic words one by one
|
||||
mnemonics = storage.recovery_shares.fetch()
|
||||
try:
|
||||
words = await layout.request_mnemonic(
|
||||
ctx, word_count, mnemonic_type, mnemonics
|
||||
)
|
||||
except IdentifierMismatchError:
|
||||
await layout.show_identifier_mismatch(ctx)
|
||||
continue
|
||||
except ShareAlreadyAddedError:
|
||||
await layout.show_share_already_added(ctx)
|
||||
continue
|
||||
# process this seed share
|
||||
try:
|
||||
secret = recover.process_share(words, mnemonic_type)
|
||||
except MnemonicError:
|
||||
await layout.show_invalid_mnemonic(ctx, mnemonic_type)
|
||||
continue
|
||||
if secret is None:
|
||||
await _request_share_next_screen(ctx, mnemonic_type)
|
||||
|
||||
return secret
|
||||
|
||||
|
||||
async def _request_share_first_screen(
|
||||
ctx: wire.Context, word_count: int, mnemonic_type: int
|
||||
):
|
||||
if mnemonic_type == mnemonic.TYPE_BIP39:
|
||||
content = layout.RecoveryHomescreen(
|
||||
"Enter recovery seed", "(%d words)" % word_count
|
||||
)
|
||||
await layout.homescreen_dialog(ctx, content, "Enter seed")
|
||||
elif mnemonic_type == mnemonic.TYPE_SLIP39:
|
||||
remaining = storage.recovery.get_remaining()
|
||||
if remaining:
|
||||
await _request_share_next_screen(ctx, mnemonic_type)
|
||||
else:
|
||||
content = layout.RecoveryHomescreen(
|
||||
"Enter any share", "(%d words)" % word_count
|
||||
)
|
||||
await layout.homescreen_dialog(ctx, content, "Enter share")
|
||||
else:
|
||||
raise RuntimeError
|
||||
|
||||
|
||||
async def _request_share_next_screen(ctx: wire.Context, mnemonic_type: int):
|
||||
if mnemonic_type == mnemonic.TYPE_SLIP39:
|
||||
remaining = storage.recovery.get_remaining()
|
||||
if remaining == 1:
|
||||
text = "1 more share"
|
||||
else:
|
||||
text = "%d more shares" % remaining
|
||||
content = layout.RecoveryHomescreen(text, "needed to enter")
|
||||
await layout.homescreen_dialog(ctx, content, "Enter share")
|
||||
else:
|
||||
raise RuntimeError
|
@ -0,0 +1,222 @@
|
||||
from trezor import ui, wire
|
||||
from trezor.errors import IdentifierMismatchError, ShareAlreadyAddedError
|
||||
from trezor.messages import ButtonRequestType
|
||||
from trezor.messages.ButtonAck import ButtonAck
|
||||
from trezor.messages.ButtonRequest import ButtonRequest
|
||||
from trezor.ui.info import InfoConfirm
|
||||
from trezor.ui.text import Text
|
||||
from trezor.ui.word_select import WordSelector
|
||||
|
||||
from .bip39_keyboard import Bip39Keyboard
|
||||
from .recover import RecoveryAborted
|
||||
from .slip39_keyboard import Slip39Keyboard
|
||||
|
||||
from apps.common import mnemonic, storage
|
||||
from apps.common.confirm import confirm, require_confirm
|
||||
from apps.common.layout import show_success, show_warning
|
||||
|
||||
if __debug__:
|
||||
from apps.debug import input_signal, confirm_signal
|
||||
|
||||
if False:
|
||||
from typing import List
|
||||
|
||||
|
||||
async def confirm_abort(ctx: wire.Context, dry_run: bool = False) -> bool:
|
||||
if dry_run:
|
||||
text = Text("Abort seed check", ui.ICON_WIPE)
|
||||
text.normal("Do you really want to", "abort the seed check?")
|
||||
else:
|
||||
text = Text("Abort recovery", ui.ICON_WIPE)
|
||||
text.normal("Do you really want to", "abort the recovery", "process?")
|
||||
text.bold("All progress will be lost.")
|
||||
return await confirm(ctx, text, code=ButtonRequestType.ProtectCall)
|
||||
|
||||
|
||||
async def request_word_count(ctx: wire.Context, dry_run: bool) -> int:
|
||||
await ctx.call(ButtonRequest(code=ButtonRequestType.MnemonicWordCount), ButtonAck)
|
||||
|
||||
if dry_run:
|
||||
text = Text("Seed check", ui.ICON_RECOVERY)
|
||||
else:
|
||||
text = Text("Wallet recovery", ui.ICON_RECOVERY)
|
||||
text.normal("Number of words?")
|
||||
|
||||
if __debug__:
|
||||
count = await ctx.wait(WordSelector(text), input_signal)
|
||||
count = int(count) # if input_signal was triggered, count is a string
|
||||
else:
|
||||
count = await ctx.wait(WordSelector(text))
|
||||
|
||||
return count
|
||||
|
||||
|
||||
async def request_mnemonic(
|
||||
ctx: wire.Context, count: int, mnemonic_type: int, mnemonics: List[str]
|
||||
) -> str:
|
||||
await ctx.call(ButtonRequest(code=ButtonRequestType.MnemonicInput), ButtonAck)
|
||||
|
||||
words = []
|
||||
for i in range(count):
|
||||
if mnemonic_type == mnemonic.TYPE_SLIP39:
|
||||
keyboard = Slip39Keyboard("Type word %s of %s:" % (i + 1, count))
|
||||
else:
|
||||
keyboard = Bip39Keyboard("Type word %s of %s:" % (i + 1, count))
|
||||
if __debug__:
|
||||
word = await ctx.wait(keyboard, input_signal)
|
||||
else:
|
||||
word = await ctx.wait(keyboard)
|
||||
|
||||
if mnemonic_type == mnemonic.TYPE_SLIP39 and mnemonics:
|
||||
# check if first 3 words of mnemonic match
|
||||
# we can check against the first one, others were checked already
|
||||
if i < 3:
|
||||
share_list = mnemonics[0].split(" ")
|
||||
if share_list[i] != word:
|
||||
raise IdentifierMismatchError()
|
||||
elif i == 3:
|
||||
for share in mnemonics:
|
||||
share_list = share.split(" ")
|
||||
# check if the fourth word is different from previous shares
|
||||
if share_list[i] == word:
|
||||
raise ShareAlreadyAddedError()
|
||||
|
||||
words.append(word)
|
||||
|
||||
return " ".join(words)
|
||||
|
||||
|
||||
async def show_dry_run_result(ctx: wire.Context, result: bool) -> None:
|
||||
if result:
|
||||
await show_success(
|
||||
ctx,
|
||||
(
|
||||
"The entered recovery",
|
||||
"seed is valid and matches",
|
||||
"the one in the device.",
|
||||
),
|
||||
)
|
||||
else:
|
||||
await show_warning(
|
||||
ctx,
|
||||
(
|
||||
"The entered recovery",
|
||||
"seed is valid but does not",
|
||||
"match the one in the device.",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
async def show_dry_run_different_type(ctx: wire.Context) -> None:
|
||||
text = Text("Dry run failure", ui.ICON_CANCEL)
|
||||
text.normal("Seed in the device was")
|
||||
text.normal("created using another")
|
||||
text.normal("backup mechanism.")
|
||||
await require_confirm(
|
||||
ctx, text, ButtonRequestType.ProtectCall, cancel=None, confirm="Continue"
|
||||
)
|
||||
|
||||
|
||||
async def show_keyboard_info(ctx: wire.Context) -> None:
|
||||
# TODO: do not send ButtonRequestType.Other
|
||||
await ctx.call(ButtonRequest(code=ButtonRequestType.Other), ButtonAck)
|
||||
|
||||
info = InfoConfirm(
|
||||
"Did you know? "
|
||||
"You can type the letters "
|
||||
"one by one or use it like "
|
||||
"a T9 keyboard.",
|
||||
"Great!",
|
||||
)
|
||||
if __debug__:
|
||||
await ctx.wait(info, confirm_signal)
|
||||
else:
|
||||
await ctx.wait(info)
|
||||
|
||||
|
||||
async def show_invalid_mnemonic(ctx, mnemonic_type: int):
|
||||
if mnemonic_type == mnemonic.TYPE_SLIP39:
|
||||
await show_warning(
|
||||
ctx,
|
||||
("You have entered", "a recovery share", "that is not valid."),
|
||||
button="Try again",
|
||||
)
|
||||
else:
|
||||
await show_warning(
|
||||
ctx,
|
||||
("You have entered", "a recovery seed", "that is not valid."),
|
||||
button="Try again",
|
||||
)
|
||||
|
||||
|
||||
async def show_share_already_added(ctx):
|
||||
return await show_warning(
|
||||
ctx,
|
||||
("Share already entered,", "please enter", "a different share."),
|
||||
button="Try again",
|
||||
)
|
||||
|
||||
|
||||
async def show_identifier_mismatch(ctx):
|
||||
return await show_warning(
|
||||
ctx,
|
||||
("You have entered", "a share from another", "Shamir Backup."),
|
||||
button="Try again",
|
||||
)
|
||||
|
||||
|
||||
class RecoveryHomescreen(ui.Control):
|
||||
def __init__(self, text: str, subtext: str = None):
|
||||
self.text = text
|
||||
self.subtext = subtext
|
||||
self.dry_run = storage.recovery.is_dry_run()
|
||||
self.repaint = True
|
||||
|
||||
def on_render(self):
|
||||
if not self.repaint:
|
||||
return
|
||||
|
||||
if self.dry_run:
|
||||
heading = "SEED CHECK"
|
||||
else:
|
||||
heading = "RECOVERY MODE"
|
||||
ui.header_warning(heading, clear=False)
|
||||
|
||||
if not self.subtext:
|
||||
ui.display.text_center(ui.WIDTH // 2, 80, self.text, ui.BOLD, ui.FG, ui.BG)
|
||||
else:
|
||||
ui.display.text_center(ui.WIDTH // 2, 65, self.text, ui.BOLD, ui.FG, ui.BG)
|
||||
ui.display.text_center(
|
||||
ui.WIDTH // 2, 92, self.subtext, ui.NORMAL, ui.FG, ui.BG
|
||||
)
|
||||
|
||||
ui.display.text_center(
|
||||
ui.WIDTH // 2, 130, "It is safe to eject Trezor", ui.NORMAL, ui.GREY, ui.BG
|
||||
)
|
||||
ui.display.text_center(
|
||||
ui.WIDTH // 2, 155, "and continue later", ui.NORMAL, ui.GREY, ui.BG
|
||||
)
|
||||
|
||||
self.repaint = False
|
||||
|
||||
|
||||
async def homescreen_dialog(
|
||||
ctx: wire.Context, homepage: RecoveryHomescreen, button_label: str
|
||||
) -> None:
|
||||
while True:
|
||||
# make sure the homepage gets painted, even after cancelling the dialog
|
||||
homepage.repaint = True
|
||||
continue_recovery = await confirm(
|
||||
ctx,
|
||||
homepage,
|
||||
code=ButtonRequestType.RecoveryHomepage,
|
||||
confirm=button_label,
|
||||
major_confirm=True,
|
||||
)
|
||||
if continue_recovery:
|
||||
# go forward in the recovery process
|
||||
break
|
||||
# user has chosen to abort, confirm the choice
|
||||
dry_run = storage.recovery.is_dry_run()
|
||||
if await confirm_abort(ctx, dry_run):
|
||||
raise RecoveryAborted
|
@ -0,0 +1,67 @@
|
||||
from trezor.crypto import bip39, slip39
|
||||
from trezor.errors import MnemonicError
|
||||
|
||||
from apps.common import mnemonic, storage
|
||||
|
||||
if False:
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class RecoveryAborted(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def process_share(words: str, mnemonic_type: int):
|
||||
if mnemonic_type == mnemonic.TYPE_BIP39:
|
||||
return _process_bip39(words)
|
||||
else:
|
||||
return _process_slip39(words)
|
||||
|
||||
|
||||
def _process_bip39(words: str) -> bytes:
|
||||
"""
|
||||
Receives single mnemonic and processes it. Returns what is then stored
|
||||
in the storage, which is the mnemonic itself for BIP-39.
|
||||
"""
|
||||
if not bip39.check(words):
|
||||
raise MnemonicError()
|
||||
return words.encode()
|
||||
|
||||
|
||||
def _process_slip39(words: str) -> Optional[bytes]:
|
||||
"""
|
||||
Receives single mnemonic and processes it. Returns what is then stored in storage or
|
||||
None if more shares are needed.
|
||||
"""
|
||||
identifier, iteration_exponent, _, _, _, index, threshold, value = slip39.decode_mnemonic(
|
||||
words
|
||||
) # TODO: use better data structure for this
|
||||
if threshold == 1:
|
||||
raise ValueError("Threshold equal to 1 is not allowed.")
|
||||
|
||||
# if this is the first share, parse and store metadata
|
||||
if not storage.recovery.get_remaining():
|
||||
storage.recovery.set_slip39_iteration_exponent(iteration_exponent)
|
||||
storage.recovery.set_slip39_identifier(identifier)
|
||||
storage.recovery.set_slip39_threshold(threshold)
|
||||
storage.recovery.set_remaining(threshold - 1)
|
||||
storage.recovery_shares.set(index, words)
|
||||
return None # we need more shares
|
||||
|
||||
# These should be checked by UI before so it's a Runtime exception otherwise
|
||||
if identifier != storage.recovery.get_slip39_identifier():
|
||||
raise RuntimeError("Slip39: Share identifiers do not match")
|
||||
if storage.recovery_shares.get(index):
|
||||
raise RuntimeError("Slip39: This mnemonic was already entered")
|
||||
|
||||
# add mnemonic to storage
|
||||
remaining = storage.recovery.get_remaining() - 1
|
||||
storage.recovery.set_remaining(remaining)
|
||||
storage.recovery_shares.set(index, words)
|
||||
if remaining != 0:
|
||||
return None # we need more shares
|
||||
|
||||
# combine shares and return the master secret
|
||||
mnemonics = storage.recovery_shares.fetch()
|
||||
identifier, iteration_exponent, secret = slip39.combine_mnemonics(mnemonics)
|
||||
return secret
|
@ -0,0 +1,15 @@
|
||||
# TODO: review: I'm not sure where this should be, but the idea
|
||||
# is to have a file containing all Runtime exceptions, that means
|
||||
# exceptions that are internal only
|
||||
|
||||
|
||||
class MnemonicError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
class IdentifierMismatchError(MnemonicError):
|
||||
pass
|
||||
|
||||
|
||||
class ShareAlreadyAddedError(MnemonicError):
|
||||
pass
|
Loading…
Reference in new issue