|
|
|
@ -270,11 +270,10 @@ class race(Syscall):
|
|
|
|
|
`race.__iter__` for explanation. Always use `await`.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, *children: Awaitable, exit_others: bool = True) -> None:
|
|
|
|
|
def __init__(self, *children: Awaitable) -> None:
|
|
|
|
|
self.children = children
|
|
|
|
|
self.exit_others = exit_others
|
|
|
|
|
self.finished = [] # type: List[Awaitable] # children that finished
|
|
|
|
|
self.scheduled = [] # type: List[Task] # scheduled wrapper tasks
|
|
|
|
|
self.finished = False
|
|
|
|
|
|
|
|
|
|
def handle(self, task: Task) -> None:
|
|
|
|
|
"""
|
|
|
|
@ -282,13 +281,22 @@ class race(Syscall):
|
|
|
|
|
"""
|
|
|
|
|
finalizer = self._finish
|
|
|
|
|
scheduled = self.scheduled
|
|
|
|
|
finished = self.finished
|
|
|
|
|
|
|
|
|
|
self.callback = task
|
|
|
|
|
scheduled.clear()
|
|
|
|
|
finished.clear()
|
|
|
|
|
self.finished = False
|
|
|
|
|
|
|
|
|
|
for child in self.children:
|
|
|
|
|
# # short-circuit syscalls.
|
|
|
|
|
# if isinstance(child, Syscall):
|
|
|
|
|
# child_task = self._resume_subtask(child)
|
|
|
|
|
# # child_task is a coroutine, we must activate it
|
|
|
|
|
# next(child_task)
|
|
|
|
|
|
|
|
|
|
# scheduled.append(child_task)
|
|
|
|
|
# child.handle(child_task)
|
|
|
|
|
# continue
|
|
|
|
|
|
|
|
|
|
if isinstance(child, _type_gen):
|
|
|
|
|
child_task = child
|
|
|
|
|
else:
|
|
|
|
@ -297,20 +305,26 @@ class race(Syscall):
|
|
|
|
|
scheduled.append(child_task) # type: ignore
|
|
|
|
|
# TODO: document the types here
|
|
|
|
|
|
|
|
|
|
def exit(self, except_for: Task = None) -> None:
|
|
|
|
|
def _resume_subtask(self, child: Awaitable) -> None:
|
|
|
|
|
callback = self.callback
|
|
|
|
|
value = yield
|
|
|
|
|
print(hash(self), "subtask", child, "resumed with", value)
|
|
|
|
|
if not self.finished:
|
|
|
|
|
self.finished.append(child)
|
|
|
|
|
if self.exit_others:
|
|
|
|
|
self.exit(child)
|
|
|
|
|
|
|
|
|
|
_step(callback, value)
|
|
|
|
|
|
|
|
|
|
def exit(self, except_for: Awaitable = None) -> None:
|
|
|
|
|
for task in self.scheduled:
|
|
|
|
|
if task != except_for:
|
|
|
|
|
close(task)
|
|
|
|
|
|
|
|
|
|
def _finish(self, task: Task, result: Any) -> None:
|
|
|
|
|
if not self.finished:
|
|
|
|
|
# because we create tasks for children that are not generators yet,
|
|
|
|
|
# we need to find the child value that the caller supplied
|
|
|
|
|
index = self.scheduled.index(task)
|
|
|
|
|
child = self.children[index]
|
|
|
|
|
self.finished.append(child)
|
|
|
|
|
if self.exit_others:
|
|
|
|
|
self.exit(task)
|
|
|
|
|
self.finished = True
|
|
|
|
|
self.exit(task)
|
|
|
|
|
# Result can be GeneratorExit (see finalize()), which causes the resumed
|
|
|
|
|
# callback to exit cleanly.
|
|
|
|
|
schedule(self.callback, result)
|
|
|
|
|