2023-02-24 12:23:25 +00:00
|
|
|
import builtins
|
2021-02-25 14:37:02 +00:00
|
|
|
import gc
|
2022-09-15 10:45:30 +00:00
|
|
|
from micropython import const
|
2021-12-08 09:10:58 +00:00
|
|
|
from typing import TYPE_CHECKING
|
2021-02-25 14:37:02 +00:00
|
|
|
|
|
|
|
from trezor import utils
|
2018-07-03 14:20:26 +00:00
|
|
|
|
2021-12-08 09:10:58 +00:00
|
|
|
if TYPE_CHECKING:
|
2021-06-10 14:13:24 +00:00
|
|
|
from typing import Sequence, TypeVar, overload
|
|
|
|
|
|
|
|
T = TypeVar("T")
|
|
|
|
|
2018-02-24 17:58:02 +00:00
|
|
|
|
2022-09-15 10:45:30 +00:00
|
|
|
# Number of session slots preallocated when this module is imported.
_MAX_SESSIONS_COUNT = const(10)
# Bit set in a cache key to route it to the sessionless cache.
_SESSIONLESS_FLAG = const(0x80)
# Size of a session id, in bytes.
_SESSION_ID_LENGTH = const(32)
|
2020-02-13 09:19:07 +00:00
|
|
|
|
|
|
|
# Traditional (per-session) cache keys.
# Each value is the position of the key's size entry in `SessionCache.fields`.
APP_COMMON_SEED = const(0)
APP_COMMON_AUTHORIZATION_TYPE = const(1)
APP_COMMON_AUTHORIZATION_DATA = const(2)
APP_COMMON_NONCE = const(3)
if not utils.BITCOIN_ONLY:
    # These keys only have storage in non-Bitcoin-only builds.
    APP_COMMON_DERIVE_CARDANO = const(4)
    APP_CARDANO_ICARUS_SECRET = const(5)
    APP_CARDANO_ICARUS_TREZOR_SECRET = const(6)
    APP_MONERO_LIVE_REFRESH = const(7)
|
2020-02-13 09:19:07 +00:00
|
|
|
|
|
|
|
# Keys that are valid across sessions.
# The low bits are the position in `SessionlessCache.fields`; the flag bit
# tells the module-level accessors to use the sessionless cache.
APP_COMMON_SEED_WITHOUT_PASSPHRASE = const(_SESSIONLESS_FLAG | 0)
APP_COMMON_SAFETY_CHECKS_TEMPORARY = const(_SESSIONLESS_FLAG | 1)
STORAGE_DEVICE_EXPERIMENTAL_FEATURES = const(_SESSIONLESS_FLAG | 2)
APP_COMMON_REQUEST_PIN_LAST_UNLOCK = const(_SESSIONLESS_FLAG | 3)
APP_COMMON_BUSY_DEADLINE_MS = const(_SESSIONLESS_FLAG | 4)
APP_MISC_COSI_NONCE = const(_SESSIONLESS_FLAG | 5)
APP_MISC_COSI_COMMITMENT = const(_SESSIONLESS_FLAG | 6)
|
2020-02-13 09:19:07 +00:00
|
|
|
|
2018-02-24 17:58:02 +00:00
|
|
|
|
2021-04-06 12:31:03 +00:00
|
|
|
# === Homescreen storage ===
|
|
|
|
# This does not logically belong to the "cache" functionality, but the cache module is
|
|
|
|
# a convenient place to put this.
|
|
|
|
# When a Homescreen layout is instantiated, it checks the value of `homescreen_shown`
|
|
|
|
# to know whether it should render itself or whether the result of a previous instance
|
|
|
|
# is still on. This way we can avoid unnecessary fadeins/fadeouts when a workflow ends.
|
|
|
|
HOMESCREEN_ON = object()
|
|
|
|
LOCKSCREEN_ON = object()
|
2022-08-09 16:26:37 +00:00
|
|
|
BUSYSCREEN_ON = object()
|
2021-04-06 12:31:03 +00:00
|
|
|
homescreen_shown: object | None = None
|
|
|
|
|
|
|
|
|
2021-02-25 14:37:02 +00:00
|
|
|
class InvalidSessionError(Exception):
    """Raised when a session-scoped key is accessed while no session is active."""
