core: replace workflow.on_start/on_close with workflow.spawn

pull/971/head
matejcik 4 years ago committed by matejcik
parent 02565f3bfb
commit 2d0206c043

@ -1001,16 +1001,19 @@ class DialogManager:
self.state = None # type: Optional[State] self.state = None # type: Optional[State]
self.deadline = 0 self.deadline = 0
self.result = _RESULT_NONE self.result = _RESULT_NONE
self.workflow = None # type: Optional[Coroutine] self.workflow = None # type: Optional[loop.spawn]
self.keepalive = None # type: Optional[Coroutine] self.keepalive = None # type: Optional[Coroutine]
def _workflow_is_running(self) -> bool:
return self.workflow is not None and not self.workflow.finished
def reset_timeout(self) -> None: def reset_timeout(self) -> None:
if self.state is not None: if self.state is not None:
self.deadline = utime.ticks_ms() + self.state.timeout_ms() self.deadline = utime.ticks_ms() + self.state.timeout_ms()
def reset(self) -> None: def reset(self) -> None:
if self.workflow is not None: if self.workflow is not None:
loop.close(self.workflow) self.workflow.close()
if self.keepalive is not None: if self.keepalive is not None:
loop.close(self.keepalive) loop.close(self.keepalive)
self._clear() self._clear()
@ -1024,7 +1027,7 @@ class DialogManager:
if utime.ticks_ms() >= self.deadline: if utime.ticks_ms() >= self.deadline:
self.reset() self.reset()
if self.workflow is None: if not self._workflow_is_running():
return bool(workflow.tasks) return bool(workflow.tasks)
if self.state is None or self.state.finished: if self.state is None or self.state.finished:
@ -1044,10 +1047,9 @@ class DialogManager:
self.state = state self.state = state
self.reset_timeout() self.reset_timeout()
self.result = _RESULT_NONE self.result = _RESULT_NONE
self.keepalive = self.keepalive_loop() self.keepalive = self.keepalive_loop() # TODO: use loop.spawn here
loop.schedule(self.keepalive) loop.schedule(self.keepalive)
self.workflow = self.dialog_workflow() self.workflow = workflow.spawn(self.dialog_workflow())
loop.schedule(self.workflow)
return True return True
async def keepalive_loop(self) -> None: async def keepalive_loop(self) -> None:
@ -1066,36 +1068,31 @@ class DialogManager:
self.reset() self.reset()
async def dialog_workflow(self) -> None: async def dialog_workflow(self) -> None:
if self.workflow is None or self.state is None: if self.state is None:
return return
try: try:
workflow.on_start(self.workflow) while self.result is _RESULT_NONE:
try: result = await self.state.confirm_dialog()
while self.result is _RESULT_NONE: if isinstance(result, State):
result = await self.state.confirm_dialog() self.state = result
if isinstance(result, State): self.reset_timeout()
self.state = result elif result is True:
self.reset_timeout() self.result = _RESULT_CONFIRM
elif result is True:
self.result = _RESULT_CONFIRM
else:
self.result = _RESULT_DECLINE
finally:
if self.keepalive is not None:
loop.close(self.keepalive)
if self.result == _RESULT_CONFIRM:
await self.state.on_confirm()
elif self.result == _RESULT_CANCEL:
await self.state.on_cancel()
elif self.result == _RESULT_TIMEOUT:
await self.state.on_timeout()
else: else:
await self.state.on_decline() self.result = _RESULT_DECLINE
finally: finally:
workflow.on_close(self.workflow) if self.keepalive is not None:
self.workflow = None loop.close(self.keepalive)
if self.result == _RESULT_CONFIRM:
await self.state.on_confirm()
elif self.result == _RESULT_CANCEL:
await self.state.on_cancel()
elif self.result == _RESULT_TIMEOUT:
await self.state.on_timeout()
else:
await self.state.on_decline()
def dispatch_cmd(req: Cmd, dialog_mgr: DialogManager) -> Optional[Cmd]: def dispatch_cmd(req: Cmd, dialog_mgr: DialogManager) -> Optional[Cmd]:

