From dd655422f1f653f1f2490a1073dba8fdca262a40 Mon Sep 17 00:00:00 2001 From: matejcik Date: Thu, 25 Mar 2021 12:30:07 +0100 Subject: [PATCH] refactor(core/wire): simplify the message handling loop --- core/src/trezor/wire/__init__.py | 261 +++++++++++++++---------------- 1 file changed, 126 insertions(+), 135 deletions(-) diff --git a/core/src/trezor/wire/__init__.py b/core/src/trezor/wire/__init__.py index 82fd39b600..8e7015eab2 100644 --- a/core/src/trezor/wire/__init__.py +++ b/core/src/trezor/wire/__init__.py @@ -37,7 +37,7 @@ reads the message's header. When the message type is known the first handler is import protobuf from storage.cache import InvalidSessionError -from trezor import log, loop, messages, ui, utils, workflow +from trezor import log, loop, messages, utils, workflow from trezor.messages import FailureType from trezor.messages.Failure import Failure from trezor.wire import codec_v1 @@ -283,6 +283,108 @@ class UnexpectedMessageError(Exception): self.msg = msg +async def _handle_single_message( + ctx: Context, msg: codec_v1.Message, use_workflow: bool +) -> codec_v1.Message | None: + """Handle a message that was loaded from USB by the caller. + + Find the appropriate handler, run it and write its result on the wire. In case + a problem is encountered at any point, write the appropriate error on the wire. + + If the workflow finished normally or with an error, the return value is None. + + If an unexpected message had arrived on the wire while the workflow was processing, + the workflow is shut down with an `UnexpectedMessageError`. This is not considered + an "error condition" to return over the wire -- instead the message is processed + as if starting a new workflow. + In such case, the `UnexpectedMessageError` is caught and the message is returned + to the caller. It will then be processed in the next iteration of the message loop. + """ + if __debug__: + try: + msg_type = messages.get_type(msg.type).__name__ + except KeyError: + msg_type = "%d - unknown message type" % msg.type + log.debug( + __name__, + "%s:%x receive: <%s>", + ctx.iface.iface_num(), + ctx.sid, + msg_type, + ) + + res_msg: protobuf.MessageType | None = None + + # We need to find a handler for this message type. Should not raise. + handler = find_handler(ctx.iface, msg.type) + + if handler is None: + # If no handler is found, we can skip decoding and directly + # respond with failure. + await ctx.write(unexpected_message()) + return None + + # Here we make sure we always respond with a Failure response + # in case of any errors. + try: + # Find a protobuf.MessageType subclass that describes this + # message. Raises if the type is not found. + req_type = messages.get_type(msg.type) + + # Try to decode the message according to schema from + # `req_type`. Raises if the message is malformed. + req_msg = _wrap_protobuf_load(msg.data, req_type) + + # Create the handler task. + task = handler(ctx, req_msg) + + # Run the workflow task. Workflow can do more on-the-wire + # communication inside, but it should eventually return a + # response message, or raise an exception (a rather common + # thing to do). Exceptions are handled in the code below. + if use_workflow: + # Spawn a workflow around the task. This ensures that concurrent + # workflows are shut down. + res_msg = await workflow.spawn(task) + else: + # For debug messages, ignore workflow processing and just await + # results of the handler. + res_msg = await task + + except UnexpectedMessageError as exc: + # Workflow was trying to read a message from the wire, and + # something unexpected came in. See Context.read() for + # example, which expects some particular message and raises + # UnexpectedMessageError if another one comes in. + # In order not to lose the message, we return it to the caller. + # TODO: + # We might handle only the few common cases here, like + # Initialize and Cancel. + return exc.msg + + except BaseException as exc: + # Either: + # - the message had a type that has a registered handler, but does not have + # a protobuf class + # - the message was not valid protobuf + # - workflow raised some kind of an exception while running + # - something canceled the workflow from the outside + if __debug__: + if isinstance(exc, ActionCancelled): + log.debug(__name__, "cancelled: {}".format(exc.message)) + elif isinstance(exc, loop.TaskClosed): + log.debug(__name__, "cancelled: loop task was closed") + else: + log.exception(__name__, exc) + res_msg = failure(exc) + + if res_msg is not None: + # perform the write outside the big try-except block, so that usb write + # problem bubbles up + await ctx.write(res_msg) + return None + + async def handle_session( iface: WireInterface, session_id: int, is_debug_session: bool = False ) -> None: @@ -292,158 +394,47 @@ async def handle_session( ctx_buffer = WIRE_BUFFER ctx = Context(iface, session_id, ctx_buffer) next_msg: codec_v1.Message | None = None - res_msg: protobuf.MessageType | None = None - req_type = None - req_msg = None if __debug__ and is_debug_session: import apps.debug apps.debug.DEBUG_CONTEXT = ctx + # Take a mark of modules that are imported at this point, so we can + # roll back and un-import any others. modules = utils.unimport_begin() while True: try: if next_msg is None: - # We are not currently reading a message, so let's wait for one. - # If the decoding fails, exception is raised and we try again - # (with the same `Reader` instance, it's OK). Even in case of - # de-synchronized wire communication, report with a message - # header is eventually received, after a couple of tries. + # If the previous run did not keep an unprocessed message for us, + # wait for a new one coming from the wire. msg = await ctx.read_from_wire() - - if __debug__: - try: - msg_type = messages.get_type(msg.type).__name__ - except KeyError: - msg_type = "%d - unknown message type" % msg.type - log.debug( - __name__, - "%s:%x receive: <%s>", - iface.iface_num(), - session_id, - msg_type, - ) else: - # We have a reader left over from earlier. We should process - # this message instead of waiting for new one. + # Process the message from previous run. msg = next_msg next_msg = None - # Now we are in a middle of reading a message and we need to decide - # what to do with it, based on its type from the message header. - # From this point on, we should take care to read it in full and - # send a response. + try: + next_msg = await _handle_single_message( + ctx, msg, use_workflow=not is_debug_session + ) + finally: + if not __debug__ or not is_debug_session: + # Unload modules imported by the workflow. Should not raise. + # This is not done for the debug session because the snapshot taken + # in a debug session would clear modules which are in use by the + # workflow running on wire. + utils.unimport_end(modules) - # Take a mark of modules that are imported at this point, so we can - # roll back and un-import any others. Should not raise. - if is_debug_session: - modules = utils.unimport_begin() - - # We need to find a handler for this message type. Should not - # raise. - handler = find_handler(iface, msg.type) - - if handler is None: - # If no handler is found, we can skip decoding and directly - # respond with failure. Should not raise. - res_msg = unexpected_message() - - else: - # We found a valid handler for this message type. - - # Workflow task, declared for the finally block - wf_task: HandlerTask | None = None - - # Here we make sure we always respond with a Failure response - # in case of any errors. - try: - # Find a protobuf.MessageType subclass that describes this - # message. Raises if the type is not found. - req_type = messages.get_type(msg.type) - - # Try to decode the message according to schema from - # `req_type`. Raises if the message is malformed. - req_msg = _wrap_protobuf_load(msg.data, req_type) - - # At this point, message reports are all processed and - # correctly parsed into `req_msg`. - - # Create the workflow task. - wf_task = handler(ctx, req_msg) - - # Run the workflow task. Workflow can do more on-the-wire - # communication inside, but it should eventually return a - # response message, or raise an exception (a rather common - # thing to do). Exceptions are handled in the code below. - if not is_debug_session: - res_msg = await workflow.spawn(wf_task) - else: - res_msg = await wf_task - - except UnexpectedMessageError as exc: - # Workflow was trying to read a message from the wire, and - # something unexpected came in. See Context.read() for - # example, which expects some particular message and raises - # UnexpectedMessageError if another one comes in. - # In order not to lose the message, we pass on the reader - # to get picked up by the workflow logic in the beginning of - # the cycle, which processes it in the usual manner. - # TODO: - # We might handle only the few common cases here, like - # Initialize and Cancel. - next_msg = exc.msg - res_msg = None - - except Exception as exc: - # Either: - # - the first workflow message had a type that has a - # registered handler, but does not have a protobuf class - # - the first workflow message was not a valid protobuf - # - workflow raised some kind of an exception while running - # - something canceled the workflow from the outside - if __debug__: - if isinstance(exc, ActionCancelled): - log.debug(__name__, "cancelled: {}".format(exc.message)) - elif isinstance(exc, loop.TaskClosed): - log.debug(__name__, "cancelled: loop task was closed") - else: - log.exception(__name__, exc) - res_msg = failure(exc) - - finally: - # If we ran a workflow task, and a default workflow is on, make sure - # we do not race against the layout that is inside. - # TODO: this is very hacky and complects wire with the ui - if wf_task is not None and workflow.default_task is not None: - await ui.wait_until_layout_is_running() - - if res_msg is not None: - # Either the workflow returned a response, or we created one. - # Write it on the wire. Possibly, the incoming message haven't - # been read in full. We ignore this case here and let the rest - # of the reports get processed while waiting for the message - # header. - # TODO: if the write fails, we do not unimport the loaded modules - await ctx.write(res_msg) - - # Cleanup, so garbage collection triggered after un-importing can - # pick up the trash. - req_type = None - req_msg = None - res_msg = None - handler = None - wf_task = None - - # Unload modules imported by the workflow. Should not raise. - utils.unimport_end(modules) - - if not is_debug_session and next_msg is None: # and msg_type != 0: - loop.clear() - return + if next_msg is None: + # Shut down the loop if there is no next message waiting. + # Let the session be restarted from `main`. + loop.clear() + return except Exception as exc: - # The session handling should never exit, just log and continue. + # Log and try again. The session handler can only exit explicitly via + # loop.clear() above. if __debug__: log.exception(__name__, exc)