-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathevents.py
More file actions
192 lines (169 loc) · 6.41 KB
/
events.py
File metadata and controls
192 lines (169 loc) · 6.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#events.py
import sys
import threading
import traceback
import heapq
from .base import time as elapsed_time
# An event queue class.
class EventQueue(object):
    """
    This class manages future events.  Its scheduling functions have an
    interface that matches PEP 3156:
    http://www.python.org/dev/peps/pep-3156/#event-loop-interface
    e.g. call_soon, call_later, etc.

    Events are stored in a heap of
    ``(when, sequence, interval, callback, args)`` tuples.  ``when`` is
    expressed in "queue time", i.e. ``self.time() + self.time_offset``.
    ``interval`` is negative for one-shot events and is the repeat period
    (in seconds) for repeated events.
    """

    def __init__(self):
        self.queue = []  # A heapq for events
        self.time_offset = 0  # time offset for scheduling
        self.sequence = 0  # unique index
        self.lock = threading.Lock()  # guards self.queue and self.sequence

    def __len__(self):
        """Return the number of events currently in the queue."""
        return len(self.queue)

    @staticmethod
    def time():
        """Return the current time as reported by the imported clock helper."""
        return elapsed_time()

    def reschedule(self, delta_t):
        """
        Apply a delta-t to all existing timed events.

        The queued deadlines are not touched; instead the offset that is
        added to the clock when computing "now" is reduced by 'delta_t'.
        """
        self.time_offset -= delta_t

    def call_soon(self, callback, *args):
        """
        Cause the given callback to be performed as soon as possible.
        Returns a Handle for the scheduled event.
        """
        # policy decision. soon means now. There might be other earlier, though
        # The offset must be included so that "now" here is the same "now"
        # that pump() and due_delay() compare against.
        return self._call_at(self.time() + self.time_offset, -1, callback, args)

    def call_later(self, delay, callback, *args):
        """
        Cause the given callback to be scheduled for call after 'delay' seconds.
        Returns a Handle for the scheduled event.
        """
        time = self.time() + self.time_offset + delay
        return self._call_at(time, -1, callback, args)

    def call_later_threadsafe(self, delay, callback, *args):
        """
        Cause the given callback to be scheduled for call after 'delay' seconds,
        then call cancel_sleep() so a sleeping pump thread can notice it.
        Returns a Handle for the scheduled event.
        """
        result = self.call_later(delay, callback, *args)
        self.cancel_sleep()
        return result

    def call_at(self, deadline, callback, *args):
        """
        Cause the given callback to be scheduled for call at a certain time.
        'deadline' is in the units of self.time().
        Returns a Handle for the scheduled event.
        """
        time = deadline + self.time_offset
        return self._call_at(time, -1, callback, args)

    def cancel_sleep(self):
        """
        Attempt to wake up any thread that is sleeping until the next event
        is due.  The default implementation does nothing.
        """
        pass

    def call_repeatedly(self, interval, callback, *args):
        """
        Cause the given callback to be called every 'interval' seconds.
        The first call is due one interval from now.
        Returns a Handle for the scheduled event.
        """
        time = self.time() + self.time_offset + interval
        return self._call_at(time, interval, callback, args)

    def _call_at(self, when, interval, callback, args):
        """
        Push an event onto the heap and return a Handle for it.

        'when' is the deadline in queue time.  'interval' is the repeat
        period, or a negative number for a one-shot event.
        """
        with self.lock:
            sequence = self.sequence
            self.sequence += 1
            # sequence is a disambiguator for equal deadlines.
            entry = (when, sequence, interval, callback, args)
            heapq.heappush(self.queue, entry)
        return Handle(self, sequence, callback, args)

    def _cancel(self, sequence):
        """
        Cancel an event that has been submitted.  Raise ValueError if it
        isn't there.
        """
        # Note, there is no way currently to ensure that either the event was
        # removed or successfully executed, i.e. no synchronization.
        # Caveat Emptor.
        with self.lock:
            for i, e in enumerate(self.queue):
                if e[1] == sequence:
                    del self.queue[i]
                    heapq.heapify(self.queue)  # heapq has no "remove" method
                    return
        raise ValueError("event not in queue")

    def pump(self):
        """
        The worker function for the main loop to process events in the queue.
        Returns the number of events that were run.
        """
        # produce a batch of events to perform this time. This makes sure
        # that new events created don't add to our job, thus making the loop
        # infinite.
        now = self.time() + self.time_offset
        batch = []
        with self.lock:
            while self.queue:
                t = self.queue[0][0]
                if t <= now:
                    batch.append(heapq.heappop(self.queue))
                else:
                    break

        # Run the events
        for event in batch:
            if event[2] >= 0.0:
                # reschedule a repeated event with the same sequence id.
                # This happens before the callback runs, so the entry is
                # already back in the queue while the callback executes.
                with self.lock:
                    entry = (now + event[2], ) + event[1:]
                    heapq.heappush(self.queue, entry)
            try:
                event[3](*event[4])  # callback(*args)
            except Exception:
                # a failing callback must not stop the rest of the batch
                self.handle_exception(sys.exc_info())
        return len(batch)

    @property
    def is_due(self):
        """Returns true if the queue needs pumping now."""
        delay = self.due_delay()
        # due_delay() returns None for an empty queue; nothing is due then.
        return delay is not None and delay <= 0.0

    def due_delay(self):
        """delay in seconds until the next event, or None if the queue is empty"""
        with self.lock:
            if self.queue:
                t = self.queue[0][0]
                if t < 0:
                    # a negative deadline is treated as due immediately
                    return 0.0
                now = self.time() + self.time_offset
                return max(0.0, t - now)
        return None

    def handle_exception(self, exc_info):
        """
        Report an exception raised by a callback.  'exc_info' is a
        sys.exc_info() triple; this implementation prints the traceback.
        """
        traceback.print_exception(*exc_info)
class Handle(object):
    """
    This object represents a cancelable event from the EventQueue.
    See http://www.python.org/dev/peps/pep-3156
    """

    def __init__(self, queue, sequence, callback, args):
        """
        'queue' is the object whose _cancel(sequence) method removes the
        event; 'sequence' identifies the event in that queue.  'callback'
        and 'args' are stored for inspection only.
        """
        self._queue = queue
        self._sequence = sequence
        # public attributes
        # None until cancel() is called, then True or False (see cancel())
        self.cancelled = None
        self.callback = callback
        self.args = args

    def cancel(self):
        """
        exact semantics of this call are not yet defined, see
        http://www.python.org/dev/peps/pep-3156
        Currently returns True if it was successfully cancelled, False if it
        had already run.  The outcome of the first call is remembered and
        returned by any later call.
        """
        # Only the first call talks to the queue; `cancelled` is None until
        # then.
        if self.cancelled is None:
            try:
                self._queue._cancel(self._sequence)
            except ValueError:
                self.cancelled = False  # it already ran
            else:
                self.cancelled = True
        return self.cancelled
class DummyEventQueue(object):
    """
    Instances of this class raise errors.  Use this in an application where
    there is no pumping of the event queue to detect errors
    """

    def bork(self, *args, **kwds):
        """Accept any arguments and always raise NotImplementedError."""
        raise NotImplementedError("events are not being pumped")

    # Every scheduling entry point of EventQueue, plus pump, maps to bork so
    # that any attempt to use events fails with the same explicit error.
    call_later = bork
    call_later_threadsafe = bork
    call_soon = bork
    call_at = bork
    call_repeatedly = bork
    pump = bork