core/loop: introduce spawn syscall

pull/971/head
matejcik 4 years ago committed by matejcik
parent a4f47ddd21
commit 02565f3bfb

@ -41,11 +41,21 @@ _paused = {} # type: Dict[int, Set[Task]]
# functions to execute after a task is finished
_finalizers = {} # type: Dict[int, Finalizer]
# reference to the task that is currently executing
this_task = None # type: Optional[Task]
if __debug__:
# synthetic event queue
synthetic_events = [] # type: List[Tuple[int, Any]]
class TaskClosed(Exception):
pass
TASK_CLOSED = TaskClosed()
def schedule(
task: Task,
value: Any = None,
@ -166,6 +176,8 @@ def _step(task: Task, value: Any) -> None:
c) Something else
- This should not happen - error.
"""
global this_task
this_task = task
try:
if isinstance(value, BaseException):
result = task.throw(value) # type: ignore
@ -429,3 +441,118 @@ class chan:
schedule(putter)
else:
self.takers.append(taker)
class spawn(Syscall):
"""Spawn a task asynchronously and get an awaitable reference to it.
Abstraction over `loop.schedule` and `loop.close`. Useful when you need to start
a task in the background, but want to be able to kill it from the outside.
Examples:
1. Spawn a background task, get its result later.
>>> wire_read = loop.spawn(read_from_wire())
>>> long_result = await long_running_operation()
>>> wire_result = await wire_read
2. Allow the user to kill a long-running operation:
>>> try:
>>> operation = loop.spawn(long_running_operation())
>>> result = await operation
>>> print("finished with result", result)
>>> except loop.TaskClosed:
>>> print("task was closed before it could finish")
>>>
>>> # meanwhile, on the other side of town...
>>> controller.close()
Task is spawned only once. Multiple attempts to `await spawned_object` will return
the original return value (or raise the original exception).
"""
def __init__(self, task: Task) -> None:
self.task = task
self.callback = None # type: Optional[Task]
self.finalizer_callback = None # type: Optional[Callable[["spawn"], None]]
self.finished = False
self.return_value = None # type: Any
# schedule task immediately
if __debug__:
log.debug(__name__, "spawn new task: %s", task)
schedule(task, finalizer=self._finalize)
def _finalize(self, task: Task, value: Any) -> None:
# sanity check: make sure finalizer is for our task
assert task is self.task
# sanity check: make sure finalizer is not called more than once
assert self.finished is False
# now we are truly finished
self.finished = True
if isinstance(value, GeneratorExit):
# coerce GeneratorExit to a catchable TaskClosed
self.return_value = TASK_CLOSED
else:
self.return_value = value
if self.callback is not None:
schedule(self.callback, self.return_value)
self.callback = None
if self.finalizer_callback is not None:
self.finalizer_callback(self)
def __iter__(self) -> Task: # type: ignore
if self.finished:
# exit immediately if we already have a return value
if isinstance(self.return_value, BaseException):
raise self.return_value
else:
return self.return_value
try:
return (yield self)
except BaseException:
# Clear out the callback. Otherwise we would raise the exception into it,
# AND schedule it with the closing value of the child task.
self.callback = None
assert self.task is not this_task # closing parent from child :(
close(self.task)
raise
def handle(self, caller: Task) -> None:
# the same spawn should not be awaited multiple times
assert self.callback is None
self.callback = caller
def close(self) -> None:
"""Shut down the spawned task.
If another caller is awaiting its result it will get a TaskClosed exception.
If the task was already finished, the call has no effect.
"""
if not self.finished:
if __debug__:
log.debug(__name__, "close spawned task: %s", self.task)
close(self.task)
def set_finalizer(self, finalizer_callback: Callable[["spawn"], None]) -> None:
"""Register a finalizer callback.
The provided function is executed synchronously when the spawned task ends,
with the spawn object as an argument.
"""
if self.finished:
finalizer_callback(self)
self.finalizer_callback = finalizer_callback
def is_running(self) -> bool:
"""Check if the caller is executing from the spawned task.
Useful for checking if it is OK to call `task.close()`. If `task.is_running()`
is True, it would be calling close on self, which will result in a ValueError.
"""
return self.task is this_task

@ -27,14 +27,17 @@ to `__iter__`. The `__await__` method is never executed, however.
`loop.run()` starts the event loop. The call only returns when there are no further
waiting tasks -- so, in usual conditions, never.
`loop.schedule(task, value, deadline, finalizer)` schedules an awaitable to be run
either as soon as possible, or at a specified time (given as a `deadline` in
`loop.schedule(task, value, deadline, finalizer, reschedule)` schedules an awaitable to
be run either as soon as possible, or at a specified time (given as a `deadline` in
microseconds since system bootup.)
In addition, when the task finishes processing or is closed externally, the `finalizer`
callback will be executed, with the task and the return value (or the raised exception)
as a parameter.
If `reschedule` is true, the task is first cleared from the scheduled queue -- in
effect, it is rescheduled to run at a different time.
`loop.close(task)` removes a previously scheduled task from the list of waiting tasks
and calls its finalizer.
@ -184,6 +187,26 @@ is scheduled to run on the next tick.
_Upcoming changes may solve this in relevant cases, by inlining syscall operations._
**`loop.spawn(task)`**: Start the task asynchronously. Return an object that allows
the caller to await its result, or shut the task down.
Example usage:
```python
task = loop.spawn(some_background_task())
await do_something_here()
result = await task
```
Unlike other syscalls, `loop.spawn` starts the task at instantiation time. `await`ing
the same `loop.spawn` instance a second time will immediately return the result of the
original run.
If the task is cancelled (usually by calling `task.close()`), the awaiter receives a
`loop.TaskClosed` exception.
It is also possible to register a synchronous finalizer callback via
`task.set_finalizer`. This is used internally to implement workflow management.
**`loop.chan()`** is a unidirectional communication channel that actually implements two
syscalls:

Loading…
Cancel
Save