|
2019-07-03 13:07:04 +00:00
|
|
|
|
2020-04-20 09:36:28 +00:00
|
|
|
|
2021-02-25 14:37:02 +00:00
|
|
|
class DataCache:
    """Fixed-layout byte store addressed by small integer keys.

    A subclass must assign `fields` (the maximum value length for each key)
    before calling `__init__`. Every entry is a bytearray whose first byte is
    a "value present" flag (1 means set) followed by the stored value.
    """

    fields: Sequence[int]

    def __init__(self) -> None:
        # Preallocate each entry at full size: one flag byte plus the payload.
        self.data = [bytearray(size + 1) for size in self.fields]

    def set(self, key: int, value: bytes) -> None:
        """Store `value` under `key`; it must not exceed the key's field size."""
        utils.ensure(key < len(self.fields))
        utils.ensure(len(value) <= self.fields[key])
        entry = self.data[key]
        entry[0] = 1
        entry[1:] = value

    if TYPE_CHECKING:

        @overload
        def get(self, key: int) -> bytes | None:
            ...

        @overload
        def get(self, key: int, default: T) -> bytes | T:  # noqa: F811
            ...

    def get(self, key: int, default: T | None = None) -> bytes | T | None:  # noqa: F811
        """Return a copy of the value stored under `key`, or `default` if unset."""
        utils.ensure(key < len(self.fields))
        entry = self.data[key]
        if entry[0] == 1:
            return bytes(entry[1:])
        return default

    def is_set(self, key: int) -> bool:
        """Tell whether a value is currently stored under `key`."""
        utils.ensure(key < len(self.fields))
        entry = self.data[key]
        return entry[0] == 1

    def delete(self, key: int) -> None:
        """Drop the value under `key`, shrinking the entry to a cleared flag byte."""
        utils.ensure(key < len(self.fields))
        entry = self.data[key]
        entry[:] = b"\x00"

    def clear(self) -> None:
        """Delete the values of all keys."""
        for key in range(len(self.fields)):
            self.delete(key)
|
2020-02-13 09:19:07 +00:00
|
|
|
|
|
|
|
|
2021-02-25 14:37:02 +00:00
|
|
|
class SessionCache(DataCache):
    """Cache slot for one session: its id, a last-usage stamp and keyed data."""

    def __init__(self) -> None:
        self.session_id = bytearray(_SESSION_ID_LENGTH)
        # Maximum value length for every session-scoped key, in key order.
        if not utils.BITCOIN_ONLY:
            self.fields = (
                64,  # APP_COMMON_SEED
                2,  # APP_COMMON_AUTHORIZATION_TYPE
                128,  # APP_COMMON_AUTHORIZATION_DATA
                32,  # APP_COMMON_NONCE
                1,  # APP_COMMON_DERIVE_CARDANO
                96,  # APP_CARDANO_ICARUS_SECRET
                96,  # APP_CARDANO_ICARUS_TREZOR_SECRET
                1,  # APP_MONERO_LIVE_REFRESH
            )
        else:
            self.fields = (
                64,  # APP_COMMON_SEED
                2,  # APP_COMMON_AUTHORIZATION_TYPE
                128,  # APP_COMMON_AUTHORIZATION_DATA
                32,  # APP_COMMON_NONCE
            )
        self.last_usage = 0
        super().__init__()

    def export_session_id(self) -> bytes:
        """Return the session id as bytes, generating a random one if it is empty."""
        from trezorcrypto import random  # avoid pulling in trezor.crypto

        # an empty id means none has been assigned yet (or it was cleared)
        if not self.session_id:
            self.session_id[:] = random.bytes(_SESSION_ID_LENGTH)
        # hand out an immutable copy
        return bytes(self.session_id)

    def clear(self) -> None:
        """Delete all stored values and reset the usage stamp and the session id."""
        super().clear()
        self.last_usage = 0
        self.session_id[:] = b""
