This is to avoid including app-specific functionality in storage and avoid circular imports. The following policy is now in effect: modules from `storage` namespace must not import from `apps` namespace. In most files, the change only involves changing import paths. A minor refactor was needed in case of webauthn: basic get/set/delete functionality was left in storage.webauthn, and more advanced logic on top of it was moved to apps.webauthn.resident_credentials. A significant refactor was needed for sd_salt, where application (and UI) logic was tightly coupled with the IO code. This is now separated, and storage.sd_salt deals exclusively with the IO side, while the app/UI logic is implemented on top of it in apps.common.sd_salt and apps.management.sd_protect.pull/665/head
parent
39a532c8b1
commit
5c93ecd53a
@ -1,97 +0,0 @@
|
||||
from micropython import const
|
||||
|
||||
from apps.common.storage import common
|
||||
from apps.webauthn.credential import Credential, Fido2Credential
|
||||
|
||||
if False:
|
||||
from typing import List, Optional
|
||||
|
||||
_RESIDENT_CREDENTIAL_START_KEY = const(1)
|
||||
_MAX_RESIDENT_CREDENTIALS = const(100)
|
||||
|
||||
|
||||
def get_resident_credentials(rp_id_hash: Optional[bytes] = None) -> List[Credential]:
|
||||
creds = [] # type: List[Credential]
|
||||
for i in range(_MAX_RESIDENT_CREDENTIALS):
|
||||
cred = get_resident_credential(i, rp_id_hash)
|
||||
if cred is not None:
|
||||
creds.append(cred)
|
||||
return creds
|
||||
|
||||
|
||||
def get_resident_credential(
|
||||
index: int, rp_id_hash: Optional[bytes] = None
|
||||
) -> Optional[Credential]:
|
||||
if not (0 <= index < _MAX_RESIDENT_CREDENTIALS):
|
||||
return None
|
||||
|
||||
stored_cred_data = common.get(
|
||||
common.APP_WEBAUTHN, index + _RESIDENT_CREDENTIAL_START_KEY
|
||||
)
|
||||
if stored_cred_data is None:
|
||||
return None
|
||||
|
||||
stored_rp_id_hash = stored_cred_data[:32]
|
||||
stored_cred_id = stored_cred_data[32:]
|
||||
|
||||
if rp_id_hash is not None and rp_id_hash != stored_rp_id_hash:
|
||||
# Stored credential is not for this RP ID.
|
||||
return None
|
||||
|
||||
stored_cred = Fido2Credential.from_cred_id(stored_cred_id, stored_rp_id_hash)
|
||||
if stored_cred is None:
|
||||
return None
|
||||
|
||||
stored_cred.index = index
|
||||
return stored_cred
|
||||
|
||||
|
||||
def store_resident_credential(cred: Fido2Credential) -> bool:
|
||||
slot = None
|
||||
for i in range(_MAX_RESIDENT_CREDENTIALS):
|
||||
stored_cred_data = common.get(
|
||||
common.APP_WEBAUTHN, i + _RESIDENT_CREDENTIAL_START_KEY
|
||||
)
|
||||
if stored_cred_data is None:
|
||||
if slot is None:
|
||||
slot = i
|
||||
continue
|
||||
|
||||
stored_rp_id_hash = stored_cred_data[:32]
|
||||
stored_cred_id = stored_cred_data[32:]
|
||||
|
||||
if cred.rp_id_hash != stored_rp_id_hash:
|
||||
# Stored credential is not for this RP ID.
|
||||
continue
|
||||
|
||||
stored_cred = Fido2Credential.from_cred_id(stored_cred_id, stored_rp_id_hash)
|
||||
if stored_cred is None:
|
||||
# Stored credential is not for this RP ID.
|
||||
continue
|
||||
|
||||
# If a credential for the same RP ID and user ID already exists, then overwrite it.
|
||||
if stored_cred.user_id == cred.user_id:
|
||||
slot = i
|
||||
break
|
||||
|
||||
if slot is None:
|
||||
return False
|
||||
|
||||
common.set(
|
||||
common.APP_WEBAUTHN,
|
||||
slot + _RESIDENT_CREDENTIAL_START_KEY,
|
||||
cred.rp_id_hash + cred.id,
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
def erase_resident_credentials() -> None:
|
||||
for i in range(_MAX_RESIDENT_CREDENTIALS):
|
||||
common.delete(common.APP_WEBAUTHN, i + _RESIDENT_CREDENTIAL_START_KEY)
|
||||
|
||||
|
||||
def erase_resident_credential(index: int) -> bool:
|
||||
if not (0 <= index < _MAX_RESIDENT_CREDENTIALS):
|
||||
return False
|
||||
common.delete(common.APP_WEBAUTHN, index + _RESIDENT_CREDENTIAL_START_KEY)
|
||||
return True
|
@ -0,0 +1,81 @@
|
||||
from micropython import const
|
||||
|
||||
import storage.webauthn
|
||||
from storage.webauthn import MAX_RESIDENT_CREDENTIALS
|
||||
|
||||
from apps.webauthn.credential import Fido2Credential
|
||||
|
||||
if False:
|
||||
from typing import Iterator, Optional
|
||||
|
||||
|
||||
RP_ID_HASH_LENGTH = const(32)
|
||||
|
||||
|
||||
def _credential_from_data(index: int, data: bytes) -> Fido2Credential:
|
||||
rp_id_hash = data[:RP_ID_HASH_LENGTH]
|
||||
cred_id = data[RP_ID_HASH_LENGTH:]
|
||||
cred = Fido2Credential.from_cred_id(cred_id, rp_id_hash)
|
||||
cred.index = index
|
||||
return cred
|
||||
|
||||
|
||||
def find_all() -> Iterator[Fido2Credential]:
|
||||
for index in range(MAX_RESIDENT_CREDENTIALS):
|
||||
data = storage.webauthn.get_resident_credential(index)
|
||||
if data is not None:
|
||||
yield _credential_from_data(index, data)
|
||||
|
||||
|
||||
def find_by_rp_id_hash(rp_id_hash: bytes) -> Iterator[Fido2Credential]:
|
||||
for index in range(MAX_RESIDENT_CREDENTIALS):
|
||||
data = storage.webauthn.get_resident_credential(index)
|
||||
|
||||
if data is None:
|
||||
# empty slot
|
||||
continue
|
||||
|
||||
if data[:RP_ID_HASH_LENGTH] != rp_id_hash:
|
||||
# rp_id_hash mismatch
|
||||
continue
|
||||
|
||||
yield _credential_from_data(index, data)
|
||||
|
||||
|
||||
def get_resident_credential(index: int) -> Optional[Fido2Credential]:
|
||||
if not (0 <= index < MAX_RESIDENT_CREDENTIALS):
|
||||
return None
|
||||
|
||||
data = storage.webauthn.get_resident_credential(index)
|
||||
if data is None:
|
||||
return None
|
||||
|
||||
return _credential_from_data(index, data)
|
||||
|
||||
|
||||
def store_resident_credential(cred: Fido2Credential) -> bool:
|
||||
slot = None
|
||||
for index in range(MAX_RESIDENT_CREDENTIALS):
|
||||
data = storage.webauthn.get_resident_credential(index)
|
||||
if data is None:
|
||||
# found candidate empty slot
|
||||
if slot is None:
|
||||
slot = index
|
||||
continue
|
||||
|
||||
if cred.rp_id_hash != data[:RP_ID_HASH_LENGTH]:
|
||||
# slot is occupied by a different rp_id_hash
|
||||
continue
|
||||
|
||||
stored_cred = _credential_from_data(index, data)
|
||||
# If a credential for the same RP ID and user ID already exists, then overwrite it.
|
||||
if stored_cred.user_id == cred.user_id:
|
||||
slot = index
|
||||
break
|
||||
|
||||
if slot is None:
|
||||
return False
|
||||
|
||||
cred_data = cred.rp_id_hash + cred.id
|
||||
storage.webauthn.set_resident_credential(slot, cred_data)
|
||||
return True
|
@ -1,8 +1,6 @@
|
||||
from storage import cache, common, device
|
||||
from trezor import config
|
||||
|
||||
from apps.common import cache
|
||||
from apps.common.storage import common, device
|
||||
|
||||
|
||||
def set_current_version() -> None:
|
||||
device.set_version(common.STORAGE_VERSION_CURRENT)
|
@ -1,7 +1,6 @@
|
||||
from storage.device import get_device_id
|
||||
from trezor.crypto import hashlib, hmac, random
|
||||
|
||||
from apps.common.storage.device import get_device_id
|
||||
|
||||
if False:
|
||||
from typing import Optional
|
||||
|
@ -1,9 +1,8 @@
|
||||
from micropython import const
|
||||
|
||||
from storage import common, recovery_shares
|
||||
from trezor.crypto import slip39
|
||||
|
||||
from apps.common.storage import common, recovery_shares
|
||||
|
||||
# Namespace:
|
||||
_NAMESPACE = common.APP_RECOVERY
|
||||
|
@ -1,7 +1,6 @@
|
||||
from storage import common
|
||||
from trezor.crypto import slip39
|
||||
|
||||
from apps.common.storage import common
|
||||
|
||||
if False:
|
||||
from typing import List, Optional
|
||||
|
@ -0,0 +1,160 @@
|
||||
from micropython import const
|
||||
|
||||
import storage.device
|
||||
from trezor import io
|
||||
from trezor.crypto import hmac
|
||||
from trezor.crypto.hashlib import sha256
|
||||
from trezor.utils import consteq
|
||||
|
||||
if False:
|
||||
from typing import Optional
|
||||
|
||||
SD_CARD_HOT_SWAPPABLE = False
|
||||
SD_SALT_LEN_BYTES = const(32)
|
||||
SD_SALT_AUTH_TAG_LEN_BYTES = const(16)
|
||||
|
||||
|
||||
class SdSaltMismatch(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def is_enabled() -> bool:
|
||||
return storage.device.get_sd_salt_auth_key() is not None
|
||||
|
||||
|
||||
def compute_auth_tag(salt: bytes, auth_key: bytes) -> bytes:
|
||||
digest = hmac.new(auth_key, salt, sha256).digest()
|
||||
return digest[:SD_SALT_AUTH_TAG_LEN_BYTES]
|
||||
|
||||
|
||||
def _get_device_dir() -> str:
|
||||
return "/trezor/device_{}".format(storage.device.get_device_id().lower())
|
||||
|
||||
|
||||
def _get_salt_path(new: bool = False) -> str:
|
||||
return "{}/salt{}".format(_get_device_dir(), ".new" if new else "")
|
||||
|
||||
|
||||
def _load_salt(fs: io.FatFS, auth_key: bytes, path: str) -> Optional[bytearray]:
|
||||
# Load the salt file if it exists.
|
||||
try:
|
||||
with fs.open(path, "r") as f:
|
||||
salt = bytearray(SD_SALT_LEN_BYTES)
|
||||
stored_tag = bytearray(SD_SALT_AUTH_TAG_LEN_BYTES)
|
||||
f.read(salt)
|
||||
f.read(stored_tag)
|
||||
except OSError:
|
||||
return None
|
||||
|
||||
# Check the salt's authentication tag.
|
||||
computed_tag = compute_auth_tag(salt, auth_key)
|
||||
if not consteq(computed_tag, stored_tag):
|
||||
return None
|
||||
|
||||
return salt
|
||||
|
||||
|
||||
def load_sd_salt() -> Optional[bytearray]:
|
||||
salt_auth_key = storage.device.get_sd_salt_auth_key()
|
||||
if salt_auth_key is None:
|
||||
return None
|
||||
|
||||
sd = io.SDCard()
|
||||
if not sd.power(True):
|
||||
raise OSError
|
||||
|
||||
salt_path = _get_salt_path()
|
||||
new_salt_path = _get_salt_path(new=True)
|
||||
|
||||
try:
|
||||
fs = io.FatFS()
|
||||
try:
|
||||
fs.mount()
|
||||
except OSError as e:
|
||||
# SD card is probably not formatted. For purposes of loading SD salt, this
|
||||
# is identical to having the wrong card in.
|
||||
raise SdSaltMismatch from e
|
||||
|
||||
salt = _load_salt(fs, salt_auth_key, salt_path)
|
||||
if salt is not None:
|
||||
return salt
|
||||
|
||||
# Check if there is a new salt.
|
||||
salt = _load_salt(fs, salt_auth_key, new_salt_path)
|
||||
if salt is None:
|
||||
# No valid salt file on this SD card.
|
||||
raise SdSaltMismatch
|
||||
|
||||
# Normal salt file does not exist, but new salt file exists. That means that
|
||||
# SD salt regeneration was interrupted earlier. Bring into consistent state.
|
||||
# TODO Possibly overwrite salt file with random data.
|
||||
try:
|
||||
fs.unlink(salt_path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# fs.rename can fail with a write error, which falls through as an OSError.
|
||||
# This should be handled in calling code, by allowing the user to retry.
|
||||
fs.rename(new_salt_path, salt_path)
|
||||
return salt
|
||||
finally:
|
||||
fs.unmount()
|
||||
sd.power(False)
|
||||
|
||||
|
||||
def set_sd_salt(salt: bytes, salt_tag: bytes, stage: bool = False) -> None:
|
||||
salt_path = _get_salt_path(stage)
|
||||
sd = io.SDCard()
|
||||
if not sd.power(True):
|
||||
raise OSError
|
||||
|
||||
try:
|
||||
fs = io.FatFS()
|
||||
fs.mount()
|
||||
fs.mkdir("/trezor", True)
|
||||
fs.mkdir(_get_device_dir(), True)
|
||||
with fs.open(salt_path, "w") as f:
|
||||
f.write(salt)
|
||||
f.write(salt_tag)
|
||||
finally:
|
||||
fs.unmount()
|
||||
sd.power(False)
|
||||
|
||||
|
||||
def commit_sd_salt() -> None:
|
||||
salt_path = _get_salt_path(new=False)
|
||||
new_salt_path = _get_salt_path(new=True)
|
||||
|
||||
sd = io.SDCard()
|
||||
fs = io.FatFS()
|
||||
if not sd.power(True):
|
||||
raise OSError
|
||||
|
||||
try:
|
||||
fs.mount()
|
||||
# TODO Possibly overwrite salt file with random data.
|
||||
try:
|
||||
fs.unlink(salt_path)
|
||||
except OSError:
|
||||
pass
|
||||
fs.rename(new_salt_path, salt_path)
|
||||
finally:
|
||||
fs.unmount()
|
||||
sd.power(False)
|
||||
|
||||
|
||||
def remove_sd_salt() -> None:
|
||||
salt_path = _get_salt_path()
|
||||
|
||||
sd = io.SDCard()
|
||||
fs = io.FatFS()
|
||||
if not sd.power(True):
|
||||
raise OSError
|
||||
|
||||
try:
|
||||
fs.mount()
|
||||
# TODO Possibly overwrite salt file with random data.
|
||||
fs.unlink(salt_path)
|
||||
finally:
|
||||
fs.unmount()
|
||||
sd.power(False)
|
@ -0,0 +1,37 @@
|
||||
from micropython import const
|
||||
|
||||
from storage import common
|
||||
|
||||
if False:
|
||||
from typing import Optional
|
||||
|
||||
|
||||
_RESIDENT_CREDENTIAL_START_KEY = const(1)
|
||||
|
||||
MAX_RESIDENT_CREDENTIALS = const(100)
|
||||
|
||||
|
||||
def get_resident_credential(index: int) -> Optional[bytes]:
|
||||
if not (0 <= index < MAX_RESIDENT_CREDENTIALS):
|
||||
raise ValueError # invalid credential index
|
||||
|
||||
return common.get(common.APP_WEBAUTHN, index + _RESIDENT_CREDENTIAL_START_KEY)
|
||||
|
||||
|
||||
def set_resident_credential(index: int, data: bytes) -> None:
|
||||
if not (0 <= index < MAX_RESIDENT_CREDENTIALS):
|
||||
raise ValueError # invalid credential index
|
||||
|
||||
common.set(common.APP_WEBAUTHN, index + _RESIDENT_CREDENTIAL_START_KEY, data)
|
||||
|
||||
|
||||
def delete_resident_credential(index: int) -> None:
|
||||
if not (0 <= index < MAX_RESIDENT_CREDENTIALS):
|
||||
raise ValueError # invalid credential index
|
||||
|
||||
common.delete(common.APP_WEBAUTHN, index + _RESIDENT_CREDENTIAL_START_KEY)
|
||||
|
||||
|
||||
def delete_all_resident_credentials() -> None:
|
||||
for i in range(MAX_RESIDENT_CREDENTIALS):
|
||||
common.delete(common.APP_WEBAUTHN, i + _RESIDENT_CREDENTIAL_START_KEY)
|
Loading…
Reference in new issue