refactor(core/wire): simplify the message handling loop

pull/1610/head
matejcik 3 years ago committed by matejcik
parent b387970468
commit dd655422f1

@ -37,7 +37,7 @@ reads the message's header. When the message type is known the first handler is
import protobuf
from storage.cache import InvalidSessionError
from trezor import log, loop, messages, ui, utils, workflow
from trezor import log, loop, messages, utils, workflow
from trezor.messages import FailureType
from trezor.messages.Failure import Failure
from trezor.wire import codec_v1
@ -283,6 +283,108 @@ class UnexpectedMessageError(Exception):
self.msg = msg
async def _handle_single_message(
ctx: Context, msg: codec_v1.Message, use_workflow: bool
) -> codec_v1.Message | None:
"""Handle a message that was loaded from USB by the caller.
Find the appropriate handler, run it and write its result on the wire. In case
a problem is encountered at any point, write the appropriate error on the wire.
If the workflow finished normally or with an error, the return value is None.
If an unexpected message had arrived on the wire while the workflow was processing,
the workflow is shut down with an `UnexpectedMessageError`. This is not considered
an "error condition" to return over the wire -- instead the message is processed
as if starting a new workflow.
In such case, the `UnexpectedMessageError` is caught and the message is returned
to the caller. It will then be processed in the next iteration of the message loop.
"""
if __debug__:
try:
msg_type = messages.get_type(msg.type).__name__
except KeyError:
msg_type = "%d - unknown message type" % msg.type
log.debug(
__name__,
"%s:%x receive: <%s>",
ctx.iface.iface_num(),
ctx.sid,
msg_type,
)
res_msg: protobuf.MessageType | None = None
# We need to find a handler for this message type. Should not raise.
handler = find_handler(ctx.iface, msg.type)
if handler is None:
# If no handler is found, we can skip decoding and directly
# respond with failure.
await ctx.write(unexpected_message())
return None
# Here we make sure we always respond with a Failure response
# in case of any errors.
try:
# Find a protobuf.MessageType subclass that describes this
# message. Raises if the type is not found.
req_type = messages.get_type(msg.type)
# Try to decode the message according to schema from
# `req_type`. Raises if the message is malformed.
req_msg = _wrap_protobuf_load(msg.data, req_type)
# Create the handler task.
task = handler(ctx, req_msg)
# Run the workflow task. Workflow can do more on-the-wire
# communication inside, but it should eventually return a
# response message, or raise an exception (a rather common
# thing to do). Exceptions are handled in the code below.
if use_workflow:
# Spawn a workflow around the task. This ensures that concurrent
# workflows are shut down.
res_msg = await workflow.spawn(task)
else:
# For debug messages, ignore workflow processing and just await
# results of the handler.
res_msg = await task
except UnexpectedMessageError as exc:
# Workflow was trying to read a message from the wire, and
# something unexpected came in. See Context.read() for
# example, which expects some particular message and raises
# UnexpectedMessageError if another one comes in.
# In order not to lose the message, we return it to the caller.
# TODO:
# We might handle only the few common cases here, like
# Initialize and Cancel.
return exc.msg
except BaseException as exc:
# Either:
# - the message had a type that has a registered handler, but does not have
# a protobuf class
# - the message was not valid protobuf
# - workflow raised some kind of an exception while running
# - something canceled the workflow from the outside
if __debug__:
if isinstance(exc, ActionCancelled):
log.debug(__name__, "cancelled: {}".format(exc.message))
elif isinstance(exc, loop.TaskClosed):
log.debug(__name__, "cancelled: loop task was closed")
else:
log.exception(__name__, exc)
res_msg = failure(exc)
if res_msg is not None:
# perform the write outside the big try-except block, so that usb write
# problem bubbles up
await ctx.write(res_msg)
return None
async def handle_session(
iface: WireInterface, session_id: int, is_debug_session: bool = False
) -> None:
@ -292,158 +394,47 @@ async def handle_session(
ctx_buffer = WIRE_BUFFER
ctx = Context(iface, session_id, ctx_buffer)
next_msg: codec_v1.Message | None = None
res_msg: protobuf.MessageType | None = None
req_type = None
req_msg = None
if __debug__ and is_debug_session:
import apps.debug
apps.debug.DEBUG_CONTEXT = ctx
# Take a mark of modules that are imported at this point, so we can
# roll back and un-import any others.
modules = utils.unimport_begin()
while True:
try:
if next_msg is None:
# We are not currently reading a message, so let's wait for one.
# If the decoding fails, exception is raised and we try again
# (with the same `Reader` instance, it's OK). Even in case of
# de-synchronized wire communication, report with a message
# header is eventually received, after a couple of tries.
# If the previous run did not keep an unprocessed message for us,
# wait for a new one coming from the wire.
msg = await ctx.read_from_wire()
if __debug__:
try:
msg_type = messages.get_type(msg.type).__name__
except KeyError:
msg_type = "%d - unknown message type" % msg.type
log.debug(
__name__,
"%s:%x receive: <%s>",
iface.iface_num(),
session_id,
msg_type,
)
else:
# We have a reader left over from earlier. We should process
# this message instead of waiting for new one.
# Process the message from previous run.
msg = next_msg
next_msg = None
# Now we are in a middle of reading a message and we need to decide
# what to do with it, based on its type from the message header.
# From this point on, we should take care to read it in full and
# send a response.
# Take a mark of modules that are imported at this point, so we can
# roll back and un-import any others. Should not raise.
if is_debug_session:
modules = utils.unimport_begin()
# We need to find a handler for this message type. Should not
# raise.
handler = find_handler(iface, msg.type)
if handler is None:
# If no handler is found, we can skip decoding and directly
# respond with failure. Should not raise.
res_msg = unexpected_message()
else:
# We found a valid handler for this message type.
# Workflow task, declared for the finally block
wf_task: HandlerTask | None = None
# Here we make sure we always respond with a Failure response
# in case of any errors.
try:
# Find a protobuf.MessageType subclass that describes this
# message. Raises if the type is not found.
req_type = messages.get_type(msg.type)
# Try to decode the message according to schema from
# `req_type`. Raises if the message is malformed.
req_msg = _wrap_protobuf_load(msg.data, req_type)
# At this point, message reports are all processed and
# correctly parsed into `req_msg`.
# Create the workflow task.
wf_task = handler(ctx, req_msg)
# Run the workflow task. Workflow can do more on-the-wire
# communication inside, but it should eventually return a
# response message, or raise an exception (a rather common
# thing to do). Exceptions are handled in the code below.
if not is_debug_session:
res_msg = await workflow.spawn(wf_task)
else:
res_msg = await wf_task
except UnexpectedMessageError as exc:
# Workflow was trying to read a message from the wire, and
# something unexpected came in. See Context.read() for
# example, which expects some particular message and raises
# UnexpectedMessageError if another one comes in.
# In order not to lose the message, we pass on the reader
# to get picked up by the workflow logic in the beginning of
# the cycle, which processes it in the usual manner.
# TODO:
# We might handle only the few common cases here, like
# Initialize and Cancel.
next_msg = exc.msg
res_msg = None
except Exception as exc:
# Either:
# - the first workflow message had a type that has a
# registered handler, but does not have a protobuf class
# - the first workflow message was not a valid protobuf
# - workflow raised some kind of an exception while running
# - something canceled the workflow from the outside
if __debug__:
if isinstance(exc, ActionCancelled):
log.debug(__name__, "cancelled: {}".format(exc.message))
elif isinstance(exc, loop.TaskClosed):
log.debug(__name__, "cancelled: loop task was closed")
else:
log.exception(__name__, exc)
res_msg = failure(exc)
finally:
# If we ran a workflow task, and a default workflow is on, make sure
# we do not race against the layout that is inside.
# TODO: this is very hacky and complects wire with the ui
if wf_task is not None and workflow.default_task is not None:
await ui.wait_until_layout_is_running()
if res_msg is not None:
# Either the workflow returned a response, or we created one.
# Write it on the wire. Possibly, the incoming message haven't
# been read in full. We ignore this case here and let the rest
# of the reports get processed while waiting for the message
# header.
# TODO: if the write fails, we do not unimport the loaded modules
await ctx.write(res_msg)
# Cleanup, so garbage collection triggered after un-importing can
# pick up the trash.
req_type = None
req_msg = None
res_msg = None
handler = None
wf_task = None
# Unload modules imported by the workflow. Should not raise.
utils.unimport_end(modules)
if not is_debug_session and next_msg is None: # and msg_type != 0:
loop.clear()
return
try:
next_msg = await _handle_single_message(
ctx, msg, use_workflow=not is_debug_session
)
finally:
if not __debug__ or not is_debug_session:
# Unload modules imported by the workflow. Should not raise.
# This is not done for the debug session because the snapshot taken
# in a debug session would clear modules which are in use by the
# workflow running on wire.
utils.unimport_end(modules)
if next_msg is None:
# Shut down the loop if there is no next message waiting.
# Let the session be restarted from `main`.
loop.clear()
return
except Exception as exc:
# The session handling should never exit, just log and continue.
# Log and try again. The session handler can only exit explicitly via
# loop.clear() above.
if __debug__:
log.exception(__name__, exc)

Loading…
Cancel
Save