wip: handler filters

ibz/20240524-repeated-backup-rename-fields
matejcik 4 months ago committed by Ioan Bizău
parent 724e830faa
commit 0ee80abf91

@ -279,6 +279,8 @@ apps.common.address_type
import apps.common.address_type
apps.common.authorization
import apps.common.authorization
apps.common.backup
import apps.common.backup
apps.common.cbor
import apps.common.cbor
apps.common.coininfo

@ -23,6 +23,7 @@ if TYPE_CHECKING:
Ping,
SetBusy,
)
from trezor.wire import Handler, Msg
_SCREENSAVER_IS_ON = False
@ -373,7 +374,7 @@ def set_homescreen() -> None:
def lock_device(interrupt_workflow: bool = True) -> None:
if config.has_pin():
config.lock()
wire.find_handler = _get_pinlocked_handler
wire.filters.append(_pinlock_filter)
set_homescreen()
if interrupt_workflow:
workflow.close_others()
@ -409,28 +410,16 @@ async def unlock_device() -> None:
_SCREENSAVER_IS_ON = False
set_homescreen()
wire.find_handler = workflow_handlers.find_registered_handler
wire.filters.remove(_pinlock_filter)
def _get_pinlocked_handler(
iface: wire.WireInterface, msg_type: int
) -> wire.Handler[wire.Msg] | None:
orig_handler = workflow_handlers.find_registered_handler(iface, msg_type)
if orig_handler is None:
return None
if __debug__:
import usb
if iface is usb.iface_debug:
return orig_handler
def _pinlock_filter(msg_type: int, prev_handler: Handler[Msg]) -> Handler[Msg]:
if msg_type in workflow.ALLOW_WHILE_LOCKED:
return orig_handler
return prev_handler
async def wrapper(msg: protobuf.MessageType) -> protobuf.MessageType:
async def wrapper(msg: Msg) -> protobuf.MessageType:
await unlock_device()
return await orig_handler(msg)
return await prev_handler(msg)
return wrapper
@ -445,26 +434,18 @@ _ALLOW_WHILE_REPEATED_BACKUP_UNLOCKED = (
)
def _get_backup_handler(
iface: wire.WireInterface, msg_type: int
) -> wire.Handler[wire.Msg] | None:
orig_handler = workflow_handlers.find_registered_handler(iface, msg_type)
if orig_handler is None:
return None
if __debug__:
import usb
if iface is usb.iface_debug:
return orig_handler
def _repeated_backup_filter(msg_type: int, prev_handler: Handler[Msg]) -> Handler[Msg]:
if msg_type in _ALLOW_WHILE_REPEATED_BACKUP_UNLOCKED:
return orig_handler
async def wrapper(_msg: protobuf.MessageType) -> protobuf.MessageType:
return prev_handler
else:
raise wire.ProcessError("Operation not allowed when in repeated backup state")
return wrapper
def remove_repeated_backup_filter():
try:
wire.filters.remove(_repeated_backup_filter)
except ValueError:
pass
# this function is also called when handling ApplySettings
@ -499,9 +480,9 @@ def boot() -> None:
workflow_handlers.register(msg_type, handler)
reload_settings_from_storage()
if storage_cache.get_bool(storage_cache.APP_RECOVERY_REPEATED_BACKUP_UNLOCKED):
wire.filters.append(_repeated_backup_filter)
if not config.is_unlocked():
wire.find_handler = _get_pinlocked_handler
elif storage_cache.get_bool(storage_cache.APP_RECOVERY_REPEATED_BACKUP_UNLOCKED):
wire.find_handler = _get_backup_handler
else:
wire.find_handler = workflow_handlers.find_registered_handler
# pinlocked handler should always be the last one
wire.filters.append(_pinlock_filter)

@ -0,0 +1,7 @@
def disable_repeated_backup():
import storage.cache as storage_cache
from apps import base
storage_cache.delete(storage_cache.APP_RECOVERY_REPEATED_BACKUP_UNLOCKED)
base.remove_repeated_backup_filter()

