1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-01-13 17:00:59 +00:00

core/recovery: refactor word checks and add a test

This commit is contained in:
Tomas Susanka 2019-12-20 20:36:59 +00:00
parent a316347bf1
commit faa9078c2b
3 changed files with 160 additions and 76 deletions

View File

@ -1,13 +1,14 @@
import storage.recovery import storage.recovery
from trezor import ui, wire from trezor import ui, wire
from trezor.crypto.slip39 import MAX_SHARE_COUNT from trezor.crypto.slip39 import MAX_SHARE_COUNT
from trezor.messages import BackupType, ButtonRequestType from trezor.messages import ButtonRequestType
from trezor.messages.ButtonAck import ButtonAck from trezor.messages.ButtonAck import ButtonAck
from trezor.messages.ButtonRequest import ButtonRequest from trezor.messages.ButtonRequest import ButtonRequest
from trezor.ui.scroll import Paginated from trezor.ui.scroll import Paginated
from trezor.ui.text import Text from trezor.ui.text import Text
from trezor.ui.word_select import WordSelector from trezor.ui.word_select import WordSelector
from . import word_validity
from .keyboard_bip39 import Bip39Keyboard from .keyboard_bip39 import Bip39Keyboard
from .keyboard_slip39 import Slip39Keyboard from .keyboard_slip39 import Slip39Keyboard
from .recover import RecoveryAborted from .recover import RecoveryAborted
@ -15,7 +16,6 @@ from .recover import RecoveryAborted
from apps.common.confirm import confirm, info_confirm, require_confirm from apps.common.confirm import confirm, info_confirm, require_confirm
from apps.common.layout import show_success, show_warning from apps.common.layout import show_success, show_warning
from apps.management import backup_types from apps.management import backup_types
from apps.management.recovery_device import recover
if __debug__: if __debug__:
from apps.debug import input_signal from apps.debug import input_signal
@ -72,7 +72,14 @@ async def request_mnemonic(
else: else:
word = await ctx.wait(keyboard) word = await ctx.wait(keyboard)
if not await check_word_validity(ctx, i, word, backup_type, words): validity = word_validity.check(i, word, backup_type, words)
if validity != word_validity.OK:
if validity == word_validity.NOK_ALREADY_ADDED:
await show_share_already_added(ctx)
elif validity == word_validity.NOK_IDENTIFIER_MISMATCH:
await show_identifier_mismatch(ctx)
elif validity == word_validity.NOK_THRESHOLD_REACHED:
await show_group_threshold_reached(ctx)
return None return None
words.append(word) words.append(word)
@ -80,79 +87,6 @@ async def request_mnemonic(
return " ".join(words) return " ".join(words)
async def check_word_validity(
ctx: wire.GenericContext,
current_index: int,
current_word: str,
backup_type: Optional[EnumTypeBackupType],
previous_words: List[str],
) -> bool:
# we can't perform any checks if the backup type was not yet decided
if backup_type is None:
return True
# there are no "on-the-fly" checks for BIP-39
if backup_type is BackupType.Bip39:
return True
previous_mnemonics = recover.fetch_previous_mnemonics()
if previous_mnemonics is None:
# this should not happen if backup_type is set
raise RuntimeError
if backup_type == BackupType.Slip39_Basic:
# check if first 3 words of mnemonic match
# we can check against the first one, others were checked already
if current_index < 3:
share_list = previous_mnemonics[0][0].split(" ")
if share_list[current_index] != current_word:
await show_identifier_mismatch(ctx)
return False
elif current_index == 3:
for share in previous_mnemonics[0]:
share_list = share.split(" ")
# check if the fourth word is different from previous shares
if share_list[current_index] == current_word:
await show_share_already_added(ctx)
return False
elif backup_type == BackupType.Slip39_Advanced:
if current_index < 2:
share_list = next(s for s in previous_mnemonics if s)[0].split(" ")
if share_list[current_index] != current_word:
await show_identifier_mismatch(ctx)
return False
# check if we reached threshold in group
elif current_index == 2:
for i, group in enumerate(previous_mnemonics):
if len(group) > 0:
if current_word == group[0].split(" ")[current_index]:
remaining_shares = (
storage.recovery.fetch_slip39_remaining_shares()
)
# if backup_type is not None, some share was already entered -> remaining needs to be set
assert remaining_shares is not None
if remaining_shares[i] == 0:
await show_group_threshold_reached(ctx)
return False
# check if share was already added for group
elif current_index == 3:
# we use the 3rd word from previously entered shares to find the group id
group_identifier_word = previous_words[2]
group_index = None
for i, group in enumerate(previous_mnemonics):
if len(group) > 0:
if group_identifier_word == group[0].split(" ")[2]:
group_index = i
if group_index is not None:
group = previous_mnemonics[group_index]
for share in group:
if current_word == share.split(" ")[current_index]:
await show_share_already_added(ctx)
return False
return True
async def show_remaining_shares( async def show_remaining_shares(
ctx: wire.GenericContext, ctx: wire.GenericContext,
groups: Iterable[Tuple[int, Tuple[str, ...]]], # remaining + list 3 words groups: Iterable[Tuple[int, Tuple[str, ...]]], # remaining + list 3 words

View File

@ -0,0 +1,103 @@
from micropython import const
import storage.recovery
from trezor.messages import BackupType
from apps.management.recovery_device import recover
if False:
from typing import List, Optional
from trezor.messages.ResetDevice import EnumTypeBackupType
OK = const(0)
NOK_IDENTIFIER_MISMATCH = const(1)
NOK_ALREADY_ADDED = const(2)
NOK_THRESHOLD_REACHED = const(3)
def check(
current_index: int,
current_word: str,
backup_type: Optional[EnumTypeBackupType],
previous_words: List[str],
) -> int:
# we can't perform any checks if the backup type was not yet decided
if backup_type is None:
return OK
# there are no "on-the-fly" checks for BIP-39
if backup_type is BackupType.Bip39:
return OK
previous_mnemonics = recover.fetch_previous_mnemonics()
if previous_mnemonics is None:
# this should not happen if backup_type is set
raise RuntimeError
if backup_type == BackupType.Slip39_Basic:
return check_slip39_basic(current_index, current_word, previous_mnemonics)
if backup_type == BackupType.Slip39_Advanced:
return check_slip39_advanced(
current_index, current_word, previous_words, previous_mnemonics
)
# there are no other backup types
raise RuntimeError
def check_slip39_basic(
current_index: int, current_word: str, previous_mnemonics: List[List[str]]
) -> int:
# check if first 3 words of mnemonic match
# we can check against the first one, others were checked already
if current_index < 3:
share_list = previous_mnemonics[0][0].split(" ")
if share_list[current_index] != current_word:
return NOK_IDENTIFIER_MISMATCH
elif current_index == 3:
for share in previous_mnemonics[0]:
share_list = share.split(" ")
# check if the fourth word is different from previous shares
if share_list[current_index] == current_word:
return NOK_ALREADY_ADDED
return OK
def check_slip39_advanced(
current_index: int,
current_word: str,
previous_words: List[str],
previous_mnemonics: List[List[str]],
) -> int:
if current_index < 2:
share_list = next(s for s in previous_mnemonics if s)[0].split(" ")
if share_list[current_index] != current_word:
return NOK_IDENTIFIER_MISMATCH
# check if we reached threshold in group
elif current_index == 2:
for i, group in enumerate(previous_mnemonics):
if len(group) > 0:
if current_word == group[0].split(" ")[current_index]:
remaining_shares = storage.recovery.fetch_slip39_remaining_shares()
# if backup_type is not None, some share was already entered -> remaining needs to be set
assert remaining_shares is not None
if remaining_shares[i] == 0:
return NOK_THRESHOLD_REACHED
# check if share was already added for group
elif current_index == 3:
# we use the 3rd word from previously entered shares to find the group id
group_identifier_word = previous_words[2]
group_index = None
for i, group in enumerate(previous_mnemonics):
if len(group) > 0:
if group_identifier_word == group[0].split(" ")[2]:
group_index = i
if group_index is not None:
group = previous_mnemonics[group_index]
for share in group:
if current_word == share.split(" ")[current_index]:
return NOK_ALREADY_ADDED
return OK

View File

@ -140,6 +140,53 @@ class TestSlip39(unittest.TestCase):
secret, share = process_slip39(words) secret, share = process_slip39(words)
self.assertIsNone(secret) self.assertIsNone(secret)
@mock_storage
def test_check_word_validity(self):
from trezor.messages import BackupType
from apps.management.recovery_device.word_validity import check, OK, NOK_IDENTIFIER_MISMATCH, NOK_ALREADY_ADDED, NOK_THRESHOLD_REACHED
storage.recovery.set_in_progress(True)
# nothing is stored -> should raise
with self.assertRaises(RuntimeError):
check(0, "ocean", BackupType.Slip39_Advanced, [])
# if backup type is not set we can not do any checks
result = check(0, "ocean", None, [])
self.assertIs(result, OK)
# BIP-39 has no "on-the-fly" checks
result = check(0, "ocean", BackupType.Bip39, [])
self.assertIs(result, OK)
# let's store two shares in the storage
secret, share = process_slip39("trash smug adjust ambition criminal prisoner security math cover pecan response pharmacy center criminal salary elbow bracelet lunar briefing dragon")
self.assertIsNone(secret)
secret, share = process_slip39("trash smug adjust aide benefit temple round clogs devote prevent type cards clogs plastic aspect paper behavior lunar custody intimate")
self.assertIsNone(secret)
# different identifier
result = check(0, "slush", BackupType.Slip39_Advanced, [])
self.assertIs(result, NOK_IDENTIFIER_MISMATCH)
# same first word but still a different identifier
result = check(1, "slush", BackupType.Slip39_Advanced, ["trash"])
self.assertIs(result, NOK_IDENTIFIER_MISMATCH)
# same mnemonic found out using the index
result = check(3, "ambition", BackupType.Slip39_Advanced, ["trash", "smug", "adjust"])
self.assertIs(result, NOK_ALREADY_ADDED)
# Let's store two more. The group is 4/6 so this group is now complete.
secret, share = process_slip39("trash smug adjust arena beard quick language program true hush amount round geology should training practice language diet order ruin")
self.assertIsNone(secret)
secret, share = process_slip39("trash smug adjust beam brave sack magazine radar toxic emission domestic cradle vocal petition mule toxic acid hobo welcome downtown")
self.assertIsNone(secret)
# If trying to add another one from this group we get a warning.
result = check(2, "adjust", BackupType.Slip39_Advanced, ["trash", "smug"])
self.assertIs(result, NOK_THRESHOLD_REACHED)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()