feat(core): add mailbox as a simplified version of chan

pull/3686/merge^2
matejcik 11 months ago
parent c01a86dd9b
commit ca80a6e9bf

@ -350,6 +350,109 @@ class race(Syscall):
raise
class mailbox(Syscall):
"""
Wait to receive a value.
In terms of synchronization primitives, this is a condition variable that also
contains a value. It is a simplification of Go channels, which is one-ended and
only has a buffer of size 1.
The receiving end pauses until a value is received, and then empties the mailbox
to wait again.
The sending end synchronously posts a value. It is impossible to wait until
the value is consumed. Trying to post a value when the mailbox is full raises
an error, unless `replace=True` is specified
Example:
>>> # in task #1:
>>> box = loop.mailbox()
>>> while True:
>>> result = await box
>>> print("awaited result:", result)
>>> # in task #2:
>>> box.put("Hello from the other task")
>>> print("put completed")
Example Output:
put completed
awaited result: Hello from the other task
"""
_NO_VALUE = object()
def __init__(self, initial_value: Any = _NO_VALUE) -> None:
self.value = initial_value
self.taker: Task | None = None
def is_empty(self) -> bool:
"""Is the mailbox empty?"""
return self.value is self._NO_VALUE
def clear(self) -> None:
"""Empty the mailbox."""
assert self.taker is None
self.value = self._NO_VALUE
def put(self, value: Any, replace: bool = False) -> None:
"""Put a value into the mailbox.
If there is another task waiting for the value, it will be scheduled to resume.
Otherwise, the mailbox will hold the value until someone consumes it.
It is an error to call `put()` when there is a value already held, unless
`replace` is set to `True`. In such case, the held value is replaced with
the new one.
"""
if not self.is_empty() and not replace:
raise ValueError("mailbox already has a value")
self.value = value
if self.taker is not None:
self._take(self.taker)
def _take(self, task: Task) -> None:
"""Take a value and schedule the taker."""
self.taker = None
schedule(task, self.value)
self.clear()
def handle(self, task: Task) -> None:
assert self.taker is None
if not self.is_empty():
self._take(task)
else:
self.taker = task
def __iter__(self) -> Generator:
assert self.taker is None
# short-circuit if there is a value already
if not self.is_empty():
value = self.value
self.clear()
return value
# otherwise, wait for a value
try:
return (yield self)
finally:
# Clear the taker even in case of exception. This way stale takers don't
# blow up someone calling `maybe_close()`
self.taker = None
def maybe_close(self) -> None:
"""Shut down the taker if possible."""
taker = self.taker
self.taker = None
if taker is not None and taker is not this_task:
taker.close()
class chan:
"""
Two-ended channel.

Loading…
Cancel
Save