@ -15,8 +15,7 @@ async def backup_device(msg: BackupDevice) -> Success:
from trezor import wire
from trezor.messages import Success
from apps import workflow_handlers
from apps.common import mnemonic
from apps.common import backup, mnemonic
from .reset_device import backup_seed, backup_slip39_custom, layout
@ -51,7 +50,7 @@ async def backup_device(msg: BackupDevice) -> Success:
if not repeated_backup_unlocked:
storage_device.set_unfinished_backup(True)
storage_cache.delete(storage_cache.APP_RECOVERY_REPEATED_BACKUP_UNLOCKED)
backup.disable_repeated_backup()
storage_device.set_backed_up()
if group_threshold is not None:
@ -61,7 +60,6 @@ async def backup_device(msg: BackupDevice) -> Success:
storage_device.set_unfinished_backup(False)
wire.find_handler = workflow_handlers.find_registered_handler
await layout.show_backup_success()
return Success(message="Seed successfully backed up")

@ -31,10 +31,9 @@ async def recovery_process() -> Success:
import storage
from trezor.enums import MessageType, RecoveryKind
is_special_kind = storage_recovery.get_kind() in (
RecoveryKind.DryRun,
RecoveryKind.UnlockRepeatedBackup,
)
from apps.common import backup
kind = storage_recovery.get_kind()
wire.AVOID_RESTARTING_FOR = (
MessageType.Initialize,
@ -44,7 +43,10 @@ async def recovery_process() -> Success:
try:
return await _continue_recovery_process()
except recover.RecoveryAborted:
if is_special_kind:
if kind == RecoveryKind.DryRun:
storage_recovery.end_progress()
elif kind == RecoveryKind.UnlockRepeatedBackup:
backup.disable_repeated_backup()
storage_recovery.end_progress()
else:
storage.wipe()
@ -57,8 +59,7 @@ async def _continue_repeated_backup() -> None:
from trezor.ui.layouts import confirm_action
from trezor.wire import ActionCancelled
from apps import workflow_handlers
from apps.common import mnemonic
from apps.common import backup, mnemonic
from apps.homescreen import homescreen
from apps.management.reset_device import backup_seed
@ -85,8 +86,7 @@ async def _continue_repeated_backup() -> None:
except ActionCancelled:
workflow.set_default(homescreen)
finally:
storage_cache.delete(storage_cache.APP_RECOVERY_REPEATED_BACKUP_UNLOCKED)
wire.find_handler = workflow_handlers.find_registered_handler
backup.disable_repeated_backup()
storage_recovery.end_progress()

@ -119,13 +119,13 @@ async def _handle_single_message(
res_msg: protobuf.MessageType | None = None
# We need to find a handler for this message type. Should not raise.
handler = find_handler(ctx.iface, msg.type) # pylint: disable=assignment-from-none
if handler is None:
# If no handler is found, we can skip decoding and directly
# respond with failure.
await ctx.write(unexpected_message())
# We need to find a handler for this message type.
try:
handler = find_handler(ctx.iface, msg.type)
except Error as exc:
# Handlers are allowed to exception out. In that case, we can skip decoding
# and return the error.
await ctx.write(failure(exc))
return None
if msg.type in workflow.ALLOW_WHILE_LOCKED:
@ -259,12 +259,46 @@ async def handle_session(
log.exception(__name__, exc)
def _find_handler_placeholder(iface: WireInterface, msg_type: int) -> Handler | None:
"""Placeholder handler lookup before a proper one is registered."""
return None
def find_handler(iface: WireInterface, msg_type: int) -> Handler:
import usb
from apps import workflow_handlers
handler = workflow_handlers.find_registered_handler(iface, msg_type)
if handler is None:
raise context.UnexpectedMessage(msg="Unexpected message")
if __debug__ and iface is usb.iface_debug:
# no filtering allowed for debuglink
return handler
for filter in filters:
handler = filter(msg_type, handler)
return handler
filters: list[Callable[[int, Handler], Handler]] = []
"""Filters for the wire handler.
Filters are applied in order. Each filter gets a message id and a preceding handler. It
must either return a handler (the same one or a modified one), or raise an exception
that gets sent to wire directly.
Filters are not applied to debug sessions.
The filters are designed for:
* rejecting messages -- while in Recovery mode, most messages are not allowed
* adding additional behavior -- while device is soft-locked, a PIN screen will be shown
before allowing a message to trigger its original behavior.
For this, the filters are effectively deny-first. If an earlier filter rejects the
message, the later filters are not called. But if a filter adds behavior, the latest
filter "wins" and the latest behavior triggers first.
Please note that this behavior is really unsuited to anything other than what we are
using it for now. It might be necessary to modify the semantics if we need more complex
usecases.
"""
find_handler = _find_handler_placeholder
AVOID_RESTARTING_FOR: Container[int] = ()

Loading…
Cancel
Save