|
|
|
|
|
|
|
|
|
|
|
|
class SessionlessCache(DataCache):
    """Storage for the keys that are valid across sessions."""

    def __init__(self) -> None:
        # Maximum value length for every sessionless key, indexed by the key
        # value with the sessionless flag bit removed.
        self.fields = (
            64,  # APP_COMMON_SEED_WITHOUT_PASSPHRASE
            1,  # APP_COMMON_SAFETY_CHECKS_TEMPORARY
            1,  # STORAGE_DEVICE_EXPERIMENTAL_FEATURES
            8,  # APP_COMMON_REQUEST_PIN_LAST_UNLOCK
            8,  # APP_COMMON_BUSY_DEADLINE_MS
            32,  # APP_MISC_COSI_NONCE
            32,  # APP_MISC_COSI_COMMITMENT
        )
        super().__init__()
|
|
|
|
|
|
|
|
|
|
|
|
# XXX
# Allocation notes:
# Instantiating a DataCache subclass should produce as little garbage as
# possible, so that the preallocated bytearrays end up compact in memory.
# The initialization is therefore done in two steps: first all caches are
# created with their bytearrays at full size, and only afterwards `clear()` is
# called on every one of them to shrink the buffers to their empty state.
# The second step does produce some trash - `b[:]` allocates a slice.

# Step 1: allocate all caches at full size.
_SESSIONS: list[SessionCache] = []
for _ in range(_MAX_SESSIONS_COUNT):
    _SESSIONS.append(SessionCache())

_SESSIONLESS_CACHE = SessionlessCache()

# Step 2: reset every cache to its empty state.
for session in _SESSIONS:
    session.clear()
_SESSIONLESS_CACHE.clear()

# Collect whatever garbage the steps above produced.
gc.collect()

# Index into _SESSIONS of the active session, or None when none is active.
_active_session_idx: int | None = None
# Counter bumped on every start_session() call; used to stamp `last_usage`.
_session_usage_counter = 0
|
|
|
|
|
|
|
|
|
|
|
|
def start_session(received_session_id: bytes | None = None) -> bytes:
    """Activate a session and return its id.

    If `received_session_id` matches a stored session, that session becomes
    active and the same id is returned. Otherwise the least recently used
    slot is cleared, made active, and its freshly exported id is returned.
    """
    global _active_session_idx
    global _session_usage_counter

    if received_session_id is not None:
        if len(received_session_id) != _SESSION_ID_LENGTH:
            # An id of the wrong length cannot be in the cache, so skip the
            # search. In particular this stops a caller from passing b"" and
            # matching a cleared slot. Handle it like "no id provided", which
            # is also what happens for an id that is not found.
            received_session_id = None

    _session_usage_counter += 1

    # try to resume the requested session
    if received_session_id:
        for idx in range(_MAX_SESSIONS_COUNT):
            candidate = _SESSIONS[idx]
            if candidate.session_id == received_session_id:
                _active_session_idx = idx
                candidate.last_usage = _session_usage_counter
                return received_session_id

    # not found (or not requested): recycle the least recently used slot
    oldest_idx = 0
    oldest_usage = _session_usage_counter
    for idx in range(_MAX_SESSIONS_COUNT):
        usage = _SESSIONS[idx].last_usage
        if usage < oldest_usage:
            oldest_usage = usage
            oldest_idx = idx

    recycled = _SESSIONS[oldest_idx]
    _active_session_idx = oldest_idx
    recycled.clear()
    recycled.last_usage = _session_usage_counter
    return recycled.export_session_id()
