-
-
Notifications
You must be signed in to change notification settings - Fork 145
Expand file tree
/
Copy pathsimple_pub_sub.py
More file actions
85 lines (65 loc) · 2.64 KB
/
simple_pub_sub.py
File metadata and controls
85 lines (65 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""Simple public-subscribe system"""
from __future__ import annotations
from asyncio import Future, Queue, create_task, get_running_loop, sleep
from collections.abc import AsyncIterator, Callable
from typing import Any
from .is_awaitable import is_awaitable
__all__ = ["SimplePubSub", "SimplePubSubIterator"]
class SimplePubSub:
"""A very simple publish-subscript system.
Creates an AsyncIterator from an EventEmitter.
Useful for mocking a PubSub system for tests.
"""
subscribers: set[Callable]
def __init__(self) -> None:
self.subscribers = set()
def emit(self, event: Any) -> bool:
"""Emit an event."""
for subscriber in self.subscribers:
result = subscriber(event)
if is_awaitable(result):
create_task(result) # type: ignore # noqa: RUF006
return bool(self.subscribers)
def get_subscriber(self, transform: Callable | None = None) -> SimplePubSubIterator:
"""Return subscriber iterator"""
return SimplePubSubIterator(self, transform)
class SimplePubSubIterator(AsyncIterator):
"""Async iterator used for subscriptions."""
def __init__(self, pubsub: SimplePubSub, transform: Callable | None) -> None:
self.pubsub = pubsub
self.transform = transform
self.pull_queue: Queue[Future] = Queue()
self.push_queue: Queue[Any] = Queue()
self.listening = True
pubsub.subscribers.add(self.push_value)
def __aiter__(self) -> SimplePubSubIterator:
return self
async def __anext__(self) -> Any:
if not self.listening:
raise StopAsyncIteration
await sleep(0)
if not self.push_queue.empty():
return await self.push_queue.get()
future = get_running_loop().create_future()
await self.pull_queue.put(future)
return future
async def aclose(self) -> None:
"""Close the iterator."""
if self.listening:
await self.empty_queue()
async def empty_queue(self) -> None:
"""Empty the queue."""
self.listening = False
self.pubsub.subscribers.remove(self.push_value)
while not self.pull_queue.empty():
future = await self.pull_queue.get()
future.cancel()
while not self.push_queue.empty():
await self.push_queue.get()
async def push_value(self, event: Any) -> None:
"""Push a new value."""
value = event if self.transform is None else self.transform(event)
if self.pull_queue.empty():
await self.push_queue.put(value)
else:
(await self.pull_queue.get()).set_result(value)