1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2024-11-18 05:28:40 +00:00

core: add a global idle timer

This commit is contained in:
matejcik 2020-05-18 14:59:37 +02:00 committed by matejcik
parent 88c5ec8d40
commit 67b723e4ca
4 changed files with 94 additions and 3 deletions

View File

@ -47,14 +47,22 @@ if __debug__:
def schedule(
task: Task, value: Any = None, deadline: int = None, finalizer: Finalizer = None
task: Task,
value: Any = None,
deadline: int = None,
finalizer: Finalizer = None,
reschedule: bool = False,
) -> None:
"""
Schedule task to be executed with `value` on given `deadline` (in
microseconds). Does not start the event loop itself, see `run`.
Usually done in very low-level cases, see `race` for more user-friendly
and correct concept.
If `reschedule` is set, updates an existing entry.
"""
if reschedule:
_queue.discard(task)
if deadline is None:
deadline = utime.ticks_us()
if finalizer is not None:

View File

@ -3,7 +3,7 @@ import utime
from micropython import const
from trezorui import Display
from trezor import io, loop, res, utils
from trezor import io, loop, res, utils, workflow
if __debug__:
from apps.debug import notify_layout_change
@ -348,6 +348,7 @@ class Layout(Component):
while True:
# Using `yield` instead of `await` to avoid allocations.
event, x, y = yield touch
workflow.idle_timer.touch()
self.dispatch(event, x, y)
# We dispatch a render event right after the touch. Quick and dirty
# way to get the lowest input-to-render latency.

View File

@ -185,6 +185,8 @@ class Context:
expected_type,
)
workflow.idle_timer.touch()
# parse the message and return it
return await protobuf.load_message(reader, expected_type)
@ -219,6 +221,8 @@ class Context:
__name__, "%s:%x read: %s", self.iface.iface_num(), self.sid, exptype
)
workflow.idle_timer.touch()
# parse the message and return it
return await protobuf.load_message(reader, exptype)

View File

@ -1,7 +1,11 @@
import utime
from trezor import log, loop
if False:
from typing import Any, Callable, Optional, Set
from typing import Any, Callable, Dict, Optional, Set
IdleCallback = Callable[[], None]
if __debug__:
# Used in `on_close` bellow for memory statistics.
@ -30,6 +34,7 @@ def on_start(workflow: loop.Task) -> None:
# Take note that this workflow task is running.
if __debug__:
log.debug(__name__, "start: %s", workflow)
idle_timer.touch()
tasks.add(workflow)
@ -130,3 +135,76 @@ def _finalize_default(task: loop.Task, value: Any) -> None:
# If required, a function `shutdown_default` should be written, that clears the
# default constructor and shuts down the running default task.
# We currently do not need such function, so I'm just noting how it should work.
class IdleTimer:
"""Run callbacks after a period of inactivity.
A global instance `workflow.idle_timer` is available to create events that fire
after a specified time of no user or host activity. This instance is kept awake
by UI taps, swipes, and USB message handling.
"""
def __init__(self) -> None:
self.timeouts = {} # type: Dict[IdleCallback, int]
self.tasks = {} # type: Dict[IdleCallback, loop.Task]
async def _timeout_task(self, callback: IdleCallback) -> None:
# This function is async, so the result of self._timeout_task() is an awaitable,
# suitable for scheduling.
# After the scheduled task completes, self.tasks will contain a stale task
# object. A new one must be created here so that subsequent calls to touch() can
# schedule it again.
self.tasks[callback] = self._timeout_task(callback)
callback()
def touch(self) -> None:
"""Wake up the idle timer.
Events that represent some form of activity (USB messages, touches, etc.) should
call `touch()` to notify the timer of the activity. All pending callback timers
will reset.
"""
for callback, task in self.tasks.items():
timeout_us = self.timeouts[callback]
deadline = utime.ticks_add(utime.ticks_us(), timeout_us)
loop.schedule(task, None, deadline, reschedule=True)
def set(self, timeout_ms: int, callback: IdleCallback) -> None:
"""Add or update an idle callback.
Every time `timeout_ms` milliseconds elapse after the last registered activity,
`callback` will be invoked.
I.e., in every period of inactivity, each `callback` will only run once. To run
again, an activity must be registered and then no activity for the specified
period.
If `callback` was previously registered, it is updated with a new timeout value.
`idle_timer.set()` also counts as an activity, so all running idle timers are
reset.
"""
# The reason for counting set() as an activity is to clear up an ambiguity that
# would arise otherwise. This does not matter now, as callbacks are only
# scheduled during periods of activity.
# If we ever need to add a callback without touching, we will need to know
# when this callback should execute (10 mins from now? from last activity? if
# the latter, what if 10 minutes have already elapsed?)
if callback in self.tasks:
loop.close(self.tasks[callback])
self.timeouts[callback] = timeout_ms * 1000
self.tasks[callback] = self._timeout_task(callback)
self.touch()
def remove(self, callback: IdleCallback) -> None:
"""Remove an idle callback."""
self.timeouts.pop(callback, None)
task = self.tasks.pop(callback, None)
if task is not None:
loop.close(task)
"""Global idle timer."""
idle_timer = IdleTimer()