1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2024-12-18 04:18:10 +00:00

Shamir persistance (#347)

Shamir persistance
This commit is contained in:
Tomas Susanka 2019-07-24 15:34:06 +02:00 committed by GitHub
commit f6d127523e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 1012 additions and 596 deletions

View File

@ -64,6 +64,9 @@ message ButtonRequest {
ButtonRequest_MnemonicInput = 13; ButtonRequest_MnemonicInput = 13;
ButtonRequest_PassphraseType = 14; ButtonRequest_PassphraseType = 14;
ButtonRequest_UnknownDerivationPath = 15; ButtonRequest_UnknownDerivationPath = 15;
ButtonRequest_RecoveryHomepage = 16;
ButtonRequest_Success = 17;
ButtonRequest_Warning = 18;
} }
} }

View File

@ -135,7 +135,9 @@ STATIC void wrapped_ui_wait_callback(uint32_t current, uint32_t total) {
} }
/// def seed( /// def seed(
/// mnemonic: str, passphrase: str, callback: Tuple[int, int, None] = None /// mnemonic: str,
/// passphrase: str,
/// callback: Callable[[int, int], None] = None,
/// ) -> bytes: /// ) -> bytes:
/// """ /// """
/// Generate seed from mnemonic and passphrase. /// Generate seed from mnemonic and passphrase.

View File

@ -40,7 +40,9 @@ def check(mnemonic: str) -> bool:
# extmod/modtrezorcrypto/modtrezorcrypto-bip39.h # extmod/modtrezorcrypto/modtrezorcrypto-bip39.h
def seed( def seed(
mnemonic: str, passphrase: str, callback: Tuple[int, int, None] = None mnemonic: str,
passphrase: str,
callback: Callable[[int, int], None] = None,
) -> bytes: ) -> bytes:
""" """
Generate seed from mnemonic and passphrase. Generate seed from mnemonic and passphrase.

View File

@ -1,6 +1,4 @@
def const(c: int) -> int: def const(c: int) -> int: ...
return c
def mem_info() -> None: ... def mem_info() -> None: ...
def mem_current() -> int: ... def mem_current() -> int: ...
def mem_total() -> int: ... def mem_total() -> int: ...

View File

@ -38,8 +38,8 @@ async def get_keychain(ctx: wire.Context) -> Keychain:
passphrase = await protect_by_passphrase(ctx) passphrase = await protect_by_passphrase(ctx)
cache.set_passphrase(passphrase) cache.set_passphrase(passphrase)
# TODO fix for SLIP-39! # TODO fix for SLIP-39!
mnemonic_secret, mnemonic_type = mnemonic.get() mnemonic_secret, mnemonic_module = mnemonic.get()
if mnemonic_type == mnemonic.TYPE_SLIP39: if mnemonic_module == mnemonic.slip39:
# TODO: we need to modify bip32.from_mnemonic_cardano to accept entropy directly # TODO: we need to modify bip32.from_mnemonic_cardano to accept entropy directly
raise NotImplementedError("SLIP-39 currently does not support Cardano") raise NotImplementedError("SLIP-39 currently does not support Cardano")
else: else:

View File

