fix(core): do not touch idle timer on USB events

release/23.04
Martin Milata 1 year ago
parent c38b39ee6e
commit 14a8b0c62b

@ -0,0 +1 @@
Auto-lock timer is no longer restarted by USB messages, only touch screen activity.

@ -24,18 +24,6 @@ if TYPE_CHECKING:
)
_ALLOW_WHILE_LOCKED = (
MessageType.Initialize,
MessageType.EndSession,
MessageType.GetFeatures,
MessageType.Cancel,
MessageType.LockDevice,
MessageType.DoPreauthorized,
MessageType.WipeDevice,
MessageType.SetBusy,
)
def busy_expiry_ms() -> int:
"""
Returns the time left until the busy state expires or 0 if the device is not in the busy state.
@ -313,17 +301,18 @@ def set_homescreen() -> None:
set_default(homescreen)
def lock_device() -> None:
def lock_device(interrupt_workflow: bool = True) -> None:
if config.has_pin():
config.lock()
wire.find_handler = get_pinlocked_handler
set_homescreen()
workflow.close_others()
if interrupt_workflow:
workflow.close_others()
def lock_device_if_unlocked() -> None:
if config.is_unlocked():
lock_device()
lock_device(interrupt_workflow=workflow.autolock_interrupts_workflow)
async def unlock_device(ctx: wire.GenericContext = wire.DUMMY_CONTEXT) -> None:
@ -355,7 +344,7 @@ def get_pinlocked_handler(
if iface is usb.iface_debug:
return orig_handler
if msg_type in _ALLOW_WHILE_LOCKED:
if msg_type in workflow.ALLOW_WHILE_LOCKED:
return orig_handler
async def wrapper(ctx: wire.Context, msg: wire.Msg) -> protobuf.MessageType:

@ -1,6 +1,7 @@
from micropython import const
from typing import TYPE_CHECKING
from trezor import workflow
from trezor.crypto.hashlib import sha256
from trezor.enums import InputScriptType
from trezor.utils import HashWriter, empty_bytearray
@ -76,6 +77,10 @@ class Bitcoin:
self.orig_txs,
)
# Following steps can take a long time, make sure autolock doesn't kick in.
# This is set to True again after workflow is finished in start_default().
workflow.autolock_interrupts_workflow = False
# Verify the transaction input amounts by requesting each previous transaction
# and checking its output amount. Verify external inputs which have already
# been signed or which come with a proof of non-ownership.

@ -132,7 +132,9 @@ if __debug__:
async def dispatch_DebugLinkDecision(
ctx: wire.Context, msg: DebugLinkDecision
) -> None:
from trezor import io
from trezor import io, workflow
workflow.idle_timer.touch()
if debuglink_decision_chan.putters:
log.warning(__name__, "DebugLinkDecision queue is not empty")

@ -47,6 +47,10 @@ LOCKSCREEN_ON = object()
BUSYSCREEN_ON = object()
homescreen_shown: object | None = None
# Timestamp of last autolock activity.
# Here to persist across main loop restart between workflows.
autolock_last_touch: int | None = None
class InvalidSessionError(Exception):
pass
@ -338,8 +342,11 @@ def stored_async(key: int) -> Callable[[AsyncByteFunc[P]], AsyncByteFunc[P]]:
def clear_all() -> None:
global _active_session_idx
global autolock_last_touch
_active_session_idx = None
_SESSIONLESS_CACHE.clear()
for session in _SESSIONS:
session.clear()
autolock_last_touch = None

@ -194,8 +194,6 @@ class Context:
expected_type.MESSAGE_NAME,
)
workflow.idle_timer.touch()
# look up the protobuf class and parse the message
return _wrap_protobuf_load(msg.data, expected_type)
@ -231,8 +229,6 @@ class Context:
exptype.MESSAGE_NAME,
)
workflow.idle_timer.touch()
# parse the message and return it
return _wrap_protobuf_load(msg.data, exptype)
@ -322,6 +318,9 @@ async def _handle_single_message(
await ctx.write(unexpected_message())
return None
if msg.type in workflow.ALLOW_WHILE_LOCKED:
workflow.autolock_interrupts_workflow = False
# Here we make sure we always respond with a Failure response
# in case of any errors.
try:

@ -3,6 +3,7 @@ from typing import TYPE_CHECKING
import storage.cache
from trezor import log, loop
from trezor.enums import MessageType
if TYPE_CHECKING:
from typing import Callable
@ -17,6 +18,18 @@ if __debug__:
from trezor import utils
ALLOW_WHILE_LOCKED = (
MessageType.Initialize,
MessageType.EndSession,
MessageType.GetFeatures,
MessageType.Cancel,
MessageType.LockDevice,
MessageType.DoPreauthorized,
MessageType.WipeDevice,
MessageType.SetBusy,
)
# Set of workflow tasks. Multiple workflows can be running at the same time.
tasks: set[loop.spawn] = set()
@ -27,6 +40,9 @@ default_task: loop.spawn | None = None
# Constructor for the default workflow. Returns a workflow task.
default_constructor: Callable[[], loop.Task] | None = None
# Determines whether idle timer firing closes currently running workflow. Storage is locked always.
autolock_interrupts_workflow: bool = True
def _on_start(workflow: loop.spawn) -> None:
"""
@ -35,7 +51,6 @@ def _on_start(workflow: loop.spawn) -> None:
# Take note that this workflow task is running.
if __debug__:
log.debug(__name__, "start: %s", workflow.task)
idle_timer.touch()
tasks.add(workflow)
@ -76,6 +91,7 @@ def start_default() -> None:
"""
global default_task
global default_constructor
global autolock_interrupts_workflow
assert default_constructor is not None
@ -88,6 +104,8 @@ def start_default() -> None:
if __debug__:
log.debug(__name__, "default already started")
autolock_interrupts_workflow = True
def set_default(constructor: Callable[[], loop.Task]) -> None:
"""Configure a default workflow, which will be started next time it is needed."""
@ -161,7 +179,7 @@ class IdleTimer:
A global instance `workflow.idle_timer` is available to create events that fire
after a specified time of no user or host activity. This instance is kept awake
by UI taps, swipes, and USB message handling.
by UI taps, swipes, and DebugLinkDecision message.
"""
def __init__(self) -> None:
@ -178,16 +196,27 @@ class IdleTimer:
self.tasks[callback] = self._timeout_task(callback)
callback()
def touch(self) -> None:
def touch(self, _restore_from_cache: bool = False) -> None:
"""Wake up the idle timer.
Events that represent some form of activity (USB messages, touches, etc.) should
call `touch()` to notify the timer of the activity. All pending callback timers
will reset.
Events that represent some form of activity (touches, etc.) should call `touch()`
to notify the timer of the activity. All pending callback timers will reset.
If `_restore_from_cache` is True the function attempts to use previous
timestamp stored in storage.cache. If the parameter is False or no
deadline is saved, the function computes new deadline based on current
time and saves it to storage.cache. This is done to avoid losing an
active timer when workflow restart happens and tasks are lost.
"""
if _restore_from_cache and storage.cache.autolock_last_touch is not None:
now = storage.cache.autolock_last_touch
else:
now = utime.ticks_ms()
storage.cache.autolock_last_touch = now
for callback, task in self.tasks.items():
timeout_us = self.timeouts[callback]
deadline = utime.ticks_add(utime.ticks_ms(), timeout_us)
deadline = utime.ticks_add(now, timeout_us)
loop.schedule(task, None, deadline, reschedule=True)
def set(self, timeout_ms: int, callback: IdleCallback) -> None:
@ -201,21 +230,16 @@ class IdleTimer:
If `callback` was previously registered, it is updated with a new timeout value.
`idle_timer.set()` also counts as an activity, so all running idle timers are
reset.
If there is last activity timestamp saved in `storage.cache` then
`idle_timer.set()` uses it to calculate timer deadlines. Otherwise current
timestamp is used, resetting any idle timers.
"""
# The reason for counting set() as an activity is to clear up an ambiguity that
# would arise otherwise. This does not matter now, as callbacks are only
# scheduled during periods of activity.
# If we ever need to add a callback without touching, we will need to know
# when this callback should execute (10 mins from now? from last activity? if
# the latter, what if 10 minutes have already elapsed?)
if callback in self.tasks:
loop.close(self.tasks[callback])
self.timeouts[callback] = timeout_ms
self.tasks[callback] = self._timeout_task(callback)
self.touch()
self.touch(_restore_from_cache=True)
def remove(self, callback: IdleCallback) -> None:
"""Remove an idle callback."""

Loading…
Cancel
Save