@ -322,7 +322,7 @@ async def handle_session(iface: WireInterface, session_id: int) -> None:
else: else:
# We found a valid handler for this message type. # We found a valid handler for this message type.
# Workflow task, declared for the `workflow.on_close` call later. # Workflow task, declared for the finally block
wf_task = None # type: Optional[HandlerTask] wf_task = None # type: Optional[HandlerTask]
# Here we make sure we always respond with a Failure response # Here we make sure we always respond with a Failure response
@ -342,14 +342,11 @@ async def handle_session(iface: WireInterface, session_id: int) -> None:
# Create the workflow task. # Create the workflow task.
wf_task = handler(ctx, req_msg) wf_task = handler(ctx, req_msg)
# Register the task into the workflow management system.
workflow.on_start(wf_task)
# Run the workflow task. Workflow can do more on-the-wire # Run the workflow task. Workflow can do more on-the-wire
# communication inside, but it should eventually return a # communication inside, but it should eventually return a
# response message, or raise an exception (a rather common # response message, or raise an exception (a rather common
# thing to do). Exceptions are handled in the code below. # thing to do). Exceptions are handled in the code below.
res_msg = await wf_task res_msg = await workflow.spawn(wf_task)
except UnexpectedMessageError as exc: except UnexpectedMessageError as exc:
# Workflow was trying to read a message from the wire, and # Workflow was trying to read a message from the wire, and
@ -371,23 +368,22 @@ async def handle_session(iface: WireInterface, session_id: int) -> None:
# registered handler, but does not have a protobuf class # registered handler, but does not have a protobuf class
# - the first workflow message was not a valid protobuf # - the first workflow message was not a valid protobuf
# - workflow raised some kind of an exception while running # - workflow raised some kind of an exception while running
# - something canceled the workflow from the outside
if __debug__: if __debug__:
if isinstance(exc, ActionCancelled): if isinstance(exc, ActionCancelled):
log.debug(__name__, "cancelled: {}".format(exc.message)) log.debug(__name__, "cancelled: {}".format(exc.message))
elif isinstance(exc, loop.TaskClosed):
log.debug(__name__, "cancelled: loop task was closed")
else: else:
log.exception(__name__, exc) log.exception(__name__, exc)
res_msg = failure(exc) res_msg = failure(exc)
finally: finally:
# De-register the task from the workflow system, if we # If we ran a workflow task, and a default workflow is on, make sure
# registered it before. # we do not race against the layout that is inside.
if wf_task is not None: # TODO: this is very hacky and complects wire with the ui
workflow.on_close(wf_task) if wf_task is not None and workflow.default_task is not None:
# If a default workflow is on, make sure we do not race await ui.wait_until_layout_is_running()
# against the layout that is inside.
# TODO: this is very hacky and complects wire with the ui
if workflow.default_task is not None:
await ui.wait_until_layout_is_running()
if res_msg is not None: if res_msg is not None:
# Either the workflow returned a response, or we created one. # Either the workflow returned a response, or we created one.
@ -448,6 +444,8 @@ def import_workflow(pkgname: str, modname: str) -> Any:
def failure(exc: BaseException) -> Failure: def failure(exc: BaseException) -> Failure:
if isinstance(exc, Error): if isinstance(exc, Error):
return Failure(code=exc.code, message=exc.message) return Failure(code=exc.code, message=exc.message)
elif isinstance(exc, loop.TaskClosed):
return Failure(code=FailureType.ActionCancelled, message="Cancelled")
else: else:
return Failure(code=FailureType.FirmwareError, message="Firmware error") return Failure(code=FailureType.FirmwareError, message="Firmware error")

@ -16,7 +16,7 @@ if __debug__:
# Set of workflow tasks. Multiple workflows can be running at the same time. # Set of workflow tasks. Multiple workflows can be running at the same time.
tasks = set() # type: Set[loop.Task] tasks = set() # type: Set[loop.spawn]
# Default workflow task, if a default workflow is running. Default workflow # Default workflow task, if a default workflow is running. Default workflow
# is not contained in the `tasks` set above. # is not contained in the `tasks` set above.
@ -26,23 +26,22 @@ default_task = None # type: Optional[loop.Task]
default_constructor = None # type: Optional[Callable[[], loop.Task]] default_constructor = None # type: Optional[Callable[[], loop.Task]]
def on_start(workflow: loop.Task) -> None: def _on_start(workflow: loop.spawn) -> None:
""" """
Call after creating a workflow task, but before running it. You should Called after creating a workflow task, but before running it.
make sure to always call `on_close` when the task is finished.
""" """
# Take note that this workflow task is running. # Take note that this workflow task is running.
if __debug__: if __debug__:
log.debug(__name__, "start: %s", workflow) log.debug(__name__, "start: %s", workflow.task)
idle_timer.touch() idle_timer.touch()
tasks.add(workflow) tasks.add(workflow)
def on_close(workflow: loop.Task) -> None: def _on_close(workflow: loop.spawn) -> None:
"""Call when a workflow task has finished running.""" """Called when a workflow task has finished running."""
# Remove task from the running set. # Remove task from the running set.
if __debug__: if __debug__:
log.debug(__name__, "close: %s", workflow) log.debug(__name__, "close: %s", workflow.task)
tasks.remove(workflow) tasks.remove(workflow)
if not tasks and default_constructor: if not tasks and default_constructor:
# If no workflows are running, we should create a new default workflow # If no workflows are running, we should create a new default workflow
@ -55,6 +54,18 @@ def on_close(workflow: loop.Task) -> None:
micropython.mem_info() micropython.mem_info()
def spawn(workflow: loop.Task) -> loop.spawn:
"""Spawn a workflow task.
Creates an instance of loop.spawn for the workflow and registers it into the
workflow management system.
"""
task = loop.spawn(workflow)
_on_start(task)
task.set_finalizer(_on_close)
return task
def start_default() -> None: def start_default() -> None:
"""Start a default workflow. """Start a default workflow.

Loading…
Cancel
Save