feat(core): enable entering passphrase on device

M1nd3r/thp6
M1nd3r 2 weeks ago
parent 6adad19a6d
commit fcd16fbdf8

@ -1,7 +1,8 @@
from typing import TYPE_CHECKING
import storage.cache as storage_cache
import storage.cache_codec as cache_codec
import storage.device as storage_device
from storage.cache import check_thp_is_not_used
from storage.cache_common import (
APP_COMMON_BUSY_DEADLINE_MS,
APP_COMMON_DERIVE_CARDANO,
@ -181,12 +182,14 @@ def get_features() -> Features:
return f
@storage_cache.check_thp_is_not_used
async def handle_Initialize(msg: Initialize) -> Features:
if utils.USE_THP:
raise ValueError("With THP enabled, a session id must be provided in args")
@check_thp_is_not_used
async def handle_Initialize(
msg: Initialize,
) -> Features:
session_id = cache_codec.start_session(msg.session_id)
session_id = storage_cache.start_session(msg.session_id)
# TODO change cardano derivation
# ctx = context.get_context()
if not utils.BITCOIN_ONLY:
derive_cardano = context.cache_get(APP_COMMON_DERIVE_CARDANO)
@ -199,8 +202,8 @@ async def handle_Initialize(msg: Initialize) -> Features:
):
# seed is already derived, and host wants to change derive_cardano setting
# => create a new session
storage_cache.end_current_session()
session_id = storage_cache.start_session()
cache_codec.end_current_session()
session_id = cache_codec.start_session()
have_seed = False
if not have_seed:
@ -244,7 +247,7 @@ async def handle_SetBusy(msg: SetBusy) -> Success:
async def handle_EndSession(msg: EndSession) -> Success:
storage_cache.end_current_session()
cache_codec.end_current_session()
return Success()

@ -146,7 +146,7 @@ async def _get_keychain_bip39(derivation_type: CardanoDerivationType) -> Keychai
from trezor.enums import CardanoDerivationType
from trezor.wire import context
from apps.common.seed import derive_and_store_roots
from apps.common.seed import derive_and_store_roots_legacy
if not device.is_initialized():
raise wire.NotInitialized("Device is not initialized")
@ -166,7 +166,7 @@ async def _get_keychain_bip39(derivation_type: CardanoDerivationType) -> Keychai
# _get_secret
secret = context.cache_get(cache_entry)
if secret is None:
await derive_and_store_roots()
await derive_and_store_roots_legacy()
secret = context.cache_get(cache_entry)
assert secret is not None

@ -1,15 +1,46 @@
from micropython import const
from typing import TYPE_CHECKING
import storage.device as storage_device
from storage.cache import check_thp_is_not_used
from trezor.wire import DataError
_MAX_PASSPHRASE_LEN = const(50)
if TYPE_CHECKING:
from trezor.messages import ThpCreateNewSession
def is_enabled() -> bool:
return storage_device.is_passphrase_enabled()
async def get_passphrase(msg: ThpCreateNewSession) -> str:
if not is_enabled():
return ""
if msg.on_device or storage_device.get_passphrase_always_on_device():
passphrase = await _get_on_device()
else:
passphrase = msg.passphrase or ""
if len(passphrase.encode()) > _MAX_PASSPHRASE_LEN:
raise DataError(f"Maximum passphrase length is {_MAX_PASSPHRASE_LEN} bytes")
return passphrase
async def _get_on_device() -> str:
from trezor import workflow
from trezor.ui.layouts import request_passphrase_on_device
workflow.close_others() # request exclusive UI access
passphrase = await request_passphrase_on_device(_MAX_PASSPHRASE_LEN)
return passphrase
@check_thp_is_not_used
async def get() -> str:
from trezor import workflow
@ -29,6 +60,7 @@ async def get() -> str:
return passphrase
@check_thp_is_not_used
async def _request_on_host() -> str:
from trezor import TR
from trezor.messages import PassphraseAck, PassphraseRequest

