-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathnotify.py
More file actions
149 lines (126 loc) · 5.58 KB
/
notify.py
File metadata and controls
149 lines (126 loc) · 5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""Handling of notifications."""
from __future__ import annotations
import select
from typing import TYPE_CHECKING, Callable
from .core import Query
from .error import db_error
if TYPE_CHECKING:
from .db import DB
__all__ = ['NotificationHandler']
# The notification handler
class NotificationHandler:
"""A PostgreSQL client-side asynchronous notification handler."""
def __init__(self, db: DB, event: str, callback: Callable,
arg_dict: dict | None = None,
timeout: int | float | None = None,
stop_event: str | None = None):
"""Initialize the notification handler.
You must pass a PyGreSQL database connection, the name of an
event (notification channel) to listen for and a callback function.
You can also specify a dictionary arg_dict that will be passed as
the single argument to the callback function, and a timeout value
in seconds (a floating point number denotes fractions of seconds).
If it is absent or None, the callers will never time out. If the
timeout is reached, the callback function will be called with a
single argument that is None. If you set the timeout to zero,
the handler will poll notifications synchronously and return.
You can specify the name of the event that will be used to signal
the handler to stop listening as stop_event. By default, it will
be the event name prefixed with 'stop_'.
"""
self.db: DB | None = db
self.event = event
self.stop_event = stop_event or f'stop_{event}'
self.listening = False
self.callback = callback
if arg_dict is None:
arg_dict = {}
self.arg_dict = arg_dict
self.timeout = timeout
def __del__(self) -> None:
"""Delete the notification handler."""
self.unlisten()
def close(self) -> None:
"""Stop listening and close the connection."""
if self.db:
self.unlisten()
self.db.close()
self.db = None
def listen(self) -> None:
"""Start listening for the event and the stop event."""
db = self.db
if db and not self.listening:
db.query(f'listen "{self.event}"')
db.query(f'listen "{self.stop_event}"')
self.listening = True
def unlisten(self) -> None:
"""Stop listening for the event and the stop event."""
db = self.db
if db and self.listening:
db.query(f'unlisten "{self.event}"')
db.query(f'unlisten "{self.stop_event}"')
self.listening = False
def notify(self, db: DB | None = None, stop: bool = False,
payload: str | None = None) -> Query | None:
"""Generate a notification.
Optionally, you can pass a payload with the notification.
If you set the stop flag, a stop notification will be sent that
will cause the handler to stop listening.
Note: If the notification handler is running in another thread, you
must pass a different database connection since PyGreSQL database
connections are not thread-safe.
"""
if not self.listening:
return None
if not db:
db = self.db
if not db:
return None
event = self.stop_event if stop else self.event
cmd = f'notify "{event}"'
if payload:
cmd += f", '{payload}'"
return db.query(cmd)
def __call__(self) -> None:
"""Invoke the notification handler.
The handler is a loop that listens for notifications on the event
and stop event channels. When either of these notifications are
received, its associated 'pid', 'event' and 'extra' (the payload
passed with the notification) are inserted into its arg_dict
dictionary and the callback is invoked with this dictionary as
a single argument. When the handler receives a stop event, it
stops listening to both events and return.
In the special case that the timeout of the handler has been set
to zero, the handler will poll all events synchronously and return.
If will keep listening until it receives a stop event.
Note: If you run this loop in another thread, don't use the same
database connection for database operations in the main thread.
"""
if not self.db:
return
self.listen()
poll = self.timeout == 0
rlist = [] if poll else [self.db.fileno()]
while self.db and self.listening:
# noinspection PyUnboundLocalVariable
if poll or select.select(rlist, [], [], self.timeout)[0]:
while self.db and self.listening:
notice = self.db.getnotify()
if not notice: # no more messages
break
event, pid, extra = notice
if event not in (self.event, self.stop_event):
self.unlisten()
raise db_error(
f'Listening for "{self.event}"'
f' and "{self.stop_event}",'
f' but notified of "{event}"')
if event == self.stop_event:
self.unlisten()
self.arg_dict.update(pid=pid, event=event, extra=extra)
self.callback(self.arg_dict)
if poll:
break
else: # we timed out
self.unlisten()
self.callback(None)