From 46e4c02602987a630a7db4b0fe763f41fb2c4148 Mon Sep 17 00:00:00 2001 From: Tomas Susanka Date: Mon, 1 Jul 2019 09:45:30 +0200 Subject: [PATCH] core: refactor storage Each namespace now has its own file in apps.common.storage and storage/__init__ serves as an entry point. Updates #274. --- core/src/apps/common/cache.py | 2 +- core/src/apps/common/mnemonic/__init__.py | 4 +- core/src/apps/common/mnemonic/bip39.py | 4 +- core/src/apps/common/mnemonic/slip39.py | 45 +-- core/src/apps/common/request_passphrase.py | 4 +- core/src/apps/common/storage.py | 346 ------------------ core/src/apps/common/storage/__init__.py | 35 ++ core/src/apps/common/storage/common.py | 75 ++++ core/src/apps/common/storage/device.py | 201 ++++++++++ core/src/apps/common/storage/slip39.py | 72 ++++ .../apps/common/storage/slip39_mnemonics.py | 31 ++ core/src/apps/debug/__init__.py | 2 +- core/src/apps/homescreen/__init__.py | 14 +- core/src/apps/homescreen/homescreen.py | 14 +- core/src/apps/management/apply_flags.py | 2 +- core/src/apps/management/apply_settings.py | 6 +- core/src/apps/management/backup_device.py | 8 +- core/src/apps/management/load_device.py | 6 +- core/src/apps/management/recovery_device.py | 10 +- core/src/apps/management/reset_device.py | 8 +- core/src/apps/management/set_u2f_counter.py | 2 +- core/src/apps/webauthn/__init__.py | 2 +- core/src/boot.py | 6 +- core/src/usb.py | 2 +- core/tests/test_apps.common.storage.py | 16 +- 25 files changed, 498 insertions(+), 419 deletions(-) delete mode 100644 core/src/apps/common/storage.py create mode 100644 core/src/apps/common/storage/__init__.py create mode 100644 core/src/apps/common/storage/common.py create mode 100644 core/src/apps/common/storage/device.py create mode 100644 core/src/apps/common/storage/slip39.py create mode 100644 core/src/apps/common/storage/slip39_mnemonics.py diff --git a/core/src/apps/common/cache.py b/core/src/apps/common/cache.py index 1f0a8635b6..f16a603dbf 100644 --- a/core/src/apps/common/cache.py +++ b/core/src/apps/common/cache.py @@ -24,7 +24,7 @@ def get_state(prev_state: bytes = None, passphrase: str = None) -> bytes: def _compute_state(salt: bytes, passphrase: str) -> bytes: # state = HMAC(passphrase, salt || device_id) - message = salt + storage.get_device_id().encode() + message = salt + storage.device.get_device_id().encode() state = hmac.new(passphrase.encode(), message, hashlib.sha256).digest() return salt + state diff --git a/core/src/apps/common/mnemonic/__init__.py b/core/src/apps/common/mnemonic/__init__.py index b1e4622d39..62a1cb7340 100644 --- a/core/src/apps/common/mnemonic/__init__.py +++ b/core/src/apps/common/mnemonic/__init__.py @@ -15,8 +15,8 @@ TYPES_WORD_COUNT = {12: bip39, 18: bip39, 24: bip39, 20: slip39, 33: slip39} def get() -> (bytes, int): - mnemonic_secret = storage.get_mnemonic_secret() - mnemonic_type = storage.get_mnemonic_type() or TYPE_BIP39 + mnemonic_secret = storage.device.get_mnemonic_secret() + mnemonic_type = storage.device.get_mnemonic_type() or TYPE_BIP39 if mnemonic_type not in (TYPE_BIP39, TYPE_SLIP39): raise RuntimeError("Invalid mnemonic type") return mnemonic_secret, mnemonic_type diff --git a/core/src/apps/common/mnemonic/bip39.py b/core/src/apps/common/mnemonic/bip39.py index 63641b30fe..74e4ebe813 100644 --- a/core/src/apps/common/mnemonic/bip39.py +++ b/core/src/apps/common/mnemonic/bip39.py @@ -24,7 +24,9 @@ def process_all(mnemonics: list) -> bytes: def store(secret: bytes, needs_backup: bool, no_backup: bool): - storage.store_mnemonic(secret, mnemonic.TYPE_BIP39, needs_backup, no_backup) + storage.device.store_mnemonic_secret( + secret, mnemonic.TYPE_BIP39, needs_backup, no_backup + ) def get_seed(secret: bytes, passphrase: str, progress_bar=True): diff --git a/core/src/apps/common/mnemonic/slip39.py b/core/src/apps/common/mnemonic/slip39.py index aac6920c5d..3e54476c5a 100644 --- a/core/src/apps/common/mnemonic/slip39.py +++ b/core/src/apps/common/mnemonic/slip39.py @@ -8,7 +8,7 @@ def generate_from_secret(master_secret: bytes, count: int, threshold: int) -> li Generates new Shamir backup for 'master_secret'. Multiple groups are not yet supported. """ return slip39.generate_single_group_mnemonics_from_data( - master_secret, storage.get_slip39_identifier(), threshold, count + master_secret, storage.slip39.get_identifier(), threshold, count ) @@ -28,33 +28,33 @@ def process_single(mnemonic: str) -> bytes: raise ValueError("Threshold equal to 1 is not allowed.") # if recovery is not in progress already, start it and wait for more mnemonics - if not storage.is_slip39_in_progress(): - storage.set_slip39_in_progress(True) - storage.set_slip39_iteration_exponent(iteration_exponent) - storage.set_slip39_identifier(identifier) - storage.set_slip39_threshold(threshold) - storage.set_slip39_remaining(threshold - 1) - storage.set_slip39_words_count(len(mnemonic.split())) - storage.set_slip39_mnemonic(index, mnemonic) + if not storage.slip39.is_in_progress(): + storage.slip39.set_in_progress(True) + storage.slip39.set_iteration_exponent(iteration_exponent) + storage.slip39.set_identifier(identifier) + storage.slip39.set_threshold(threshold) + storage.slip39.set_remaining(threshold - 1) + storage.slip39.set_words_count(len(mnemonic.split())) + storage.slip39_mnemonics.set(index, mnemonic) return None # we need more shares # check identifier and member index of this share against stored values - if identifier != storage.get_slip39_identifier(): + if identifier != storage.slip39.get_identifier(): # TODO: improve UX (tell user) raise ValueError("Share identifiers do not match") - if storage.get_slip39_mnemonic(index): + if storage.slip39_mnemonics.get(index): # TODO: improve UX (tell user) raise ValueError("This mnemonic was already entered") # append to storage - remaining = storage.get_slip39_remaining() - 1 - storage.set_slip39_remaining(remaining) - storage.set_slip39_mnemonic(index, mnemonic) + remaining = storage.slip39.get_remaining() - 1 + storage.slip39.set_remaining(remaining) + storage.slip39.set(index, mnemonic) if remaining != 0: return None # we need more shares # combine shares and return the master secret - mnemonics = storage.get_slip39_mnemonics() + mnemonics = storage.slip39_mnemonics.fetch() if len(mnemonics) != threshold: raise ValueError("Some mnemonics are still missing.") _, _, secret = slip39.combine_mnemonics(mnemonics) @@ -67,21 +67,24 @@ def process_all(mnemonics: list) -> bytes: stored in the storage. """ identifier, iteration_exponent, secret = slip39.combine_mnemonics(mnemonics) - storage.set_slip39_iteration_exponent(iteration_exponent) - storage.set_slip39_identifier(identifier) + storage.slip39.set_iteration_exponent(iteration_exponent) + storage.slip39.set_identifier(identifier) return secret def store(secret: bytes, needs_backup: bool, no_backup: bool): - storage.store_mnemonic(secret, mnemonic.TYPE_SLIP39, needs_backup, no_backup) - storage.clear_slip39_data() + storage.device.store_mnemonic_secret( + secret, mnemonic.TYPE_SLIP39, needs_backup, no_backup + ) + storage.slip39.delete_progress() def get_seed(encrypted_master_secret: bytes, passphrase: str, progress_bar=True): if progress_bar: mnemonic._start_progress() - identifier = storage.get_slip39_identifier() - iteration_exponent = storage.get_slip39_iteration_exponent() + identifier = storage.slip39.get_identifier() + iteration_exponent = storage.slip39.get_iteration_exponent() + master_secret = slip39.decrypt( identifier, iteration_exponent, encrypted_master_secret, passphrase ) diff --git a/core/src/apps/common/request_passphrase.py b/core/src/apps/common/request_passphrase.py index 66821e278d..e293ff2ffe 100644 --- a/core/src/apps/common/request_passphrase.py +++ b/core/src/apps/common/request_passphrase.py @@ -18,14 +18,14 @@ _MAX_PASSPHRASE_LEN = const(50) async def protect_by_passphrase(ctx) -> str: - if storage.has_passphrase(): + if storage.device.has_passphrase(): return await request_passphrase(ctx) else: return "" async def request_passphrase(ctx) -> str: - source = storage.get_passphrase_source() + source = storage.device.get_passphrase_source() if source == PassphraseSourceType.ASK: source = await request_passphrase_source(ctx) passphrase = await request_passphrase_ack( diff --git a/core/src/apps/common/storage.py b/core/src/apps/common/storage.py deleted file mode 100644 index 0974c67b1e..0000000000 --- a/core/src/apps/common/storage.py +++ /dev/null @@ -1,346 +0,0 @@ -from micropython import const -from ubinascii import hexlify - -from trezor import config -from trezor.crypto import random, slip39 - -from apps.common import cache - -HOMESCREEN_MAXSIZE = 16384 - -_STORAGE_VERSION = b"\x02" -_FALSE_BYTE = b"\x00" -_TRUE_BYTE = b"\x01" -_COUNTER_HEAD_LEN = 4 -_COUNTER_TAIL_LEN = 8 - -# fmt: off -_APP = const(0x01) # app namespace -_DEVICE_ID = const(0x00) # bytes -_VERSION = const(0x01) # int -_MNEMONIC_SECRET = const(0x02) # bytes -_LANGUAGE = const(0x03) # str -_LABEL = const(0x04) # str -_USE_PASSPHRASE = const(0x05) # bool (0x01 or empty) -_HOMESCREEN = const(0x06) # bytes -_NEEDS_BACKUP = const(0x07) # bool (0x01 or empty) -_FLAGS = const(0x08) # int -_U2F_COUNTER = const(0x09) # int -_PASSPHRASE_SOURCE = const(0x0A) # int -_UNFINISHED_BACKUP = const(0x0B) # bool (0x01 or empty) -_AUTOLOCK_DELAY_MS = const(0x0C) # int -_NO_BACKUP = const(0x0D) # bool (0x01 or empty) -_MNEMONIC_TYPE = const(0x0E) # int -_ROTATION = const(0x0F) # int - -_SLIP39 = const(0x02) # SLIP-39 namespace -_SLIP39_IN_PROGRESS = const(0x00) # bool -_SLIP39_IDENTIFIER = const(0x01) # bytes -_SLIP39_THRESHOLD = const(0x02) # int -_SLIP39_REMAINING = const(0x03) # int -_SLIP39_WORDS_COUNT = const(0x04) # int -_SLIP39_ITERATION_EXPONENT = const(0x05) # int - -# Mnemonics stored during SLIP-39 recovery process. -# Each mnemonic is stored under key = index. -_SLIP39_MNEMONICS = const(0x03) # SLIP-39 mnemonics namespace - -# fmt: on - - -def set_slip39_in_progress(val: bool): - _set_bool(_SLIP39, _SLIP39_IN_PROGRESS, val) - - -def is_slip39_in_progress(): - return _get_bool(_SLIP39, _SLIP39_IN_PROGRESS) - - -def set_slip39_identifier(identifier: int): - _set_uint16(_SLIP39, _SLIP39_IDENTIFIER, identifier) - - -def get_slip39_identifier() -> int: - return _get_uint16(_SLIP39, _SLIP39_IDENTIFIER) - - -def set_slip39_threshold(threshold: int): - _set_uint8(_SLIP39, _SLIP39_THRESHOLD, threshold) - - -def get_slip39_threshold() -> int: - return _get_uint8(_SLIP39, _SLIP39_THRESHOLD) - - -def set_slip39_remaining(remaining: int): - _set_uint8(_SLIP39, _SLIP39_REMAINING, remaining) - - -def get_slip39_remaining() -> int: - return _get_uint8(_SLIP39, _SLIP39_REMAINING) - - -def set_slip39_words_count(count: int): - _set_uint8(_SLIP39, _SLIP39_WORDS_COUNT, count) - - -def get_slip39_words_count() -> int: - return _get_uint8(_SLIP39, _SLIP39_WORDS_COUNT) - - -def set_slip39_iteration_exponent(exponent: int): - # TODO: check if not > 5 bits - _set_uint8(_SLIP39, _SLIP39_ITERATION_EXPONENT, exponent) - - -def get_slip39_iteration_exponent() -> int: - return _get_uint8(_SLIP39, _SLIP39_ITERATION_EXPONENT) - - -def set_slip39_mnemonic(index: int, mnemonic: str): - config.set(_SLIP39_MNEMONICS, index, mnemonic.encode()) - - -def get_slip39_mnemonic(index: int) -> str: - m = config.get(_SLIP39_MNEMONICS, index) - if m: - return m.decode() - return False - - -def get_slip39_mnemonics() -> list: - mnemonics = [] - for index in range(0, slip39.MAX_SHARE_COUNT): - m = get_slip39_mnemonic(index) - if m: - mnemonics.append(m) - return mnemonics - - -def clear_slip39_data(): - config.delete(_SLIP39, _SLIP39_IN_PROGRESS) - config.delete(_SLIP39, _SLIP39_REMAINING) - config.delete(_SLIP39, _SLIP39_THRESHOLD) - config.delete(_SLIP39, _SLIP39_WORDS_COUNT) - for index in (0, slip39.MAX_SHARE_COUNT): - config.delete(_SLIP39_MNEMONICS, index) - - -def _set_bool(app: int, key: int, value: bool, public: bool = False) -> None: - if value: - config.set(app, key, _TRUE_BYTE, public) - else: - config.set(app, key, _FALSE_BYTE, public) - - -def _get_bool(app: int, key: int, public: bool = False) -> bool: - return config.get(app, key, public) == _TRUE_BYTE - - -def _set_uint8(app: int, key: int, val: int): - config.set(app, key, val.to_bytes(1, "big")) - - -def _get_uint8(app: int, key: int) -> int: - val = config.get(app, key) - if not val: - return None - return int.from_bytes(val, "big") - - -def _set_uint16(app: int, key: int, val: int): - config.set(app, key, val.to_bytes(2, "big")) - - -def _get_uint16(app: int, key: int) -> int: - val = config.get(app, key) - if not val: - return None - return int.from_bytes(val, "big") - - -def _new_device_id() -> str: - return hexlify(random.bytes(12)).decode().upper() - - -def get_device_id() -> str: - dev_id = config.get(_APP, _DEVICE_ID, True) # public - if not dev_id: - dev_id = _new_device_id().encode() - config.set(_APP, _DEVICE_ID, dev_id, True) # public - return dev_id.decode() - - -def get_rotation() -> int: - rotation = config.get(_APP, _ROTATION, True) # public - if not rotation: - return 0 - return int.from_bytes(rotation, "big") - - -def is_initialized() -> bool: - return bool(config.get(_APP, _VERSION)) and not bool( - config.get(_SLIP39, _SLIP39_IN_PROGRESS) - ) - - -def get_label() -> str: - label = config.get(_APP, _LABEL, True) # public - if label is None: - return None - return label.decode() - - -def get_mnemonic_secret() -> bytes: - return config.get(_APP, _MNEMONIC_SECRET) - - -def get_mnemonic_type() -> int: - return _get_uint8(_APP, _MNEMONIC_TYPE) - - -def has_passphrase() -> bool: - return _get_bool(_APP, _USE_PASSPHRASE) - - -def get_homescreen() -> bytes: - return config.get(_APP, _HOMESCREEN, True) # public - - -def store_mnemonic( - secret: bytes, - mnemonic_type: int, - needs_backup: bool = False, - no_backup: bool = False, -) -> None: - config.set(_APP, _MNEMONIC_SECRET, secret) - _set_uint8(_APP, _MNEMONIC_TYPE, mnemonic_type) - _init(needs_backup, no_backup) - - -def _init(needs_backup=False, no_backup=False): - config.set(_APP, _VERSION, _STORAGE_VERSION) - _set_bool(_APP, _NO_BACKUP, no_backup) - if not no_backup: - _set_bool(_APP, _NEEDS_BACKUP, needs_backup) - - -def needs_backup() -> bool: - return _get_bool(_APP, _NEEDS_BACKUP) - - -def set_backed_up() -> None: - config.set(_APP, _NEEDS_BACKUP, b"") - - -def unfinished_backup() -> bool: - return _get_bool(_APP, _UNFINISHED_BACKUP) - - -def set_unfinished_backup(state: bool) -> None: - _set_bool(_APP, _UNFINISHED_BACKUP, state) - - -def no_backup() -> bool: - return _get_bool(_APP, _NO_BACKUP) - - -def get_passphrase_source() -> int: - b = config.get(_APP, _PASSPHRASE_SOURCE) - if b == b"\x01": - return 1 - elif b == b"\x02": - return 2 - else: - return 0 - - -def load_settings( - label: str = None, - use_passphrase: bool = None, - homescreen: bytes = None, - passphrase_source: int = None, - display_rotation: int = None, -) -> None: - if label is not None: - config.set(_APP, _LABEL, label.encode(), True) # public - if use_passphrase is not None: - _set_bool(_APP, _USE_PASSPHRASE, use_passphrase) - if homescreen is not None: - if homescreen[:8] == b"TOIf\x90\x00\x90\x00": - if len(homescreen) <= HOMESCREEN_MAXSIZE: - config.set(_APP, _HOMESCREEN, homescreen, True) # public - else: - config.set(_APP, _HOMESCREEN, b"", True) # public - if passphrase_source is not None: - if passphrase_source in (0, 1, 2): - config.set(_APP, _PASSPHRASE_SOURCE, bytes([passphrase_source])) - if display_rotation is not None: - if display_rotation not in (0, 90, 180, 270): - raise ValueError( - "Unsupported display rotation degrees: %d" % display_rotation - ) - else: - config.set( - _APP, _ROTATION, display_rotation.to_bytes(2, "big"), True - ) # public - - -def get_flags() -> int: - b = config.get(_APP, _FLAGS) - if b is None: - return 0 - else: - return int.from_bytes(b, "big") - - -def set_flags(flags: int) -> None: - b = config.get(_APP, _FLAGS) - if b is None: - b = 0 - else: - b = int.from_bytes(b, "big") - flags = (flags | b) & 0xFFFFFFFF - if flags != b: - config.set(_APP, _FLAGS, flags.to_bytes(4, "big")) - - -def get_autolock_delay_ms() -> int: - b = config.get(_APP, _AUTOLOCK_DELAY_MS) - if b is None: - return 10 * 60 * 1000 - else: - return int.from_bytes(b, "big") - - -def set_autolock_delay_ms(delay_ms: int) -> None: - if delay_ms < 60 * 1000: - delay_ms = 60 * 1000 - config.set(_APP, _AUTOLOCK_DELAY_MS, delay_ms.to_bytes(4, "big")) - - -def next_u2f_counter() -> int: - return config.next_counter(_APP, _U2F_COUNTER, True) # writable when locked - - -def set_u2f_counter(cntr: int) -> None: - config.set_counter(_APP, _U2F_COUNTER, cntr, True) # writable when locked - - -def wipe(): - config.wipe() - cache.clear() - - -def init_unlocked(): - # Check for storage version upgrade. - version = config.get(_APP, _VERSION) - if version == b"\x01": - # Make the U2F counter public and writable even when storage is locked. - counter = config.get(_APP, _U2F_COUNTER) - if counter is not None: - config.set_counter( - _APP, _U2F_COUNTER, int.from_bytes(counter, "big"), True - ) # writable when locked - config.delete(_APP, _U2F_COUNTER) - config.set(_APP, _VERSION, _STORAGE_VERSION) diff --git a/core/src/apps/common/storage/__init__.py b/core/src/apps/common/storage/__init__.py new file mode 100644 index 0000000000..15e4238b3a --- /dev/null +++ b/core/src/apps/common/storage/__init__.py @@ -0,0 +1,35 @@ +from trezor import config + +from apps.common import cache +from apps.common.storage import common, device, slip39 + + +def set_current_version(): + device.set_version(common._STORAGE_VERSION_CURRENT) + + +def is_initialized() -> bool: + return device.is_version_stored() and not slip39.is_in_progress() + + +def wipe(): + config.wipe() + cache.clear() + + +def init_unlocked(): + # Check for storage version upgrade. + version = device.get_version() + if version == common._STORAGE_VERSION_01: + _migrate_from_version_01() + + +def _migrate_from_version_01(): + # Make the U2F counter public and writable even when storage is locked. + # U2F counter wasn't public, so we are intentionally not using storage.device module. + counter = common._get(common._APP_DEVICE, device._U2F_COUNTER) + if counter is not None: + device.set_u2f_counter(int.from_bytes(counter, "big")) + # Delete the old, non-public U2F_COUNTER. + common._delete(common._APP_DEVICE, device._U2F_COUNTER) + set_current_version() diff --git a/core/src/apps/common/storage/common.py b/core/src/apps/common/storage/common.py new file mode 100644 index 0000000000..16eda101af --- /dev/null +++ b/core/src/apps/common/storage/common.py @@ -0,0 +1,75 @@ +from trezor import config + +# Namespaces: +# fmt: off +# Intentionally not using const() to allow import in submodules. +_APP_DEVICE = 0x01 +_APP_SLIP39 = 0x02 +_APP_SLIP39_MNEMONICS = 0x03 +# fmt: on + +_FALSE_BYTE = b"\x00" +_TRUE_BYTE = b"\x01" + +_STORAGE_VERSION_01 = b"\x01" +_STORAGE_VERSION_CURRENT = b"\x02" + + +def _set(app: int, key: int, data: bytes, public: bool = False): + config.set(app, key, data, public) + + +def _get(app: int, key: int, public: bool = False): + return config.get(app, key, public) + + +def _delete(app: int, key: int): + config.delete(app, key) + + +def _set_true_or_delete(app: int, key: int, value: bool): + if value: + _set_bool(app, key, value) + else: + _delete(app, key) + + +def _set_bool(app: int, key: int, value: bool, public: bool = False) -> None: + if value: + _set(app, key, _TRUE_BYTE, public) + else: + _set(app, key, _FALSE_BYTE, public) + + +def _get_bool(app: int, key: int, public: bool = False) -> bool: + return _get(app, key, public) == _TRUE_BYTE + + +def _set_uint8(app: int, key: int, val: int): + _set(app, key, val.to_bytes(1, "big")) + + +def _get_uint8(app: int, key: int) -> int: + val = _get(app, key) + if not val: + return None + return int.from_bytes(val, "big") + + +def _set_uint16(app: int, key: int, val: int): + _set(app, key, val.to_bytes(2, "big")) + + +def _get_uint16(app: int, key: int) -> int: + val = _get(app, key) + if not val: + return None + return int.from_bytes(val, "big") + + +def _next_counter(app: int, key: int, public: bool = False): + return config.next_counter(app, key, public) + + +def _set_counter(app: int, key: int, count: int, public: bool = False) -> None: + config.set_counter(app, key, count, public) diff --git a/core/src/apps/common/storage/device.py b/core/src/apps/common/storage/device.py new file mode 100644 index 0000000000..83bc6b77bb --- /dev/null +++ b/core/src/apps/common/storage/device.py @@ -0,0 +1,201 @@ +from micropython import const +from ubinascii import hexlify + +from trezor.crypto import random + +from apps.common.storage import common + +# Namespace: +_NAMESPACE = common._APP_DEVICE + +# fmt: off +# Keys: +_DEVICE_ID = const(0x00) # bytes +_VERSION = const(0x01) # int +_MNEMONIC_SECRET = const(0x02) # bytes +_LANGUAGE = const(0x03) # str +_LABEL = const(0x04) # str +_USE_PASSPHRASE = const(0x05) # bool (0x01 or empty) +_HOMESCREEN = const(0x06) # bytes +_NEEDS_BACKUP = const(0x07) # bool (0x01 or empty) +_FLAGS = const(0x08) # int +_U2F_COUNTER = const(0x09) # int +_PASSPHRASE_SOURCE = const(0x0A) # int +_UNFINISHED_BACKUP = const(0x0B) # bool (0x01 or empty) +_AUTOLOCK_DELAY_MS = const(0x0C) # int +_NO_BACKUP = const(0x0D) # bool (0x01 or empty) +_MNEMONIC_TYPE = const(0x0E) # int +_ROTATION = const(0x0F) # int +# fmt: on + +HOMESCREEN_MAXSIZE = 16384 + + +def is_version_stored() -> bool: + return bool(common._get(_NAMESPACE, _VERSION)) + + +def get_version() -> bool: + return common._get(_NAMESPACE, _VERSION) + + +def set_version(version: bytes) -> bool: + return common._set(_NAMESPACE, _VERSION, version) + + +def _new_device_id() -> str: + return hexlify(random.bytes(12)).decode().upper() + + +def get_device_id() -> str: + dev_id = common._get(_NAMESPACE, _DEVICE_ID, True) # public + if not dev_id: + dev_id = _new_device_id().encode() + common._set(_NAMESPACE, _DEVICE_ID, dev_id, True) # public + return dev_id.decode() + + +def get_rotation() -> int: + rotation = common._get(_NAMESPACE, _ROTATION, True) # public + if not rotation: + return 0 + return int.from_bytes(rotation, "big") + + +def get_label() -> str: + label = common._get(_NAMESPACE, _LABEL, True) # public + if label is None: + return None + return label.decode() + + +def get_mnemonic_secret() -> bytes: + return common._get(_NAMESPACE, _MNEMONIC_SECRET) + + +def get_mnemonic_type() -> int: + return common._get_uint8(_NAMESPACE, _MNEMONIC_TYPE) + + +def has_passphrase() -> bool: + return common._get_bool(_NAMESPACE, _USE_PASSPHRASE) + + +def get_homescreen() -> bytes: + return common._get(_NAMESPACE, _HOMESCREEN, True) # public + + +def store_mnemonic_secret( + secret: bytes, + mnemonic_type: int, + needs_backup: bool = False, + no_backup: bool = False, +) -> None: + set_version(common._STORAGE_VERSION_CURRENT) + common._set(_NAMESPACE, _MNEMONIC_SECRET, secret) + common._set_uint8(_NAMESPACE, _MNEMONIC_TYPE, mnemonic_type) + common._set_true_or_delete(_NAMESPACE, _NO_BACKUP, no_backup) + if not no_backup: + common._set_true_or_delete(_NAMESPACE, _NEEDS_BACKUP, needs_backup) + + +def needs_backup() -> bool: + return common._get_bool(_NAMESPACE, _NEEDS_BACKUP) + + +def set_backed_up() -> None: + common._delete(_NAMESPACE, _NEEDS_BACKUP) + + +def unfinished_backup() -> bool: + return common._get_bool(_NAMESPACE, _UNFINISHED_BACKUP) + + +def set_unfinished_backup(state: bool) -> None: + common._set_bool(_NAMESPACE, _UNFINISHED_BACKUP, state) + + +def no_backup() -> bool: + return common._get_bool(_NAMESPACE, _NO_BACKUP) + + +def get_passphrase_source() -> int: + b = common._get(_NAMESPACE, _PASSPHRASE_SOURCE) + if b == b"\x01": + return 1 + elif b == b"\x02": + return 2 + else: + return 0 + + +def load_settings( + label: str = None, + use_passphrase: bool = None, + homescreen: bytes = None, + passphrase_source: int = None, + display_rotation: int = None, +) -> None: + if label is not None: + common._set(_NAMESPACE, _LABEL, label.encode(), True) # public + if use_passphrase is not None: + common._set_bool(_NAMESPACE, _USE_PASSPHRASE, use_passphrase) + if homescreen is not None: + if homescreen[:8] == b"TOIf\x90\x00\x90\x00": + if len(homescreen) <= HOMESCREEN_MAXSIZE: + common._set(_NAMESPACE, _HOMESCREEN, homescreen, True) # public + else: + common._set(_NAMESPACE, _HOMESCREEN, b"", True) # public + if passphrase_source is not None: + if passphrase_source in (0, 1, 2): + common._set(_NAMESPACE, _PASSPHRASE_SOURCE, bytes([passphrase_source])) + if display_rotation is not None: + if display_rotation not in (0, 90, 180, 270): + raise ValueError( + "Unsupported display rotation degrees: %d" % display_rotation + ) + else: + common._set( + _NAMESPACE, _ROTATION, display_rotation.to_bytes(2, "big"), True + ) # public + + +def get_flags() -> int: + b = common._get(_NAMESPACE, _FLAGS) + if b is None: + return 0 + else: + return int.from_bytes(b, "big") + + +def set_flags(flags: int) -> None: + b = common._get(_NAMESPACE, _FLAGS) + if b is None: + b = 0 + else: + b = int.from_bytes(b, "big") + flags = (flags | b) & 0xFFFFFFFF + if flags != b: + common._set(_NAMESPACE, _FLAGS, flags.to_bytes(4, "big")) + + +def get_autolock_delay_ms() -> int: + b = common._get(_NAMESPACE, _AUTOLOCK_DELAY_MS) + if b is None: + return 10 * 60 * 1000 + else: + return int.from_bytes(b, "big") + + +def set_autolock_delay_ms(delay_ms: int) -> None: + if delay_ms < 60 * 1000: + delay_ms = 60 * 1000 + common._set(_NAMESPACE, _AUTOLOCK_DELAY_MS, delay_ms.to_bytes(4, "big")) + + +def next_u2f_counter() -> int: + return common._next_counter(_NAMESPACE, _U2F_COUNTER, True) # writable when locked + + +def set_u2f_counter(count: int) -> None: + common._set_counter(_NAMESPACE, _U2F_COUNTER, count, True) # writable when locked diff --git a/core/src/apps/common/storage/slip39.py b/core/src/apps/common/storage/slip39.py new file mode 100644 index 0000000000..140e3b7003 --- /dev/null +++ b/core/src/apps/common/storage/slip39.py @@ -0,0 +1,72 @@ +from micropython import const + +from apps.common.storage import common, slip39_mnemonics + +# Namespace: +_NAMESPACE = common._APP_SLIP39 + +# fmt: off +# Keys: +_SLIP39_IN_PROGRESS = const(0x00) # bool +_SLIP39_IDENTIFIER = const(0x01) # bytes +_SLIP39_THRESHOLD = const(0x02) # int +_SLIP39_REMAINING = const(0x03) # int +_SLIP39_WORDS_COUNT = const(0x04) # int +_SLIP39_ITERATION_EXPONENT = const(0x05) # int +# fmt: on + + +def set_in_progress(val: bool): + common._set_bool(_NAMESPACE, _SLIP39_IN_PROGRESS, val) + + +def is_in_progress(): + return common._get_bool(_NAMESPACE, _SLIP39_IN_PROGRESS) + + +def set_identifier(identifier: int): + common._set_uint16(_NAMESPACE, _SLIP39_IDENTIFIER, identifier) + + +def get_identifier() -> int: + return common._get_uint16(_NAMESPACE, _SLIP39_IDENTIFIER) + + +def set_threshold(threshold: int): + common._set_uint8(_NAMESPACE, _SLIP39_THRESHOLD, threshold) + + +def get_threshold() -> int: + return common._get_uint8(_NAMESPACE, _SLIP39_THRESHOLD) + + +def set_remaining(remaining: int): + common._set_uint8(_NAMESPACE, _SLIP39_REMAINING, remaining) + + +def get_remaining() -> int: + return common._get_uint8(_NAMESPACE, _SLIP39_REMAINING) + + +def set_words_count(count: int): + common._set_uint8(_NAMESPACE, _SLIP39_WORDS_COUNT, count) + + +def get_words_count() -> int: + return common._get_uint8(_NAMESPACE, _SLIP39_WORDS_COUNT) + + +def set_iteration_exponent(exponent: int): + common._set_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT, exponent) + + +def get_iteration_exponent() -> int: + return common._get_uint8(_NAMESPACE, _SLIP39_ITERATION_EXPONENT) + + +def delete_progress(): + common._delete(_NAMESPACE, _SLIP39_IN_PROGRESS) + common._delete(_NAMESPACE, _SLIP39_REMAINING) + common._delete(_NAMESPACE, _SLIP39_THRESHOLD) + common._delete(_NAMESPACE, _SLIP39_WORDS_COUNT) + slip39_mnemonics.delete() diff --git a/core/src/apps/common/storage/slip39_mnemonics.py b/core/src/apps/common/storage/slip39_mnemonics.py new file mode 100644 index 0000000000..b6e796632c --- /dev/null +++ b/core/src/apps/common/storage/slip39_mnemonics.py @@ -0,0 +1,31 @@ +from trezor.crypto import slip39 + +from apps.common.storage import common + +# Mnemonics stored during SLIP-39 recovery process. +# Each mnemonic is stored under key = index. + + +def set(index: int, mnemonic: str): + common._set(common._APP_SLIP39_MNEMONICS, index, mnemonic.encode()) + + +def get(index: int) -> str: + m = common._get(common._APP_SLIP39_MNEMONICS, index) + if m: + return m.decode() + return False + + +def fetch() -> list: + mnemonics = [] + for index in range(0, slip39.MAX_SHARE_COUNT): + m = get(index) + if m: + mnemonics.append(m) + return mnemonics + + +def delete(): + for index in range(0, slip39.MAX_SHARE_COUNT): + common._delete(common._APP_SLIP39_MNEMONICS, index) diff --git a/core/src/apps/debug/__init__.py b/core/src/apps/debug/__init__.py index c5c46f5410..c6d7edabd4 100644 --- a/core/src/apps/debug/__init__.py +++ b/core/src/apps/debug/__init__.py @@ -32,7 +32,7 @@ if __debug__: m = DebugLinkState() m.mnemonic_secret, m.mnemonic_type = mnemonic.get() - m.passphrase_protection = storage.has_passphrase() + m.passphrase_protection = storage.device.has_passphrase() m.reset_word_pos = reset_word_index m.reset_entropy = reset_internal_entropy if reset_current_words: diff --git a/core/src/apps/homescreen/__init__.py b/core/src/apps/homescreen/__init__.py index 858795995a..7fd95958bd 100644 --- a/core/src/apps/homescreen/__init__.py +++ b/core/src/apps/homescreen/__init__.py @@ -16,17 +16,17 @@ def get_features(): f.patch_version = utils.VERSION_PATCH f.revision = utils.GITREV f.model = utils.MODEL - f.device_id = storage.get_device_id() - f.label = storage.get_label() + f.device_id = storage.device.get_device_id() + f.label = storage.device.get_label() f.initialized = storage.is_initialized() f.pin_protection = config.has_pin() f.pin_cached = config.has_pin() - f.passphrase_protection = storage.has_passphrase() + f.passphrase_protection = storage.device.has_passphrase() f.passphrase_cached = cache.has_passphrase() - f.needs_backup = storage.needs_backup() - f.unfinished_backup = storage.unfinished_backup() - f.no_backup = storage.no_backup() - f.flags = storage.get_flags() + f.needs_backup = storage.device.needs_backup() + f.unfinished_backup = storage.device.unfinished_backup() + f.no_backup = storage.device.no_backup() + f.flags = storage.device.get_flags() return f diff --git a/core/src/apps/homescreen/homescreen.py b/core/src/apps/homescreen/homescreen.py index 4a229063a0..177e52990a 100644 --- a/core/src/apps/homescreen/homescreen.py +++ b/core/src/apps/homescreen/homescreen.py @@ -17,26 +17,26 @@ async def homescreen(): def display_homescreen(): image = None - if storage.is_slip39_in_progress(): + if storage.slip39.is_in_progress(): label = "Waiting for other shares" elif not storage.is_initialized(): label = "Go to trezor.io/start" else: - label = storage.get_label() or "My Trezor" - image = storage.get_homescreen() + label = storage.device.get_label() or "My Trezor" + image = storage.device.get_homescreen() if not image: image = res.load("apps/homescreen/res/bg.toif") - if storage.is_initialized() and storage.no_backup(): + if storage.is_initialized() and storage.device.no_backup(): _err("SEEDLESS") - elif storage.is_initialized() and storage.unfinished_backup(): + elif storage.is_initialized() and storage.device.unfinished_backup(): _err("BACKUP FAILED!") - elif storage.is_initialized() and storage.needs_backup(): + elif storage.is_initialized() and storage.device.needs_backup(): _warn("NEEDS BACKUP!") elif storage.is_initialized() and not config.has_pin(): _warn("PIN NOT SET!") - elif storage.is_slip39_in_progress(): + elif storage.slip39.is_in_progress(): _warn("SHAMIR IN PROGRESS!") else: ui.display.bar(0, 0, ui.WIDTH, ui.HEIGHT, ui.BG) diff --git a/core/src/apps/management/apply_flags.py b/core/src/apps/management/apply_flags.py index 29e2e21add..a9fd1c9fa0 100644 --- a/core/src/apps/management/apply_flags.py +++ b/core/src/apps/management/apply_flags.py @@ -4,5 +4,5 @@ from apps.common import storage async def apply_flags(ctx, msg): - storage.set_flags(msg.flags) + storage.device.set_flags(msg.flags) return Success(message="Flags applied") diff --git a/core/src/apps/management/apply_settings.py b/core/src/apps/management/apply_settings.py index 3ef58d137a..5fd36b53ae 100644 --- a/core/src/apps/management/apply_settings.py +++ b/core/src/apps/management/apply_settings.py @@ -18,7 +18,7 @@ async def apply_settings(ctx, msg): raise wire.ProcessError("No setting provided") if msg.homescreen is not None: - if len(msg.homescreen) > storage.HOMESCREEN_MAXSIZE: + if len(msg.homescreen) > storage.device.HOMESCREEN_MAXSIZE: raise wire.DataError("Homescreen is too complex") await require_confirm_change_homescreen(ctx) @@ -34,7 +34,7 @@ async def apply_settings(ctx, msg): if msg.display_rotation is not None: await require_confirm_change_display_rotation(ctx, msg.display_rotation) - storage.load_settings( + storage.device.load_settings( label=msg.label, use_passphrase=msg.use_passphrase, homescreen=msg.homescreen, @@ -43,7 +43,7 @@ async def apply_settings(ctx, msg): ) if msg.display_rotation is not None: - ui.display.orientation(storage.get_rotation()) + ui.display.orientation(storage.device.get_rotation()) return Success(message="Settings applied") diff --git a/core/src/apps/management/backup_device.py b/core/src/apps/management/backup_device.py index 376c968d5e..daf48e55ef 100644 --- a/core/src/apps/management/backup_device.py +++ b/core/src/apps/management/backup_device.py @@ -9,7 +9,7 @@ from apps.management.reset_device import backup_slip39_wallet async def backup_device(ctx, msg): if not storage.is_initialized(): raise wire.ProcessError("Device is not initialized") - if not storage.needs_backup(): + if not storage.device.needs_backup(): raise wire.ProcessError("Seed already backed up") mnemonic_secret, mnemonic_type = mnemonic.get() @@ -18,14 +18,14 @@ async def backup_device(ctx, msg): # warn user about mnemonic safety await layout.show_backup_warning(ctx, "Back up your seed", "I understand", slip39) - storage.set_unfinished_backup(True) - storage.set_backed_up() + storage.device.set_unfinished_backup(True) + storage.device.set_backed_up() if slip39: await backup_slip39_wallet(ctx, mnemonic_secret) else: await layout.bip39_show_and_confirm_mnemonic(ctx, mnemonic_secret.decode()) - storage.set_unfinished_backup(False) + storage.device.set_unfinished_backup(False) return Success(message="Seed successfully backed up") diff --git a/core/src/apps/management/load_device.py b/core/src/apps/management/load_device.py index 375674b57e..1b3b077d74 100644 --- a/core/src/apps/management/load_device.py +++ b/core/src/apps/management/load_device.py @@ -25,13 +25,15 @@ async def load_device(ctx, msg): await require_confirm(ctx, text) secret = bip39.process_all([msg.mnemonic]) - storage.store_mnemonic( + storage.device.store_mnemonic_secret( secret=secret, mnemonic_type=bip39.get_type(), needs_backup=True, no_backup=False, ) - storage.load_settings(use_passphrase=msg.passphrase_protection, label=msg.label) + storage.device.load_settings( + use_passphrase=msg.passphrase_protection, label=msg.label + ) if msg.pin: config.change_pin(pin_to_int(""), pin_to_int(msg.pin)) diff --git a/core/src/apps/management/recovery_device.py b/core/src/apps/management/recovery_device.py index 5a026b5e9a..0d5cb3fd7c 100644 --- a/core/src/apps/management/recovery_device.py +++ b/core/src/apps/management/recovery_device.py @@ -34,7 +34,7 @@ async def recovery_device(ctx, msg): if not msg.dry_run and storage.is_initialized(): raise wire.UnexpectedMessage("Already initialized") - if not storage.is_slip39_in_progress(): + if not storage.slip39.is_in_progress(): if not msg.dry_run: title = "Wallet recovery" text = Text(title, ui.ICON_RECOVERY) @@ -57,7 +57,7 @@ async def recovery_device(ctx, msg): wordcount = await request_wordcount(ctx, title) mnemonic_module = mnemonic.module_from_words_count(wordcount) else: - wordcount = storage.get_slip39_words_count() + wordcount = storage.slip39.get_words_count() mnemonic_module = mnemonic.slip39 mnemonic_threshold = None @@ -104,8 +104,10 @@ async def recovery_device(ctx, msg): # save into storage if msg.pin_protection: config.change_pin(pin_to_int(""), pin_to_int(newpin)) - storage.set_u2f_counter(msg.u2f_counter) - storage.load_settings(label=msg.label, use_passphrase=msg.passphrase_protection) + storage.device.set_u2f_counter(msg.u2f_counter) + storage.device.load_settings( + label=msg.label, use_passphrase=msg.passphrase_protection + ) mnemonic_module.store(secret=secret, needs_backup=False, no_backup=False) await show_success(ctx) diff --git a/core/src/apps/management/reset_device.py b/core/src/apps/management/reset_device.py index 6868fdc802..8e2b310eff 100644 --- a/core/src/apps/management/reset_device.py +++ b/core/src/apps/management/reset_device.py @@ -39,8 +39,8 @@ async def reset_device(ctx, msg): secret = _compute_secret_from_entropy(int_entropy, ext_entropy, msg.strength) if msg.slip39: - storage.set_slip39_identifier(slip39.generate_random_identifier()) - storage.set_slip39_iteration_exponent(slip39.DEFAULT_ITERATION_EXPONENT) + storage.slip39.set_identifier(slip39.generate_random_identifier()) + storage.slip39.set_iteration_exponent(slip39.DEFAULT_ITERATION_EXPONENT) # should we back up the wallet now? if not msg.no_backup and not msg.skip_backup: @@ -60,7 +60,9 @@ async def reset_device(ctx, msg): raise wire.ProcessError("Could not change PIN") # write settings and master secret into storage - storage.load_settings(label=msg.label, use_passphrase=msg.passphrase_protection) + storage.device.load_settings( + label=msg.label, use_passphrase=msg.passphrase_protection + ) if msg.slip39: mnemonic.slip39.store( secret=secret, needs_backup=msg.skip_backup, no_backup=msg.no_backup diff --git a/core/src/apps/management/set_u2f_counter.py b/core/src/apps/management/set_u2f_counter.py index e8aaada677..97a61288c7 100644 --- a/core/src/apps/management/set_u2f_counter.py +++ b/core/src/apps/management/set_u2f_counter.py @@ -16,6 +16,6 @@ async def set_u2f_counter(ctx, msg): text.bold("to %d?" % msg.u2f_counter) await require_confirm(ctx, text, code=ButtonRequestType.ProtectCall) - storage.set_u2f_counter(msg.u2f_counter) + storage.device.set_u2f_counter(msg.u2f_counter) return Success(message="U2F counter set") diff --git a/core/src/apps/webauthn/__init__.py b/core/src/apps/webauthn/__init__.py index 66f3ad824a..50d1bce220 100644 --- a/core/src/apps/webauthn/__init__.py +++ b/core/src/apps/webauthn/__init__.py @@ -712,7 +712,7 @@ def msg_authenticate_sign(challenge: bytes, app_id: bytes, privkey: bytes) -> by flags = bytes([_AUTH_FLAG_TUP]) # get next counter - ctr = storage.next_u2f_counter() + ctr = storage.device.next_u2f_counter() ctrbuf = ustruct.pack(">L", ctr) # hash input data together with counter diff --git a/core/src/boot.py b/core/src/boot.py index c84432c073..7181c0ca15 100644 --- a/core/src/boot.py +++ b/core/src/boot.py @@ -6,7 +6,7 @@ from apps.common.request_pin import request_pin async def bootscreen(): - ui.display.orientation(storage.get_rotation()) + ui.display.orientation(storage.device.get_rotation()) while True: try: if not config.has_pin(): @@ -28,8 +28,8 @@ async def bootscreen(): async def lockscreen(): - label = storage.get_label() - image = storage.get_homescreen() + label = storage.device.get_label() + image = storage.device.get_homescreen() if not label: label = "My Trezor" if not image: diff --git a/core/src/usb.py b/core/src/usb.py index 06132785ea..b557241e27 100644 --- a/core/src/usb.py +++ b/core/src/usb.py @@ -1,6 +1,6 @@ from trezor import io -from apps.common.storage import get_device_id +from apps.common.storage.device import get_device_id # fmt: off diff --git a/core/tests/test_apps.common.storage.py b/core/tests/test_apps.common.storage.py index e3630a920c..69b6e5f937 100644 --- a/core/tests/test_apps.common.storage.py +++ b/core/tests/test_apps.common.storage.py @@ -1,7 +1,7 @@ from common import * from trezor.pin import pin_to_int from trezor import config -from apps.common import storage +from apps.common.storage import device class TestConfig(unittest.TestCase): @@ -10,14 +10,14 @@ class TestConfig(unittest.TestCase): config.init() config.wipe() for i in range(150): - self.assertEqual(storage.next_u2f_counter(), i) - storage.set_u2f_counter(350) + self.assertEqual(device.next_u2f_counter(), i) + device.set_u2f_counter(350) for i in range(351, 500): - self.assertEqual(storage.next_u2f_counter(), i) - storage.set_u2f_counter(0) - self.assertEqual(storage.next_u2f_counter(), 1) - storage.set_u2f_counter(None) - self.assertEqual(storage.next_u2f_counter(), 0) + self.assertEqual(device.next_u2f_counter(), i) + device.set_u2f_counter(0) + self.assertEqual(device.next_u2f_counter(), 1) + device.set_u2f_counter(None) + self.assertEqual(device.next_u2f_counter(), 0) if __name__ == '__main__':