|
2020-02-13 09:19:07 +00:00
|
|
|
|
|
|
|
|
2020-08-25 12:51:06 +00:00
|
|
|
def end_current_session() -> None:
    """Clear the active session and deactivate it; do nothing if none is active."""
    global _active_session_idx

    if _active_session_idx is not None:
        _SESSIONS[_active_session_idx].clear()
        _active_session_idx = None
|
2020-08-25 12:51:06 +00:00
|
|
|
|
|
|
|
|
2021-02-25 14:37:02 +00:00
|
|
|
def set(key: int, value: bytes) -> None:
    """Store `value` under `key` in the sessionless cache or the active session.

    Raises InvalidSessionError for a session-scoped key when no session is active.
    """
    if key & _SESSIONLESS_FLAG:
        cache = _SESSIONLESS_CACHE
        # the sessionless cache is indexed without the flag bit
        key ^= _SESSIONLESS_FLAG
    elif _active_session_idx is None:
        raise InvalidSessionError
    else:
        cache = _SESSIONS[_active_session_idx]
    cache.set(key, value)
|
2019-09-02 10:09:03 +00:00
|
|
|
|
|
|
|
|
2022-10-04 13:39:51 +00:00
|
|
|
def set_int(key: int, value: int) -> None:
    """Store integer `value` under `key`, big-endian, padded to the field size.

    Raises InvalidSessionError for a session-scoped key when no session is active.
    """
    if key & _SESSIONLESS_FLAG:
        sizes = _SESSIONLESS_CACHE.fields
        index = key ^ _SESSIONLESS_FLAG
    elif _active_session_idx is None:
        raise InvalidSessionError
    else:
        sizes = _SESSIONS[_active_session_idx].fields
        index = key

    encoded = value.to_bytes(sizes[index], "big")

    # Micropython's int.to_bytes() doesn't raise OverflowError, so check the
    # round trip to make sure the value fits within the field.
    assert int.from_bytes(encoded, "big") == value

    set(key, encoded)
|
|
|
|
|
|
|
|
|
2021-12-08 09:10:58 +00:00
|
|
|
if TYPE_CHECKING:

    @overload
    def get(key: int) -> bytes | None:
        ...

    @overload
    def get(key: int, default: T) -> bytes | T:  # noqa: F811
        ...


def get(key: int, default: T | None = None) -> bytes | T | None:  # noqa: F811
    """Return the value stored under `key`, or `default` when it is not set.

    Raises InvalidSessionError for a session-scoped key when no session is active.
    """
    if key & _SESSIONLESS_FLAG:
        cache = _SESSIONLESS_CACHE
        # the sessionless cache is indexed without the flag bit
        key ^= _SESSIONLESS_FLAG
    elif _active_session_idx is None:
        raise InvalidSessionError
    else:
        cache = _SESSIONS[_active_session_idx]
    return cache.get(key, default)
|
|
|
|
|
|
|
|
|
2022-10-04 13:39:51 +00:00
|
|
|
def get_int(key: int, default: T | None = None) -> int | T | None:  # noqa: F811
    """Return the value under `key` decoded as a big-endian integer.

    Returns `default` when the key is not set.
    """
    encoded = get(key)
    if encoded is not None:
        return int.from_bytes(encoded, "big")
    return default
|
|
|
|
|
|
|
|
|
2023-02-24 12:23:25 +00:00
|
|
|
def get_int_all_sessions(key: int) -> builtins.set[int]:
    """Collect the integer values stored under `key` in every relevant cache.

    For a sessionless key only the sessionless cache is consulted; otherwise
    all session slots are, whether or not they are active. Caches in which the
    key is not set contribute nothing.

    Returns the set of values decoded as big-endian integers.
    """
    if key & _SESSIONLESS_FLAG:
        caches = [_SESSIONLESS_CACHE]
        # DataCache is indexed by the bare key: strip the flag bit, just like
        # the other module-level accessors do. Without this the bounds check
        # in DataCache.get() fails for every sessionless key.
        key ^= _SESSIONLESS_FLAG
    else:
        caches = _SESSIONS

    # `set` is shadowed by the module-level set() function, hence builtins.set
    values = builtins.set()
    for cache in caches:
        encoded = cache.get(key)
        if encoded is not None:
            values.add(int.from_bytes(encoded, "big"))
    return values