@ -84,7 +84,14 @@ async def show_warning(
button: str = "Continue", button: str = "Continue",
) -> None: ) -> None:
text = Text("Warning", ui.ICON_WRONG, ui.RED) text = Text("Warning", ui.ICON_WRONG, ui.RED)
await _message(ctx, text, content, subheader, button) if subheader:
text.bold(subheader)
text.br_half()
for row in content:
text.normal(row)
await require_confirm(
ctx, text, ButtonRequestType.Warning, confirm=button, cancel=None
)
async def show_success( async def show_success(
@ -94,21 +101,11 @@ async def show_success(
button: str = "Continue", button: str = "Continue",
) -> None: ) -> None:
text = Text("Success", ui.ICON_CONFIRM, ui.GREEN) text = Text("Success", ui.ICON_CONFIRM, ui.GREEN)
await _message(ctx, text, content, subheader, button)
async def _message(
ctx: wire.Context,
text: Text,
content: List[str],
subheader: str = None,
button: str = "Continue",
) -> None:
if subheader: if subheader:
text.bold(subheader) text.bold(subheader)
text.br_half() text.br_half()
for row in content: for row in content:
text.normal(row) text.normal(row)
await require_confirm( await require_confirm(
ctx, text, ButtonRequestType.Other, confirm=button, cancel=None ctx, text, ButtonRequestType.Success, confirm=button, cancel=None
) )

View File

@ -1,20 +1,23 @@
from micropython import const from micropython import const
from trezor import ui, wire, workflow from trezor import ui, workflow
from trezor.crypto.hashlib import sha256 from trezor.crypto import bip39, slip39
from trezor.messages.Success import Success
from trezor.utils import consteq
from apps.common import storage from apps.common import storage
from apps.common.mnemonic import bip39, slip39
if False: if False:
from typing import Any, Tuple from typing import Tuple
TYPE_BIP39 = const(0) TYPE_BIP39 = const(0)
TYPE_SLIP39 = const(1) TYPE_SLIP39 = const(1)
TYPES_WORD_COUNT = {12: bip39, 18: bip39, 24: bip39, 20: slip39, 33: slip39} TYPES_WORD_COUNT = {
12: TYPE_BIP39,
18: TYPE_BIP39,
24: TYPE_BIP39,
20: TYPE_SLIP39,
33: TYPE_SLIP39,
}
def get() -> Tuple[bytes, int]: def get() -> Tuple[bytes, int]:
@ -27,26 +30,30 @@ def get() -> Tuple[bytes, int]:
def get_seed(passphrase: str = "", progress_bar: bool = True) -> bytes: def get_seed(passphrase: str = "", progress_bar: bool = True) -> bytes:
mnemonic_secret, mnemonic_type = get() mnemonic_secret, mnemonic_type = get()
render_func = None
if progress_bar:
_start_progress()
render_func = _render_progress
if mnemonic_type == TYPE_BIP39: if mnemonic_type == TYPE_BIP39:
return bip39.get_seed(mnemonic_secret, passphrase, progress_bar) seed = bip39.seed(mnemonic_secret.decode(), passphrase, render_func)
elif mnemonic_type == TYPE_SLIP39: elif mnemonic_type == TYPE_SLIP39:
return slip39.get_seed(mnemonic_secret, passphrase, progress_bar) identifier = storage.device.get_slip39_identifier()
raise ValueError("Unknown mnemonic type") iteration_exponent = storage.device.get_slip39_iteration_exponent()
seed = slip39.decrypt(
identifier, iteration_exponent, mnemonic_secret, passphrase
def dry_run(secret: bytes) -> None:
digest_input = sha256(secret).digest()
stored, _ = get()
digest_stored = sha256(stored).digest()
if consteq(digest_stored, digest_input):
return Success(message="The seed is valid and matches the one in the device")
else:
raise wire.ProcessError(
"The seed is valid but does not match the one in the device"
) )
if progress_bar:
_stop_progress()
return seed
def module_from_words_count(count: int) -> Any:
def type_from_word_count(count: int) -> int:
if count not in TYPES_WORD_COUNT:
raise RuntimeError("Recovery: Unknown words count")
return TYPES_WORD_COUNT[count] return TYPES_WORD_COUNT[count]

View File

@ -1,47 +0,0 @@
from trezor.crypto import bip39
from apps.common import mnemonic, storage
def get_type() -> int:
return mnemonic.TYPE_BIP39
def process_single(mnemonic: str) -> bytes:
"""
Receives single mnemonic and processes it. Returns what is then stored in storage or
None if more shares are needed.
"""
return mnemonic.encode()
def process_all(mnemonics: list) -> bytes:
"""
Receives all mnemonics (just one in case of BIP-39) and processes it into a master
secret which is usually then stored in storage.
"""
return mnemonics[0].encode()
def store(secret: bytes, needs_backup: bool, no_backup: bool) -> None:
storage.device.store_mnemonic_secret(
secret, mnemonic.TYPE_BIP39, needs_backup, no_backup
)
def get_seed(secret: bytes, passphrase: str, progress_bar: bool = True) -> bytes:
if progress_bar:
mnemonic._start_progress()
seed = bip39.seed(secret.decode(), passphrase, mnemonic._render_progress)
mnemonic._stop_progress()
else:
seed = bip39.seed(secret.decode(), passphrase)
return seed
def get_mnemonic_threshold(mnemonic: str) -> int:
return 1
def check(secret: bytes) -> bool:
return bip39.check(secret)

View File

@ -1,103 +0,0 @@
from trezor.crypto import slip39
from apps.common import mnemonic, storage
if False:
from typing import Optional
def generate_from_secret(master_secret: bytes, count: int, threshold: int) -> list:
"""
Generates new Shamir backup for 'master_secret'. Multiple groups are not yet supported.
"""
return slip39.generate_single_group_mnemonics_from_data(
master_secret, storage.slip39.get_identifier(), threshold, count
)
def get_type() -> int:
return mnemonic.TYPE_SLIP39
def process_single(mnemonic: str) -> Optional[bytes]:
"""
Receives single mnemonic and processes it. Returns what is then stored in storage or
None if more shares are needed.
"""
identifier, iteration_exponent, _, _, _, index, threshold, value = slip39.decode_mnemonic(
mnemonic
) # TODO: use better data structure for this
if threshold == 1:
raise ValueError("Threshold equal to 1 is not allowed.")
# if recovery is not in progress already, start it and wait for more mnemonics
if not storage.slip39.is_in_progress():
storage.slip39.set_in_progress(True)
storage.slip39.set_iteration_exponent(iteration_exponent)
storage.slip39.set_identifier(identifier)
storage.slip39.set_threshold(threshold)
storage.slip39.set_remaining(threshold - 1)
storage.slip39.set_words_count(len(mnemonic.split()))
storage.slip39_mnemonics.set(index, mnemonic)
return None # we need more shares
# check identifier and member index of this share against stored values
if identifier != storage.slip39.get_identifier():
# TODO: improve UX (tell user)
raise ValueError("Share identifiers do not match")
if storage.slip39_mnemonics.get(index):
# TODO: improve UX (tell user)
raise ValueError("This mnemonic was already entered")
# append to storage
remaining = storage.slip39.get_remaining() - 1
storage.slip39.set_remaining(remaining)
storage.slip39.set(index, mnemonic)
if remaining != 0:
return None # we need more shares
# combine shares and return the master secret
mnemonics = storage.slip39_mnemonics.fetch()
if len(mnemonics) != threshold:
raise ValueError("Some mnemonics are still missing.")
_, _, secret = slip39.combine_mnemonics(mnemonics)
return secret
def process_all(mnemonics: list) -> bytes:
"""
Receives all mnemonics and processes it into pre-master secret which is usually then
stored in the storage.
"""
identifier, iteration_exponent, secret = slip39.combine_mnemonics(mnemonics)
storage.slip39.set_iteration_exponent(iteration_exponent)
storage.slip39.set_identifier(identifier)
return secret
def store(secret: bytes, needs_backup: bool, no_backup: bool) -> None:
storage.device.store_mnemonic_secret(
secret, mnemonic.TYPE_SLIP39, needs_backup, no_backup
)
storage.slip39.delete_progress()
def get_seed(
encrypted_master_secret: bytes, passphrase: str, progress_bar: bool = True
) -> bytes:
if progress_bar:
mnemonic._start_progress()
identifier = storage.slip39.get_identifier()
iteration_exponent = storage.slip39.get_iteration_exponent()
master_secret = slip39.decrypt(
identifier, iteration_exponent, encrypted_master_secret, passphrase
)
if progress_bar:
mnemonic._stop_progress()
return master_secret
def get_mnemonic_threshold(mnemonic: str) -> int:
_, _, _, _, _, _, threshold, _ = slip39.decode_mnemonic(mnemonic)
return threshold

View File

@ -1,7 +1,7 @@
from trezor import config from trezor import config
from apps.common import cache from apps.common import cache
from apps.common.storage import common, device, slip39 from apps.common.storage import common, device, recovery
def set_current_version() -> None: def set_current_version() -> None:
@ -9,7 +9,7 @@ def set_current_version() -> None:
def is_initialized() -> bool: def is_initialized() -> bool:
return device.is_version_stored() and not slip39.is_in_progress() return device.is_version_stored() and not recovery.is_in_progress()
def wipe() -> None: def wipe() -> None:

View File

@ -7,8 +7,8 @@ if False:
# fmt: off # fmt: off
# Intentionally not using const() to allow import in submodules. # Intentionally not using const() to allow import in submodules.
_APP_DEVICE = 0x01 _APP_DEVICE = 0x01
_APP_SLIP39 = 0x02 _APP_RECOVERY = 0x02
_APP_SLIP39_MNEMONICS = 0x03 _APP_RECOVERY_SHARES = 0x03
# fmt: on # fmt: on
_FALSE_BYTE = b"\x00" _FALSE_BYTE = b"\x00"

View File

@ -13,22 +13,24 @@ _NAMESPACE = common._APP_DEVICE
# fmt: off # fmt: off
# Keys: # Keys:
_DEVICE_ID = const(0x00) # bytes _DEVICE_ID = const(0x00) # bytes
_VERSION = const(0x01) # int _VERSION = const(0x01) # int
_MNEMONIC_SECRET = const(0x02) # bytes _MNEMONIC_SECRET = const(0x02) # bytes
_LANGUAGE = const(0x03) # str _LANGUAGE = const(0x03) # str
_LABEL = const(0x04) # str _LABEL = const(0x04) # str
_USE_PASSPHRASE = const(0x05) # bool (0x01 or empty) _USE_PASSPHRASE = const(0x05) # bool (0x01 or empty)
_HOMESCREEN = const(0x06) # bytes _HOMESCREEN = const(0x06) # bytes
_NEEDS_BACKUP = const(0x07) # bool (0x01 or empty) _NEEDS_BACKUP = const(0x07) # bool (0x01 or empty)
_FLAGS = const(0x08) # int _FLAGS = const(0x08) # int
_U2F_COUNTER = const(0x09) # int _U2F_COUNTER = const(0x09) # int
_PASSPHRASE_SOURCE = const(0x0A) # int _PASSPHRASE_SOURCE = const(0x0A) # int
_UNFINISHED_BACKUP = const(0x0B) # bool (0x01 or empty) _UNFINISHED_BACKUP = const(0x0B) # bool (0x01 or empty)
_AUTOLOCK_DELAY_MS = const(0x0C) # int _AUTOLOCK_DELAY_MS = const(0x0C) # int
_NO_BACKUP = const(0x0D) # bool (0x01 or empty) _NO_BACKUP = const(0x0D) # bool (0x01 or empty)
_MNEMONIC_TYPE = const(0x0E) # int _MNEMONIC_TYPE = const(0x0E) # int
_ROTATION = const(0x0F) # int _ROTATION = const(0x0F) # int
_SLIP39_IDENTIFIER = const(0x10) # bool
_SLIP39_ITERATION_EXPONENT = const(0x11) # int
# fmt: on # fmt: on
HOMESCREEN_MAXSIZE = 16384 HOMESCREEN_MAXSIZE = 16384
@ -202,3 +204,33 @@ def next_u2f_counter() -> Optional[int]:
def set_u2f_counter(count: int) -> None: def set_u2f_counter(count: int) -> None:
common._set_counter(_NAMESPACE, _U2F_COUNTER, count, True) # writable when locked common._set_counter(_NAMESPACE, _U2F_COUNTER, count, True) # writable when locked
def set_slip39_identifier(identifier: int) -> None:
"""
The device's actual SLIP-39 identifier used in passphrase derivation.
Not to be confused with recovery.identifier, which is stored only during
the recovery process and it is copied here upon success.
"""
common._set_uint16(_NAMESPACE, _SLIP39_IDENTIFIER, identifier)
def get_slip39_identifier() -> Optional[int]:
"""The device's actual SLIP-39 identifier used in passphrase derivation."""
return common._get_uint16(_NAMESPACE, _SLIP39_IDENTIFIER)
def set_slip39_iteration_exponent(exponent: int) -> None:
"""
The device's actual SLIP-39 iteration exponent used in passphrase derivation.
Not to be confused with recovery.iteration_exponent, which is stored only during
the recovery process and it is copied here upon success.
"""
common._set_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT, exponent)
def get_slip39_iteration_exponent() -> Optional[int]:
"""
The device's actual SLIP-39 iteration exponent used in passphrase derivation.
"""
return common._get_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT)

View File