@ -10,7 +10,8 @@ from trezor.wire.context import get_context
from apps.common import cache
from . import mnemonic
from .passphrase import get as get_passphrase
from .passphrase import get as get_passphrase_legacy
from .passphrase import get_passphrase as get_passphrase
if TYPE_CHECKING:
from trezor.crypto import bip32
@ -68,7 +69,32 @@ if not utils.BITCOIN_ONLY:
# We want to derive both the normal seed and the Cardano seed together, AND
# expose a method for Cardano to do the same
async def derive_and_store_roots(
async def derive_and_store_roots(ctx: Context, msg: ThpCreateNewSession) -> None:
if msg.passphrase is not None and msg.on_device:
raise Exception("Passphrase provided when it shouldn't be!")
from trezor import wire
if not storage_device.is_initialized():
raise wire.NotInitialized("Device is not initialized")
if ctx.cache.is_set(APP_COMMON_SEED):
raise Exception("Seed is already set!")
if ctx.cache.is_set(APP_CARDANO_ICARUS_SECRET):
raise Exception("Cardano icarus secret is already set!")
passphrase = await get_passphrase(msg)
common_seed = mnemonic.get_seed(passphrase)
ctx.cache.set(APP_COMMON_SEED, common_seed)
if msg.derive_cardano:
from apps.cardano.seed import derive_and_store_secrets
derive_and_store_secrets(ctx, passphrase)
async def derive_and_store_roots_legacy(
ctx: Context | None = None, msg: ThpCreateNewSession | None = None
) -> None:
if __debug__:
@ -92,7 +118,7 @@ if not utils.BITCOIN_ONLY:
return
if msg is None or msg.on_device:
passphrase = await get_passphrase()
passphrase = await get_passphrase_legacy()
else:
passphrase = msg.passphrase or ""

@ -11,7 +11,6 @@ if TYPE_CHECKING:
async def create_new_session(
channel: ChannelContext, message: ThpCreateNewSession
) -> ThpNewSession:
# from apps.common.seed import get_seed TODO
from trezor.wire.thp.session_manager import create_new_session
from apps.common.seed import derive_and_store_roots

@ -5,27 +5,6 @@ from typing import TYPE_CHECKING
from storage.cache_common import SESSIONLESS_FLAG, SessionlessCache
from trezor import utils
if TYPE_CHECKING:
from typing import Callable, ParamSpec, TypeVar
T = TypeVar("T")
P = ParamSpec("P")
def check_thp_is_not_used(f: Callable[P, T]) -> Callable[P, T]:
"""A type-safe decorator to raise an exception when the function is called with THP enabled.
This decorator should be removed after the caches for Codec_v1 and THP are properly refactored and separated.
"""
def inner(*args: P.args, **kwargs: P.kwargs) -> T:
if utils.USE_THP:
raise Exception("Cannot call this function with the new THP enabled")
return f(*args, **kwargs)
return inner
# XXX
# Allocation notes:
# Instantiation of a DataCache subclass should make as little garbage as possible, so
@ -51,8 +30,6 @@ _SESSIONLESS_CACHE.clear()
gc.collect()
# Common functions
def clear_all() -> None:
global autolock_last_touch
@ -71,19 +48,26 @@ def get_int_all_sessions(key: int) -> builtins.set[int]:
return _PROTOCOL_CACHE.get_int_all_sessions(key)
# Sessionless functions
def get_sessionless_cache() -> SessionlessCache:
return _SESSIONLESS_CACHE
# Codec_v1 specific functions
if TYPE_CHECKING:
from typing import Callable, ParamSpec, TypeVar
T = TypeVar("T")
P = ParamSpec("P")
def check_thp_is_not_used(f: Callable[P, T]) -> Callable[P, T]:
"""A type-safe decorator to raise an exception when the function is called with THP enabled.
@check_thp_is_not_used
def start_session(received_session_id: bytes | None = None) -> bytes:
return cache_codec.start_session(received_session_id)
This decorator should be removed after the caches for Codec_v1 and THP are properly refactored and separated.
"""
def inner(*args: P.args, **kwargs: P.kwargs) -> T:
if utils.USE_THP:
raise Exception("Cannot call this function with the new THP enabled")
return f(*args, **kwargs)
@check_thp_is_not_used
def end_current_session() -> None:
cache_codec.end_current_session()
return inner

Loading…
Cancel
Save