|
|
|
|
|
|
|
|
|
2021-10-15 16:27:22 +00:00
|
|
|
def is_set(key: int) -> bool:
    """Tell whether a value is stored under `key`.

    Raises InvalidSessionError for a session-scoped key when no session is active.
    """
    if key & _SESSIONLESS_FLAG:
        cache = _SESSIONLESS_CACHE
        # the sessionless cache is indexed without the flag bit
        key ^= _SESSIONLESS_FLAG
    elif _active_session_idx is None:
        raise InvalidSessionError
    else:
        cache = _SESSIONS[_active_session_idx]
    return cache.is_set(key)
|
|
|
|
|
|
|
|
|
2021-06-10 14:13:24 +00:00
|
|
|
def delete(key: int) -> None:
    """Remove the value stored under `key`.

    Raises InvalidSessionError for a session-scoped key when no session is active.
    """
    if key & _SESSIONLESS_FLAG:
        cache = _SESSIONLESS_CACHE
        # the sessionless cache is indexed without the flag bit
        key ^= _SESSIONLESS_FLAG
    elif _active_session_idx is None:
        raise InvalidSessionError
    else:
        cache = _SESSIONS[_active_session_idx]
    return cache.delete(key)
|
2020-02-13 09:19:07 +00:00
|
|
|
|
2019-11-08 08:43:32 +00:00
|
|
|
|
2021-12-08 09:10:58 +00:00
|
|
|
if TYPE_CHECKING:
    from typing import Awaitable, Callable, ParamSpec, TypeVar

    # Captures the parameters of a decorated function so that they are preserved.
    P = ParamSpec("P")
    # A function returning bytes, and its awaitable counterpart.
    ByteFunc = Callable[P, bytes]
    AsyncByteFunc = Callable[P, Awaitable[bytes]]
|
2021-02-25 14:37:02 +00:00
|
|
|
|
2020-04-20 09:36:28 +00:00
|
|
|
|
2021-12-08 09:10:58 +00:00
|
|
|
def stored(key: int) -> Callable[[ByteFunc[P]], ByteFunc[P]]:
    """Build a decorator that caches a function's bytes result under `key`.

    The decorated function runs only while the key is not set; its result is
    then stored with `set()`. Later calls return the cached value, whatever
    their arguments are.
    """

    def decorator(func: ByteFunc[P]) -> ByteFunc[P]:
        def wrapper(*args: P.args, **kwargs: P.kwargs):
            cached = get(key)
            if cached is not None:
                return cached
            computed = func(*args, **kwargs)
            set(key, computed)
            return computed

        return wrapper

    return decorator
|
|
|
|
|
|
|
|
|
2021-12-08 09:10:58 +00:00
|
|
|
def stored_async(key: int) -> Callable[[AsyncByteFunc[P]], AsyncByteFunc[P]]:
    """Build a decorator that caches an async function's bytes result under `key`.

    The decorated coroutine function is awaited only while the key is not set;
    its result is then stored with `set()`. Later calls return the cached
    value, whatever their arguments are.
    """

    def decorator(func: AsyncByteFunc[P]) -> AsyncByteFunc[P]:
        async def wrapper(*args: P.args, **kwargs: P.kwargs):
            cached = get(key)
            if cached is not None:
                return cached
            computed = await func(*args, **kwargs)
            set(key, computed)
            return computed

        return wrapper

    return decorator
|
|
|
|
|
|
|
|
|
2020-02-13 09:19:07 +00:00
|
|
|
def clear_all() -> None:
    """Deactivate the current session and wipe the sessionless and all session caches."""
    global _active_session_idx

    _active_session_idx = None
    _SESSIONLESS_CACHE.clear()
    for slot in _SESSIONS:
        slot.clear()
|