@ -0,0 +1,87 @@
from micropython import const
from apps.common.storage import common, recovery_shares
# Namespace:
_NAMESPACE = common._APP_RECOVERY
# fmt: off
# Keys:
_IN_PROGRESS = const(0x00) # bool
_DRY_RUN = const(0x01) # bool
_WORD_COUNT = const(0x02) # int
_REMAINING = const(0x05) # int
_SLIP39_IDENTIFIER = const(0x03) # bytes
_SLIP39_THRESHOLD = const(0x04) # int
_SLIP39_ITERATION_EXPONENT = const(0x06) # int
# fmt: on
if False:
from typing import Optional
def set_in_progress(val: bool):
common._set_bool(_NAMESPACE, _IN_PROGRESS, val)
def is_in_progress():
return common._get_bool(_NAMESPACE, _IN_PROGRESS)
def set_dry_run(val: bool):
common._set_bool(_NAMESPACE, _DRY_RUN, val)
def is_dry_run() -> bool:
return common._get_bool(_NAMESPACE, _DRY_RUN)
def set_word_count(count: int):
common._set_uint8(_NAMESPACE, _WORD_COUNT, count)
def get_word_count() -> int:
return common._get_uint8(_NAMESPACE, _WORD_COUNT)
def set_slip39_identifier(identifier: int) -> None:
common._set_uint16(_NAMESPACE, _SLIP39_IDENTIFIER, identifier)
def get_slip39_identifier() -> Optional[int]:
return common._get_uint16(_NAMESPACE, _SLIP39_IDENTIFIER)
def set_slip39_threshold(threshold: int) -> None:
common._set_uint8(_NAMESPACE, _SLIP39_THRESHOLD, threshold)
def get_slip39_threshold() -> Optional[int]:
return common._get_uint8(_NAMESPACE, _SLIP39_THRESHOLD)
def set_remaining(remaining: int) -> None:
common._set_uint8(_NAMESPACE, _REMAINING, remaining)
def get_remaining() -> Optional[int]:
return common._get_uint8(_NAMESPACE, _REMAINING)
def set_slip39_iteration_exponent(exponent: int) -> None:
common._set_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT, exponent)
def get_slip39_iteration_exponent() -> Optional[int]:
return common._get_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT)
def end_progress():
common._delete(_NAMESPACE, _IN_PROGRESS)
common._delete(_NAMESPACE, _DRY_RUN)
common._delete(_NAMESPACE, _WORD_COUNT)
common._delete(_NAMESPACE, _SLIP39_IDENTIFIER)
common._delete(_NAMESPACE, _SLIP39_THRESHOLD)
common._delete(_NAMESPACE, _REMAINING)
common._delete(_NAMESPACE, _SLIP39_ITERATION_EXPONENT)
recovery_shares.delete()

View File

@ -10,11 +10,11 @@ if False:
def set(index: int, mnemonic: str) -> None: def set(index: int, mnemonic: str) -> None:
common._set(common._APP_SLIP39_MNEMONICS, index, mnemonic.encode()) common._set(common._APP_RECOVERY_SHARES, index, mnemonic.encode())
def get(index: int) -> Optional[str]: def get(index: int) -> Optional[str]:
m = common._get(common._APP_SLIP39_MNEMONICS, index) m = common._get(common._APP_RECOVERY_SHARES, index)
if m: if m:
return m.decode() return m.decode()
return None return None
@ -31,4 +31,4 @@ def fetch() -> List[str]:
def delete() -> None: def delete() -> None:
for index in range(0, slip39.MAX_SHARE_COUNT): for index in range(0, slip39.MAX_SHARE_COUNT):
common._delete(common._APP_SLIP39_MNEMONICS, index) common._delete(common._APP_RECOVERY_SHARES, index)

View File

@ -1,75 +0,0 @@
from micropython import const
from apps.common.storage import common, slip39_mnemonics
if False:
from typing import Optional
# Namespace:
_NAMESPACE = common._APP_SLIP39
# fmt: off
# Keys:
_SLIP39_IN_PROGRESS = const(0x00) # bool
_SLIP39_IDENTIFIER = const(0x01) # bytes
_SLIP39_THRESHOLD = const(0x02) # int
_SLIP39_REMAINING = const(0x03) # int
_SLIP39_WORDS_COUNT = const(0x04) # int
_SLIP39_ITERATION_EXPONENT = const(0x05) # int
# fmt: on
def set_in_progress(val: bool) -> None:
common._set_bool(_NAMESPACE, _SLIP39_IN_PROGRESS, val)
def is_in_progress() -> bool:
return common._get_bool(_NAMESPACE, _SLIP39_IN_PROGRESS)
def set_identifier(identifier: int) -> None:
common._set_uint16(_NAMESPACE, _SLIP39_IDENTIFIER, identifier)
def get_identifier() -> Optional[int]:
return common._get_uint16(_NAMESPACE, _SLIP39_IDENTIFIER)
def set_threshold(threshold: int) -> None:
common._set_uint8(_NAMESPACE, _SLIP39_THRESHOLD, threshold)
def get_threshold() -> Optional[int]:
return common._get_uint8(_NAMESPACE, _SLIP39_THRESHOLD)
def set_remaining(remaining: int) -> None:
common._set_uint8(_NAMESPACE, _SLIP39_REMAINING, remaining)
def get_remaining() -> Optional[int]:
return common._get_uint8(_NAMESPACE, _SLIP39_REMAINING)
def set_words_count(count: int) -> None:
common._set_uint8(_NAMESPACE, _SLIP39_WORDS_COUNT, count)
def get_words_count() -> Optional[int]:
return common._get_uint8(_NAMESPACE, _SLIP39_WORDS_COUNT)
def set_iteration_exponent(exponent: int) -> None:
common._set_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT, exponent)
def get_iteration_exponent() -> Optional[int]:
return common._get_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT)
def delete_progress() -> None:
common._delete(_NAMESPACE, _SLIP39_IN_PROGRESS)
common._delete(_NAMESPACE, _SLIP39_REMAINING)
common._delete(_NAMESPACE, _SLIP39_THRESHOLD)
common._delete(_NAMESPACE, _SLIP39_WORDS_COUNT)
slip39_mnemonics.delete()

View File

@ -73,9 +73,10 @@ async def handle_Ping(ctx: wire.Context, msg: Ping) -> Success:
return Success(message=msg.message) return Success(message=msg.message)
def boot() -> None: def boot(features_only: bool = False) -> None:
register(MessageType.Initialize, protobuf_workflow, handle_Initialize) register(MessageType.Initialize, protobuf_workflow, handle_Initialize)
register(MessageType.GetFeatures, protobuf_workflow, handle_GetFeatures) register(MessageType.GetFeatures, protobuf_workflow, handle_GetFeatures)
register(MessageType.Cancel, protobuf_workflow, handle_Cancel) if not features_only:
register(MessageType.ClearSession, protobuf_workflow, handle_ClearSession) register(MessageType.Cancel, protobuf_workflow, handle_Cancel)
register(MessageType.Ping, protobuf_workflow, handle_Ping) register(MessageType.ClearSession, protobuf_workflow, handle_ClearSession)
register(MessageType.Ping, protobuf_workflow, handle_Ping)

View File

