core: implement channels for sending messages

pull/328/head
matejcik 5 years ago committed by matejcik
parent e23bb10ec4
commit 6e7fc5f601

@ -24,6 +24,7 @@ if False:
List,
Optional,
Set,
Tuple,
)
Task = Coroutine
@ -256,8 +257,7 @@ class signal(Syscall):
def _deliver(self) -> None:
if self.task is not None and self.value is not _NO_VALUE:
schedule(self.task, self.value)
self.task = None
self.value = _NO_VALUE
self.reset()
def __iter__(self) -> Task: # type: ignore
try:
@ -343,3 +343,87 @@ class spawn(Syscall):
# close() or throw(), kill the children tasks and re-raise
self.exit()
raise
class chan:
"""
Two-ended channel.
The receiving end pauses until a value to be received is available. The sending end
can choose to wait until the value is received, or it can publish the value without
waiting.
Example:
>>> # in task #1:
>>> signal = loop.chan()
>>> while True:
>>> result = await signal.take()
>>> print("awaited result:", result)
>>> # in task #2:
>>> signal.publish("Published without waiting")
>>> print("publish completed")
>>> await signal.put("Put with await")
>>> print("put completed")
Example Output:
publish completed
awaited result: Published without waiting
awaited result: Put with await
put completed
"""
class Put(Syscall):
def __init__(self, ch: "chan") -> None:
self.ch = ch
self.value = None # type: Any
def __call__(self, value: Any) -> Syscall:
self.value = value
return self
def handle(self, task: Task) -> None:
self.ch._schedule_put(task, self.value)
class Take(Syscall):
def __init__(self, ch: "chan") -> None:
self.ch = ch
def __call__(self) -> Syscall:
return self
def handle(self, task) -> None:
self.ch._schedule_take(task)
def __init__(self):
self.putters = [] # type: List[Tuple[Optional[Task], Any]]
self.takers = [] # type: List[Task]
self.put = chan.Put(self)
self.take = chan.Take(self)
def publish(self, value: Any) -> None:
if self.takers:
taker = self.takers.pop(0)
schedule(taker, value)
else:
self.putters.append((None, value))
def _schedule_put(self, putter: Task, value: Any) -> None:
if self.takers:
taker = self.takers.pop(0)
schedule(taker, value)
schedule(putter)
return True
else:
self.putters.append((putter, value))
return False
def _schedule_take(self, taker: Task) -> None:
if self.putters:
putter, value = self.putters.pop(0)
schedule(taker, value)
if putter is not None:
schedule(putter)
else:
self.takers.append(taker)

Loading…
Cancel
Save