@ -17,9 +17,7 @@ async def homescreen() -> None:
def display_homescreen() -> None: def display_homescreen() -> None:
image = None image = None
if storage.slip39.is_in_progress(): if not storage.is_initialized():
label = "Waiting for other shares"
elif not storage.is_initialized():
label = "Go to trezor.io/start" label = "Go to trezor.io/start"
else: else:
label = storage.device.get_label() or "My Trezor" label = storage.device.get_label() or "My Trezor"
@ -29,28 +27,14 @@ def display_homescreen() -> None:
image = res.load("apps/homescreen/res/bg.toif") image = res.load("apps/homescreen/res/bg.toif")
if storage.is_initialized() and storage.device.no_backup(): if storage.is_initialized() and storage.device.no_backup():
_err("SEEDLESS") ui.header_error("SEEDLESS")
elif storage.is_initialized() and storage.device.unfinished_backup(): elif storage.is_initialized() and storage.device.unfinished_backup():
_err("BACKUP FAILED!") ui.header_error("BACKUP FAILED!")
elif storage.is_initialized() and storage.device.needs_backup(): elif storage.is_initialized() and storage.device.needs_backup():
_warn("NEEDS BACKUP!") ui.header_warning("NEEDS BACKUP!")
elif storage.is_initialized() and not config.has_pin(): elif storage.is_initialized() and not config.has_pin():
_warn("PIN NOT SET!") ui.header_warning("PIN NOT SET!")
elif storage.slip39.is_in_progress():
_warn("SHAMIR IN PROGRESS!")
else: else:
ui.display.bar(0, 0, ui.WIDTH, ui.HEIGHT, ui.BG) ui.display.bar(0, 0, ui.WIDTH, ui.HEIGHT, ui.BG)
ui.display.avatar(48, 48 - 10, image, ui.WHITE, ui.BLACK) ui.display.avatar(48, 48 - 10, image, ui.WHITE, ui.BLACK)
ui.display.text_center(ui.WIDTH // 2, 220, label, ui.BOLD, ui.FG, ui.BG) ui.display.text_center(ui.WIDTH // 2, 220, label, ui.BOLD, ui.FG, ui.BG)
def _warn(message: str) -> None:
ui.display.bar(0, 0, ui.WIDTH, 30, ui.YELLOW)
ui.display.text_center(ui.WIDTH // 2, 22, message, ui.BOLD, ui.BLACK, ui.YELLOW)
ui.display.bar(0, 30, ui.WIDTH, ui.HEIGHT - 30, ui.BG)
def _err(message: str) -> None:
ui.display.bar(0, 0, ui.WIDTH, 30, ui.RED)
ui.display.text_center(ui.WIDTH // 2, 22, message, ui.BOLD, ui.WHITE, ui.RED)
ui.display.bar(0, 30, ui.WIDTH, ui.HEIGHT - 30, ui.BG)

View File

@ -12,8 +12,8 @@ async def backup_device(ctx, msg):
if not storage.device.needs_backup(): if not storage.device.needs_backup():
raise wire.ProcessError("Seed already backed up") raise wire.ProcessError("Seed already backed up")
mnemonic_secret, mnemonic_type = mnemonic.get() mnemonic_secret, mnemonic_module = mnemonic.get()
slip39 = mnemonic_type == mnemonic.TYPE_SLIP39 slip39 = mnemonic_module == mnemonic.slip39
storage.device.set_unfinished_backup(True) storage.device.set_unfinished_backup(True)
storage.device.set_backed_up() storage.device.set_backed_up()

View File

@ -1,11 +1,11 @@
from trezor import config, wire from trezor import config, wire
from trezor.crypto import bip39
from trezor.messages.Success import Success from trezor.messages.Success import Success
from trezor.pin import pin_to_int from trezor.pin import pin_to_int
from trezor.ui.text import Text from trezor.ui.text import Text
from apps.common import storage from apps.common import mnemonic, storage
from apps.common.confirm import require_confirm from apps.common.confirm import require_confirm
from apps.common.mnemonic import bip39
async def load_device(ctx, msg): async def load_device(ctx, msg):
@ -24,10 +24,9 @@ async def load_device(ctx, msg):
text.normal("Continue only if you", "know what you are doing!") text.normal("Continue only if you", "know what you are doing!")
await require_confirm(ctx, text) await require_confirm(ctx, text)
secret = bip39.process_all([msg.mnemonic])
storage.device.store_mnemonic_secret( storage.device.store_mnemonic_secret(
secret=secret, secret=msg.mnemonic.encode(),
mnemonic_type=bip39.get_type(), mnemonic_type=mnemonic.TYPE_BIP39,
needs_backup=True, needs_backup=True,
no_backup=False, no_backup=False,
) )

View File

@ -1,191 +0,0 @@
from trezor import config, ui, wire
from trezor.crypto import slip39
from trezor.messages import ButtonRequestType
from trezor.messages.ButtonAck import ButtonAck
from trezor.messages.ButtonRequest import ButtonRequest
from trezor.messages.Success import Success
from trezor.pin import pin_to_int
from trezor.ui.info import InfoConfirm
from trezor.ui.mnemonic_bip39 import Bip39Keyboard
from trezor.ui.mnemonic_slip39 import Slip39Keyboard
from trezor.ui.text import Text
from trezor.ui.word_select import WordSelector
from apps.common import mnemonic, storage
from apps.common.confirm import require_confirm
from apps.homescreen.homescreen import display_homescreen
from apps.management.change_pin import request_pin_ack, request_pin_confirm
if __debug__:
from apps.debug import confirm_signal, input_signal
if False:
from trezor.messages.RecoveryDevice import RecoveryDevice
async def recovery_device(ctx: wire.Context, msg: RecoveryDevice) -> Success:
"""
Recover BIP39/SLIP39 seed into empty device.
1. Ask for the number of words in recovered seed.
2. Let user type in the mnemonic words one by one.
3. Optionally check the seed validity.
4. Optionally ask for the PIN, with confirmation.
5. Save into storage.
"""
if not msg.dry_run and storage.is_initialized():
raise wire.UnexpectedMessage("Already initialized")
if not storage.slip39.is_in_progress():
if not msg.dry_run:
title = "Wallet recovery"
text = Text(title, ui.ICON_RECOVERY)
text.normal("Do you really want to", "recover the wallet?", "")
else:
title = "Simulated recovery"
text = Text(title, ui.ICON_RECOVERY)
text.normal("Do you really want to", "check the recovery", "seed?")
await require_confirm(ctx, text, code=ButtonRequestType.ProtectCall)
if msg.dry_run:
if config.has_pin():
curpin = await request_pin_ack(ctx, "Enter PIN", config.get_pin_rem())
else:
curpin = ""
if not config.check_pin(pin_to_int(curpin)):
raise wire.PinInvalid("PIN invalid")
# ask for the number of words
wordcount = await request_wordcount(ctx, title)
mnemonic_module = mnemonic.module_from_words_count(wordcount)
else:
wordcount = storage.slip39.get_words_count()
mnemonic_module = mnemonic.slip39
mnemonic_threshold = None
mnemonics = []
secret = None
while secret is None:
# ask for mnemonic words one by one
words = await request_mnemonic(
ctx, wordcount, mnemonic_module == mnemonic.slip39
)
if mnemonic_threshold is None:
try:
mnemonic_threshold = mnemonic_module.get_mnemonic_threshold(words)
except slip39.MnemonicError:
raise wire.ProcessError("Mnemonic is not valid")
mnemonics.append(words)
remaining = mnemonic_threshold - len(mnemonics)
if remaining == 0:
try:
secret = mnemonic_module.process_all(mnemonics)
except slip39.MnemonicError:
raise wire.ProcessError("Mnemonic is not valid")
# show a number of remaining mnemonics for SLIP39
if secret is None and mnemonic_module == mnemonic.slip39:
await show_remaining_slip39_mnemonics(ctx, title, remaining)
# check mnemonic validity
# it is checked automatically in SLIP-39
if mnemonic_module == mnemonic.bip39 and (msg.enforce_wordlist or msg.dry_run):
if not mnemonic_module.check(secret):
raise wire.ProcessError("Mnemonic is not valid")
# ask for pin repeatedly
if msg.pin_protection:
newpin = await request_pin_confirm(ctx, allow_cancel=False)
else:
newpin = ""
# dry run
if msg.dry_run:
return mnemonic.dry_run(secret)
# save into storage
if msg.pin_protection:
config.change_pin(pin_to_int(""), pin_to_int(newpin))
storage.device.set_u2f_counter(msg.u2f_counter)
storage.device.load_settings(
label=msg.label, use_passphrase=msg.passphrase_protection
)
mnemonic_module.store(secret=secret, needs_backup=False, no_backup=False)
await show_success(ctx)
display_homescreen()
return Success(message="Device recovered")
async def request_wordcount(ctx: wire.Context, title: str) -> int:
await ctx.call(ButtonRequest(code=ButtonRequestType.MnemonicWordCount), ButtonAck)
text = Text(title, ui.ICON_RECOVERY)
text.normal("Number of words?")
if __debug__:
count = await ctx.wait(WordSelector(text), input_signal)
count = int(count) # if input_signal was triggered, count is a string
else:
count = await ctx.wait(WordSelector(text))
return count
async def request_mnemonic(ctx: wire.Context, count: int, slip39: bool) -> str:
await ctx.call(ButtonRequest(code=ButtonRequestType.MnemonicInput), ButtonAck)
words = []
for i in range(count):
if slip39:
keyboard = Slip39Keyboard("Type word %s of %s:" % (i + 1, count))
else:
keyboard = Bip39Keyboard("Type word %s of %s:" % (i + 1, count))
if __debug__:
word = await ctx.wait(keyboard, input_signal)
else:
word = await ctx.wait(keyboard)
words.append(word)
return " ".join(words)
async def show_keyboard_info(ctx: wire.Context) -> None:
await ctx.call(ButtonRequest(code=ButtonRequestType.Other), ButtonAck)
info = InfoConfirm(
"Did you know? "
"You can type the letters "
"one by one or use it like "
"a T9 keyboard.",
"Great!",
)
if __debug__:
await ctx.wait(info, confirm_signal)
else:
await ctx.wait(info)
async def show_success(ctx):
text = Text("Recovery success", ui.ICON_RECOVERY)
text.normal("You have successfully")
text.normal("recovered your wallet.")
await require_confirm(
ctx, text, ButtonRequestType.ProtectCall, cancel=None, confirm="Continue"
)
async def show_remaining_slip39_mnemonics(
ctx: wire.Context, title: str, remaining: int
) -> None:
text = Text(title, ui.ICON_RECOVERY)
text.bold("Good job!")
text.normal("Enter %s more recovery " % remaining)
if remaining > 1:
text.normal("shares.")
else:
text.normal("share.")
await require_confirm(
ctx, text, ButtonRequestType.ProtectCall, cancel=None, confirm="Continue"
)

View File

@ -0,0 +1,77 @@
from trezor import config, ui, wire
from trezor.messages import ButtonRequestType
from trezor.messages.Success import Success
from trezor.pin import pin_to_int
from trezor.ui.text import Text
from apps.common import storage
from apps.common.confirm import require_confirm
from apps.management.change_pin import request_pin_ack, request_pin_confirm
from apps.management.recovery_device.homescreen import recovery_process
if False:
from trezor.messages.RecoveryDevice import RecoveryDevice
async def recovery_device(ctx: wire.Context, msg: RecoveryDevice) -> Success:
"""
Recover BIP39/SLIP39 seed into empty device.
Recovery is also possible with replugged Trezor. We call this process Persistance.
User starts the process here using the RecoveryDevice msg and then they can unplug
the device anytime and continue without a computer.
"""
_check_state(msg)
if not msg.dry_run:
title = "Wallet recovery"
text = Text(title, ui.ICON_RECOVERY)
text.normal("Do you really want to", "recover the wallet?", "")
else:
title = "Seed check"
text = Text(title, ui.ICON_RECOVERY)
text.normal("Do you really want to", "check the recovery", "seed?")
await require_confirm(ctx, text, code=ButtonRequestType.ProtectCall)
# for dry run pin needs to entered
if msg.dry_run:
if config.has_pin():
curpin = await request_pin_ack(ctx, "Enter PIN", config.get_pin_rem())
else:
curpin = ""
if not config.check_pin(pin_to_int(curpin)):
raise wire.PinInvalid("PIN invalid")
# set up pin if requested
if msg.pin_protection:
if msg.dry_run:
raise wire.ProcessError("Can't setup PIN during dry_run recovery.")
newpin = await request_pin_confirm(ctx, allow_cancel=False)
config.change_pin(pin_to_int(""), pin_to_int(newpin))
storage.device.set_u2f_counter(msg.u2f_counter)
storage.device.load_settings(
label=msg.label, use_passphrase=msg.passphrase_protection
)
storage.recovery.set_in_progress(True)
storage.recovery.set_dry_run(msg.dry_run)
result = await recovery_process(ctx)
return result
def _check_state(msg: RecoveryDevice):
if not msg.dry_run and storage.is_initialized():
raise wire.UnexpectedMessage("Already initialized")
if msg.dry_run and not storage.is_initialized():
raise wire.UnexpectedMessage("Device is not initialized")
if storage.recovery.is_in_progress():
raise RuntimeError(
"Function recovery_device should not be invoked when recovery is already in progress"
)
if msg.enforce_wordlist is False:
raise wire.ProcessError(
"Value enforce_wordlist must be True, Trezor Core enforces words automatically."
)

View File

@ -0,0 +1,180 @@
from trezor import loop, utils, wire
from trezor.crypto.hashlib import sha256
from trezor.errors import IdentifierMismatchError, MnemonicError, ShareAlreadyAddedError
from trezor.messages.Success import Success
from . import recover
from apps.common import mnemonic, storage
from apps.common.layout import show_success
from apps.management.recovery_device import layout
async def recovery_homescreen() -> None:
# recovery process does not communicate on the wire
ctx = wire.DummyContext()
try:
await recovery_process(ctx)
finally:
# clear the loop state, so loop.run will exit
loop.clear()
# clear the registered wire handlers to avoid conflicts
wire.clear()
async def recovery_process(ctx: wire.Context) -> Success:
try:
result = await _continue_recovery_process(ctx)
except recover.RecoveryAborted:
dry_run = storage.recovery.is_dry_run()
if dry_run:
storage.recovery.end_progress()
else:
storage.wipe()
raise wire.ActionCancelled("Cancelled")
return result
async def _continue_recovery_process(ctx: wire.Context) -> Success:
# gather the current recovery state from storage
in_progress = storage.recovery.is_in_progress()
word_count = storage.recovery.get_word_count()
dry_run = storage.recovery.is_dry_run()
if not in_progress: # invalid and inconsistent state
raise RuntimeError
if not word_count: # the first run, prompt word count from the user
word_count = await _request_and_store_word_count(ctx, dry_run)
mnemonic_type = mnemonic.type_from_word_count(word_count)
secret = await _request_secret(ctx, word_count, mnemonic_type)
if dry_run:
result = await _finish_recovery_dry_run(ctx, secret, mnemonic_type)
else:
result = await _finish_recovery(ctx, secret, mnemonic_type)
return result
async def _finish_recovery_dry_run(
ctx: wire.Context, secret: bytes, mnemonic_type: int
) -> Success:
digest_input = sha256(secret).digest()
stored, _ = mnemonic.get()
digest_stored = sha256(stored).digest()
result = utils.consteq(digest_stored, digest_input)
# Check that the identifier and iteration exponent match as well
if mnemonic_type == mnemonic.TYPE_SLIP39:
result &= (
storage.device.get_slip39_identifier()
== storage.recovery.get_slip39_identifier()
)
result &= (
storage.device.get_slip39_iteration_exponent()
== storage.recovery.get_slip39_iteration_exponent()
)
await layout.show_dry_run_result(ctx, result)
storage.recovery.end_progress()
if result:
return Success("The seed is valid and matches the one in the device")
else:
raise wire.ProcessError("The seed does not match the one in the device")
async def _finish_recovery(
ctx: wire.Context, secret: bytes, mnemonic_type: int
) -> Success:
storage.device.store_mnemonic_secret(
secret, mnemonic_type, needs_backup=False, no_backup=False
)
if mnemonic_type == mnemonic.TYPE_SLIP39:
storage.device.set_slip39_identifier(storage.recovery.get_slip39_identifier())
storage.device.set_slip39_iteration_exponent(
storage.recovery.get_slip39_iteration_exponent()
)
storage.recovery.end_progress()
await show_success(ctx, ("You have successfully", "recovered your wallet."))
return Success(message="Device recovered")
async def _request_and_store_word_count(ctx: wire.Context, dry_run: bool) -> int:
homepage = layout.RecoveryHomescreen("Select number of words")
await layout.homescreen_dialog(ctx, homepage, "Select")
# ask for the number of words
word_count = await layout.request_word_count(ctx, dry_run)
# save them into storage
storage.recovery.set_word_count(word_count)
return word_count
async def _request_secret(ctx: wire.Context, word_count: int, mnemonic_type: int):
await _request_share_first_screen(ctx, word_count, mnemonic_type)
secret = None
while secret is None:
# ask for mnemonic words one by one
mnemonics = storage.recovery_shares.fetch()
try:
words = await layout.request_mnemonic(
ctx, word_count, mnemonic_type, mnemonics
)
except IdentifierMismatchError:
await layout.show_identifier_mismatch(ctx)
continue
except ShareAlreadyAddedError:
await layout.show_share_already_added(ctx)
continue
# process this seed share
try:
secret = recover.process_share(words, mnemonic_type)
except MnemonicError:
await layout.show_invalid_mnemonic(ctx, mnemonic_type)
continue
if secret is None:
await _request_share_next_screen(ctx, mnemonic_type)
return secret
async def _request_share_first_screen(
ctx: wire.Context, word_count: int, mnemonic_type: int
):
if mnemonic_type == mnemonic.TYPE_BIP39:
content = layout.RecoveryHomescreen(
"Enter recovery seed", "(%d words)" % word_count
)
await layout.homescreen_dialog(ctx, content, "Enter seed")
elif mnemonic_type == mnemonic.TYPE_SLIP39:
remaining = storage.recovery.get_remaining()
if remaining:
await _request_share_next_screen(ctx, mnemonic_type)
else:
content = layout.RecoveryHomescreen(
"Enter any share", "(%d words)" % word_count
)
await layout.homescreen_dialog(ctx, content, "Enter share")
else:
raise RuntimeError
async def _request_share_next_screen(ctx: wire.Context, mnemonic_type: int):
if mnemonic_type == mnemonic.TYPE_SLIP39:
remaining = storage.recovery.get_remaining()
if remaining == 1:
text = "1 more share"
else:
text = "%d more shares" % remaining
content = layout.RecoveryHomescreen(text, "needed to enter")
await layout.homescreen_dialog(ctx, content, "Enter share")
else:
raise RuntimeError

View File

@ -0,0 +1,222 @@
from trezor import ui, wire
from trezor.errors import IdentifierMismatchError, ShareAlreadyAddedError
from trezor.messages import ButtonRequestType
from trezor.messages.ButtonAck import ButtonAck
from trezor.messages.ButtonRequest import ButtonRequest
from trezor.ui.info import InfoConfirm
from trezor.ui.text import Text
from trezor.ui.word_select import WordSelector
from .bip39_keyboard import Bip39Keyboard
from .recover import RecoveryAborted
from .slip39_keyboard import Slip39Keyboard
from apps.common import mnemonic, storage
from apps.common.confirm import confirm, require_confirm
from apps.common.layout import show_success, show_warning
if __debug__:
from apps.debug import input_signal, confirm_signal
if False:
from typing import List
async def confirm_abort(ctx: wire.Context, dry_run: bool = False) -> bool:
if dry_run:
text = Text("Abort seed check", ui.ICON_WIPE)
text.normal("Do you really want to", "abort the seed check?")
else:
text = Text("Abort recovery", ui.ICON_WIPE)
text.normal("Do you really want to", "abort the recovery", "process?")
text.bold("All progress will be lost.")
return await confirm(ctx, text, code=ButtonRequestType.ProtectCall)
async def request_word_count(ctx: wire.Context, dry_run: bool) -> int:
await ctx.call(ButtonRequest(code=ButtonRequestType.MnemonicWordCount), ButtonAck)
if dry_run:
text = Text("Seed check", ui.ICON_RECOVERY)
else:
text = Text("Wallet recovery", ui.ICON_RECOVERY)
text.normal("Number of words?")
if __debug__:
count = await ctx.wait(WordSelector(text), input_signal)
count = int(count) # if input_signal was triggered, count is a string
else:
count = await ctx.wait(WordSelector(text))
return count
async def request_mnemonic(
ctx: wire.Context, count: int, mnemonic_type: int, mnemonics: List[str]
) -> str:
await ctx.call(ButtonRequest(code=ButtonRequestType.MnemonicInput), ButtonAck)
words = []
for i in range(count):
if mnemonic_type == mnemonic.TYPE_SLIP39:
keyboard = Slip39Keyboard("Type word %s of %s:" % (i + 1, count))
else:
keyboard = Bip39Keyboard("Type word %s of %s:" % (i + 1, count))
if __debug__:
word = await ctx.wait(keyboard, input_signal)
else:
word = await ctx.wait(keyboard)
if mnemonic_type == mnemonic.TYPE_SLIP39 and mnemonics:
# check if first 3 words of mnemonic match
# we can check against the first one, others were checked already
if i < 3:
share_list = mnemonics[0].split(" ")
if share_list[i] != word:
raise IdentifierMismatchError()
elif i == 3:
for share in mnemonics:
share_list = share.split(" ")
# check if the fourth word is different from previous shares
if share_list[i] == word:
raise ShareAlreadyAddedError()
words.append(word)
return " ".join(words)
async def show_dry_run_result(ctx: wire.Context, result: bool) -> None:
if result:
await show_success(
ctx,
(
"The entered recovery",
"seed is valid and matches",
"the one in the device.",
),
)
else:
await show_warning(
ctx,
(
"The entered recovery",
"seed is valid but does not",
"match the one in the device.",
),
)
async def show_dry_run_different_type(ctx: wire.Context) -> None:
text = Text("Dry run failure", ui.ICON_CANCEL)
text.normal("Seed in the device was")
text.normal("created using another")
text.normal("backup mechanism.")
await require_confirm(
ctx, text, ButtonRequestType.ProtectCall, cancel=None, confirm="Continue"
)
async def show_keyboard_info(ctx: wire.Context) -> None:
# TODO: do not send ButtonRequestType.Other
await ctx.call(ButtonRequest(code=ButtonRequestType.Other), ButtonAck)
info = InfoConfirm(
"Did you know? "
"You can type the letters "
"one by one or use it like "
"a T9 keyboard.",
"Great!",
)
if __debug__:
await ctx.wait(info, confirm_signal)
else:
await ctx.wait(info)
async def show_invalid_mnemonic(ctx, mnemonic_type: int):
if mnemonic_type == mnemonic.TYPE_SLIP39:
await show_warning(
ctx,
("You have entered", "a recovery share", "that is not valid."),
button="Try again",
)
else:
await show_warning(
ctx,
("You have entered", "a recovery seed", "that is not valid."),
button="Try again",
)
async def show_share_already_added(ctx):
return await show_warning(
ctx,
("Share already entered,", "please enter", "a different share."),
button="Try again",
)
async def show_identifier_mismatch(ctx):
return await show_warning(
ctx,
("You have entered", "a share from another", "Shamir Backup."),
button="Try again",
)
class RecoveryHomescreen(ui.Control):
def __init__(self, text: str, subtext: str = None):
self.text = text
self.subtext = subtext
self.dry_run = storage.recovery.is_dry_run()
self.repaint = True
def on_render(self):
if not self.repaint:
return
if self.dry_run:
heading = "SEED CHECK"
else:
heading = "RECOVERY MODE"
ui.header_warning(heading, clear=False)
if not self.subtext:
ui.display.text_center(ui.WIDTH // 2, 80, self.text, ui.BOLD, ui.FG, ui.BG)
else:
ui.display.text_center(ui.WIDTH // 2, 65, self.text, ui.BOLD, ui.FG, ui.BG)
ui.display.text_center(
ui.WIDTH // 2, 92, self.subtext, ui.NORMAL, ui.FG, ui.BG
)
ui.display.text_center(
ui.WIDTH // 2, 130, "It is safe to eject Trezor", ui.NORMAL, ui.GREY, ui.BG
)
ui.display.text_center(
ui.WIDTH // 2, 155, "and continue later", ui.NORMAL, ui.GREY, ui.BG
)
self.repaint = False
async def homescreen_dialog(
ctx: wire.Context, homepage: RecoveryHomescreen, button_label: str
) -> None:
while True:
# make sure the homepage gets painted, even after cancelling the dialog
homepage.repaint = True
continue_recovery = await confirm(
ctx,
homepage,
code=ButtonRequestType.RecoveryHomepage,
confirm=button_label,
major_confirm=True,
)
if continue_recovery:
# go forward in the recovery process
break
# user has chosen to abort, confirm the choice
dry_run = storage.recovery.is_dry_run()
if await confirm_abort(ctx, dry_run):
raise RecoveryAborted

View File

@ -0,0 +1,67 @@
from trezor.crypto import bip39, slip39
from trezor.errors import MnemonicError
from apps.common import mnemonic, storage
if False:
from typing import Optional
class RecoveryAborted(Exception):
pass
def process_share(words: str, mnemonic_type: int):
if mnemonic_type == mnemonic.TYPE_BIP39:
return _process_bip39(words)
else:
return _process_slip39(words)
def _process_bip39(words: str) -> bytes:
"""
Receives single mnemonic and processes it. Returns what is then stored
in the storage, which is the mnemonic itself for BIP-39.
"""
if not bip39.check(words):
raise MnemonicError()
return words.encode()
def _process_slip39(words: str) -> Optional[bytes]:
"""
Receives single mnemonic and processes it. Returns what is then stored in storage or
None if more shares are needed.
"""
identifier, iteration_exponent, _, _, _, index, threshold, value = slip39.decode_mnemonic(
words
) # TODO: use better data structure for this
if threshold == 1:
raise ValueError("Threshold equal to 1 is not allowed.")
# if this is the first share, parse and store metadata
if not storage.recovery.get_remaining():
storage.recovery.set_slip39_iteration_exponent(iteration_exponent)
storage.recovery.set_slip39_identifier(identifier)
storage.recovery.set_slip39_threshold(threshold)
storage.recovery.set_remaining(threshold - 1)
storage.recovery_shares.set(index, words)
return None # we need more shares
# These should be checked by UI before so it's a Runtime exception otherwise
if identifier != storage.recovery.get_slip39_identifier():
raise RuntimeError("Slip39: Share identifiers do not match")
if storage.recovery_shares.get(index):
raise RuntimeError("Slip39: This mnemonic was already entered")
# add mnemonic to storage
remaining = storage.recovery.get_remaining() - 1
storage.recovery.set_remaining(remaining)
storage.recovery_shares.set(index, words)
if remaining != 0:
return None # we need more shares
# combine shares and return the master secret
mnemonics = storage.recovery_shares.fetch()
identifier, iteration_exponent, secret = slip39.combine_mnemonics(mnemonics)
return secret

View File

@ -50,8 +50,8 @@ async def reset_device(ctx: wire.Context, msg: ResetDevice) -> Success:
secret = _compute_secret_from_entropy(int_entropy, ext_entropy, msg.strength) secret = _compute_secret_from_entropy(int_entropy, ext_entropy, msg.strength)
if is_slip39_simple: if is_slip39_simple:
storage.slip39.set_identifier(slip39.generate_random_identifier()) storage.device.set_slip39_identifier(slip39.generate_random_identifier())
storage.slip39.set_iteration_exponent(slip39.DEFAULT_ITERATION_EXPONENT) storage.device.set_slip39_iteration_exponent(slip39.DEFAULT_ITERATION_EXPONENT)
# should we back up the wallet now? # should we back up the wallet now?
if not msg.no_backup and not msg.skip_backup: if not msg.no_backup and not msg.skip_backup:
@ -75,13 +75,17 @@ async def reset_device(ctx: wire.Context, msg: ResetDevice) -> Success:
label=msg.label, use_passphrase=msg.passphrase_protection label=msg.label, use_passphrase=msg.passphrase_protection
) )
if is_slip39_simple: if is_slip39_simple:
mnemonic.slip39.store( storage.device.store_mnemonic_secret(
secret=secret, needs_backup=msg.skip_backup, no_backup=msg.no_backup secret,
mnemonic.TYPE_SLIP39,
needs_backup=msg.skip_backup,
no_backup=msg.no_backup,
) )
else: else:
# in BIP-39 we store mnemonic string instead of the secret # in BIP-39 we store mnemonic string instead of the secret
mnemonic.bip39.store( storage.device.store_mnemonic_secret(
secret=bip39.from_data(secret).encode(), bip39.from_data(secret).encode(),
mnemonic.TYPE_BIP39,
needs_backup=msg.skip_backup, needs_backup=msg.skip_backup,
no_backup=msg.no_backup, no_backup=msg.no_backup,
) )
@ -103,7 +107,9 @@ async def backup_slip39_wallet(ctx: wire.Context, secret: bytes) -> None:
threshold = await layout.slip39_prompt_threshold(ctx, shares_count) threshold = await layout.slip39_prompt_threshold(ctx, shares_count)
# generate the mnemonics # generate the mnemonics
mnemonics = mnemonic.slip39.generate_from_secret(secret, shares_count, threshold) mnemonics = slip39.generate_single_group_mnemonics_from_data(
secret, storage.device.get_slip39_identifier(), threshold, shares_count
)
# show and confirm individual shares # show and confirm individual shares
await layout.slip39_show_checklist_show_shares(ctx, shares_count, threshold) await layout.slip39_show_checklist_show_shares(ctx, shares_count, threshold)

View File

@ -6,56 +6,85 @@ import boot # noqa: F401
# prepare the USB interfaces, but do not connect to the host yet # prepare the USB interfaces, but do not connect to the host yet
import usb import usb
from trezor import loop, wire, workflow, utils from trezor import utils
# load applications # start the USB
import apps.homescreen
import apps.management
import apps.wallet
import apps.ethereum
import apps.lisk
import apps.monero
import apps.nem
import apps.stellar
import apps.ripple
import apps.cardano
import apps.tezos
import apps.eos
if __debug__:
import apps.debug
else:
import apps.webauthn
# boot applications
apps.homescreen.boot()
apps.management.boot()
apps.wallet.boot()
apps.ethereum.boot()
apps.lisk.boot()
apps.monero.boot()
apps.nem.boot()
apps.stellar.boot()
apps.ripple.boot()
apps.cardano.boot()
apps.tezos.boot()
apps.eos.boot()
if __debug__:
apps.debug.boot()
else:
apps.webauthn.boot(usb.iface_webauthn)
# initialize the wire codec and start the USB
wire.setup(usb.iface_wire)
if __debug__:
wire.setup(usb.iface_debug)
usb.bus.open() usb.bus.open()
# switch into unprivileged mode, as we don't need the extra permissions anymore # switch into unprivileged mode, as we don't need the extra permissions anymore
utils.set_mode_unprivileged() utils.set_mode_unprivileged()
# run main event loop and specify which screen is the default
from apps.homescreen.homescreen import homescreen
workflow.startdefault(homescreen) def _boot_recovery():
loop.run() # load applications
import apps.homescreen
# boot applications
apps.homescreen.boot(features_only=True)
from apps.management.recovery_device.homescreen import recovery_homescreen
loop.schedule(recovery_homescreen())
def _boot_default():
# load applications
import apps.homescreen
import apps.management
import apps.wallet
import apps.ethereum
import apps.lisk
import apps.monero
import apps.nem
import apps.stellar
import apps.ripple
import apps.cardano
import apps.tezos
import apps.eos
if __debug__:
import apps.debug
else:
import apps.webauthn
# boot applications
apps.homescreen.boot()
apps.management.boot()
apps.wallet.boot()
apps.ethereum.boot()
apps.lisk.boot()
apps.monero.boot()
apps.nem.boot()
apps.stellar.boot()
apps.ripple.boot()
apps.cardano.boot()
apps.tezos.boot()
apps.eos.boot()
if __debug__:
apps.debug.boot()
else:
apps.webauthn.boot(usb.iface_webauthn)
# run main event loop and specify which screen is the default
from apps.homescreen.homescreen import homescreen
workflow.startdefault(homescreen)
from trezor import loop, wire, workflow
from apps.common.storage import recovery
while True:
# initialize the wire codec
wire.setup(usb.iface_wire)
if __debug__:
wire.setup(usb.iface_debug)
# boot either in recovery or default mode
if recovery.is_in_progress():
_boot_recovery()
else:
_boot_default()
loop.run()
# loop is empty, reboot

View File

@ -21,6 +21,7 @@
from micropython import const from micropython import const
from trezor.crypto import hashlib, hmac, pbkdf2, random from trezor.crypto import hashlib, hmac, pbkdf2, random
from trezor.errors import MnemonicError
from trezorcrypto import shamir, slip39 from trezorcrypto import shamir, slip39
if False: if False:
@ -104,10 +105,6 @@ _DIGEST_INDEX = const(254)
"""The index of the share containing the digest of the shared secret.""" """The index of the share containing the digest of the shared secret."""
class MnemonicError(Exception):
pass
def _rs1024_polymod(values: Indices) -> int: def _rs1024_polymod(values: Indices) -> int:
GEN = ( GEN = (
0xE0E040, 0xE0E040,

15
core/src/trezor/errors.py Normal file
View File

@ -0,0 +1,15 @@
# TODO: review: I'm not sure where this should be, but the idea
# is to have a file containing all Runtime exceptions, that means
# exceptions that are internal only
class MnemonicError(RuntimeError):
pass
class IdentifierMismatchError(MnemonicError):
pass
class ShareAlreadyAddedError(MnemonicError):
pass

View File

@ -126,6 +126,15 @@ def run() -> None:
# rationale: We use untyped lists here, because that is what the C API supports. # rationale: We use untyped lists here, because that is what the C API supports.
def clear():
"""Clear all queue state. Any scheduled or paused tasks will be forgotten."""
_ = [0, 0, 0]
while _queue:
_queue.pop(_)
_paused.clear()
_finalizers.clear()
def _step(task: Task, value: Any) -> None: def _step(task: Task, value: Any) -> None:
try: try:
if isinstance(value, BaseException): if isinstance(value, BaseException):

View File

@ -15,3 +15,6 @@ MnemonicWordCount = 12
MnemonicInput = 13 MnemonicInput = 13
PassphraseType = 14 PassphraseType = 14
UnknownDerivationPath = 15 UnknownDerivationPath = 15
RecoveryHomepage = 16
Success = 17
Warning = 18

View File

@ -108,6 +108,22 @@ def header(
display.text(44, 35, title, BOLD, fg, bg) display.text(44, 35, title, BOLD, fg, bg)
def header_warning(message: str, clear=True) -> None:
# TODO: review: is the clear=True really needed?
display.bar(0, 0, WIDTH, 30, style.YELLOW)
display.text_center(WIDTH // 2, 22, message, BOLD, style.BLACK, style.YELLOW)
if clear:
display.bar(0, 30, WIDTH, HEIGHT - 30, style.BG)
def header_error(message: str, clear=True) -> None:
# TODO: review: as above
display.bar(0, 0, WIDTH, 30, style.RED)
display.text_center(WIDTH // 2, 22, message, BOLD, style.WHITE, style.RED)
if clear:
display.bar(0, 30, WIDTH, HEIGHT - 30, style.BG)
def grid( def grid(
i: int, i: int,
n_x: int = 3, n_x: int = 3,

View File

@ -58,6 +58,25 @@ def setup(iface: WireInterface) -> None:
loop.schedule(session_handler(iface, codec_v1.SESSION_ID)) loop.schedule(session_handler(iface, codec_v1.SESSION_ID))
def clear() -> None:
"""Remove all registered handlers."""
workflow_handlers.clear()
class DummyContext:
async def call(*argv):
pass
async def read(*argv):
pass
async def write(*argv):
pass
async def wait(self, *tasks: Awaitable) -> Any:
return await loop.spawn(*tasks)
class Context: class Context:
def __init__(self, iface: WireInterface, sid: int) -> None: def __init__(self, iface: WireInterface, sid: int) -> None:
self.iface = iface self.iface = iface

View File

@ -15,3 +15,6 @@ MnemonicWordCount = 12
MnemonicInput = 13 MnemonicInput = 13
PassphraseType = 14 PassphraseType = 14
UnknownDerivationPath = 15 UnknownDerivationPath = 15
RecoveryHomepage = 16
Success = 17
Warning = 18

View File

@ -46,6 +46,21 @@ class TestMsgRecoveryDeviceShamir(TrezorTest):
self.client.debug.press_yes() self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck()) ret = self.client.call_raw(proto.ButtonAck())
# Homescreen - consider aborting process
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_no()
ret = self.client.call_raw(proto.ButtonAck())
# Homescreen - but then bail out in the warning
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_no()
ret = self.client.call_raw(proto.ButtonAck())
# Homescreen - click Enter
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck())
# Enter word count # Enter word count
assert ret == proto.ButtonRequest( assert ret == proto.ButtonRequest(
code=proto.ButtonRequestType.MnemonicWordCount code=proto.ButtonRequestType.MnemonicWordCount
@ -53,6 +68,11 @@ class TestMsgRecoveryDeviceShamir(TrezorTest):
self.client.debug.input(str(word_count)) self.client.debug.input(str(word_count))
ret = self.client.call_raw(proto.ButtonAck()) ret = self.client.call_raw(proto.ButtonAck())
# Homescreen
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck())
# Enter shares # Enter shares
for mnemonic in mnemonics: for mnemonic in mnemonics:
# Enter mnemonic words # Enter mnemonic words
@ -66,7 +86,7 @@ class TestMsgRecoveryDeviceShamir(TrezorTest):
ret = self.client.transport.read() ret = self.client.transport.read()
if mnemonic != mnemonics[-1]: if mnemonic != mnemonics[-1]:
# Confirm status # Homescreen
assert isinstance(ret, proto.ButtonRequest) assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes() self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck()) ret = self.client.call_raw(proto.ButtonAck())
@ -108,6 +128,7 @@ class TestMsgRecoveryDeviceShamir(TrezorTest):
"hobo romp academic axis august founder knife legal recover alien expect emphasis loan kitchen involve teacher capture rebuild trial numb spider forward ladle lying voter typical security quantity hawk legs idle leaves gasoline", "hobo romp academic axis august founder knife legal recover alien expect emphasis loan kitchen involve teacher capture rebuild trial numb spider forward ladle lying voter typical security quantity hawk legs idle leaves gasoline",
"hobo romp academic agency ancestor industry argue sister scene midst graduate profile numb paid headset airport daisy flame express scene usual welcome quick silent downtown oral critical step remove says rhythm venture aunt", "hobo romp academic agency ancestor industry argue sister scene midst graduate profile numb paid headset airport daisy flame express scene usual welcome quick silent downtown oral critical step remove says rhythm venture aunt",
] ]
# TODO: add incorrect mnemonic to test
word_count = len(mnemonics[0].split(" ")) word_count = len(mnemonics[0].split(" "))
ret = self.client.call_raw( ret = self.client.call_raw(
@ -121,6 +142,21 @@ class TestMsgRecoveryDeviceShamir(TrezorTest):
self.client.debug.press_yes() self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck()) ret = self.client.call_raw(proto.ButtonAck())
# Enter PIN for first time
assert ret == proto.ButtonRequest(code=proto.ButtonRequestType.Other)
self.client.debug.input("654")
ret = self.client.call_raw(proto.ButtonAck())
# Enter PIN for second time
assert ret == proto.ButtonRequest(code=proto.ButtonRequestType.Other)
self.client.debug.input("654")
ret = self.client.call_raw(proto.ButtonAck())
# Homescreen
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck())
# Enter word count # Enter word count
assert ret == proto.ButtonRequest( assert ret == proto.ButtonRequest(
code=proto.ButtonRequestType.MnemonicWordCount code=proto.ButtonRequestType.MnemonicWordCount
@ -128,6 +164,11 @@ class TestMsgRecoveryDeviceShamir(TrezorTest):
self.client.debug.input(str(word_count)) self.client.debug.input(str(word_count))
ret = self.client.call_raw(proto.ButtonAck()) ret = self.client.call_raw(proto.ButtonAck())
# Homescreen
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck())
# Enter shares # Enter shares
for mnemonic in mnemonics: for mnemonic in mnemonics:
# Enter mnemonic words # Enter mnemonic words
@ -141,21 +182,11 @@ class TestMsgRecoveryDeviceShamir(TrezorTest):
ret = self.client.transport.read() ret = self.client.transport.read()
if mnemonic != mnemonics[-1]: if mnemonic != mnemonics[-1]:
# Confirm status # Homescreen
assert isinstance(ret, proto.ButtonRequest) assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes() self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck()) ret = self.client.call_raw(proto.ButtonAck())
# Enter PIN for first time
assert ret == proto.ButtonRequest(code=proto.ButtonRequestType.Other)
self.client.debug.input("654")
ret = self.client.call_raw(proto.ButtonAck())
# Enter PIN for second time
assert ret == proto.ButtonRequest(code=proto.ButtonRequestType.Other)
self.client.debug.input("654")
ret = self.client.call_raw(proto.ButtonAck())
# Confirm success # Confirm success
assert isinstance(ret, proto.ButtonRequest) assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes() self.client.debug.press_yes()
@ -194,3 +225,29 @@ class TestMsgRecoveryDeviceShamir(TrezorTest):
address = btc.get_address(self.client, "Bitcoin", []) address = btc.get_address(self.client, "Bitcoin", [])
assert address == "19Fjs9AvT13Y2Nx8GtoVfADmFWnccsPinQ" assert address == "19Fjs9AvT13Y2Nx8GtoVfADmFWnccsPinQ"
def test_abort(self):
ret = self.client.call_raw(
proto.RecoveryDevice(
passphrase_protection=False, pin_protection=False, label="label"
)
)
# Confirm Recovery
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck())
# Homescreen - abort process
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_no()
ret = self.client.call_raw(proto.ButtonAck())
# Homescreen - yup, really
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck())
# check that the device is wiped
features = self.client.call_raw(proto.Initialize())
assert features.initialized is False

View File

@ -41,21 +41,6 @@ class TestMsgRecoverydeviceT2(TrezorTest):
self.client.debug.press_yes() self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck()) ret = self.client.call_raw(proto.ButtonAck())
# Enter word count
assert ret == proto.ButtonRequest(
code=proto.ButtonRequestType.MnemonicWordCount
)
self.client.debug.input(str(len(mnemonic)))
ret = self.client.call_raw(proto.ButtonAck())
# Enter mnemonic words
assert ret == proto.ButtonRequest(code=proto.ButtonRequestType.MnemonicInput)
self.client.transport.write(proto.ButtonAck())
for word in mnemonic:
time.sleep(1)
self.client.debug.input(word)
ret = self.client.transport.read()
# Enter PIN for first time # Enter PIN for first time
assert ret == proto.ButtonRequest(code=proto.ButtonRequestType.Other) assert ret == proto.ButtonRequest(code=proto.ButtonRequestType.Other)
self.client.debug.input("654") self.client.debug.input("654")
@ -66,6 +51,31 @@ class TestMsgRecoverydeviceT2(TrezorTest):
self.client.debug.input("654") self.client.debug.input("654")
ret = self.client.call_raw(proto.ButtonAck()) ret = self.client.call_raw(proto.ButtonAck())
# Homescreen
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck())
# Enter word count
assert ret == proto.ButtonRequest(
code=proto.ButtonRequestType.MnemonicWordCount
)
self.client.debug.input(str(len(mnemonic)))
ret = self.client.call_raw(proto.ButtonAck())
# Homescreen
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck())
# Enter mnemonic words
assert ret == proto.ButtonRequest(code=proto.ButtonRequestType.MnemonicInput)
self.client.transport.write(proto.ButtonAck())
for word in mnemonic:
time.sleep(1)
self.client.debug.input(word)
ret = self.client.transport.read()
# Confirm success # Confirm success
assert isinstance(ret, proto.ButtonRequest) assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes() self.client.debug.press_yes()
@ -97,6 +107,11 @@ class TestMsgRecoverydeviceT2(TrezorTest):
self.client.debug.press_yes() self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck()) ret = self.client.call_raw(proto.ButtonAck())
# Homescreen
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck())
# Enter word count # Enter word count
assert ret == proto.ButtonRequest( assert ret == proto.ButtonRequest(
code=proto.ButtonRequestType.MnemonicWordCount code=proto.ButtonRequestType.MnemonicWordCount
@ -104,6 +119,11 @@ class TestMsgRecoverydeviceT2(TrezorTest):
self.client.debug.input(str(len(mnemonic))) self.client.debug.input(str(len(mnemonic)))
ret = self.client.call_raw(proto.ButtonAck()) ret = self.client.call_raw(proto.ButtonAck())
# Homescreen
assert isinstance(ret, proto.ButtonRequest)
self.client.debug.press_yes()
ret = self.client.call_raw(proto.ButtonAck())
# Enter mnemonic words # Enter mnemonic words
assert ret == proto.ButtonRequest(code=proto.ButtonRequestType.MnemonicInput) assert ret == proto.ButtonRequest(code=proto.ButtonRequestType.MnemonicInput)
self.client.transport.write(proto.ButtonAck()) self.client.transport.write(proto.ButtonAck())