Skip to content

Commit dfc13a2

Browse files
committed
Iterate on names and things. Simplify.
1 parent b098af9 commit dfc13a2

File tree

2 files changed

+198
-150
lines changed

2 files changed

+198
-150
lines changed

stacklesslib/async.py

Lines changed: 169 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
import traceback
66
import stackless
77
import contextlib
8-
from .util import atomic, timeout
8+
from .util import atomic
9+
from .util import timeout as timeout_ctxt
910

1011
try:
1112
import threading
@@ -21,178 +22,160 @@ def set(self, val=None):
2122
with atomic():
2223
if self.balance < 0:
2324
self.send(val)
24-
def wait(self, delay=None):
25-
if delay is None:
25+
def wait(self, timeout=None):
26+
if timeout is None:
2627
return self.receive()
27-
with timeout(delay):
28+
with timeout_ctxt(timeout):
2829
return self.receive()
2930

3031
class Task(object):
3132
"""Base for all tasks"""
3233

3334
def __init__(self):
34-
self.result = None
35+
self._result = None
3536
self.callbacks = []
3637

3738
@property
3839
def ready(self):
39-
return self.result is not None
40+
"""True if the task has completed execution"""
41+
return self._result is not None
42+
43+
@property
44+
def result(self):
45+
"""The task successful result or None"""
46+
return self._result[1] if self._result and self._result[0] else None
4047

4148
@property
42-
def is_exception(self):
43-
return self.result is not None and not self.result[0]
49+
def exception(self):
50+
"""The task exception tuple or None"""
51+
return self._result[1] if self._result and not self._result[0] else None
52+
53+
def re_raise(self):
54+
"""Re-raise the exception raised by the task, or return None"""
55+
e = self.exception
56+
if e:
57+
try:
58+
raise e[0], e[1], e[2]
59+
finally:
60+
# Clear the traceback to reduce cyclic links
61+
self.clear_exception()
62+
63+
def clear_exception(self):
64+
if self.exception:
65+
self._result = (False, (None, None, None))
4466

4567
def when_ready(self, cb):
4668
"""Append a callback when the event is ready"""
4769
self.callbacks.append(cb)
4870

49-
def on_ready(self):
71+
def _on_ready(self):
5072
for cb in self.callbacks:
51-
cb()
73+
cb(self)
5274

5375
def wait(self, timeout=None):
76+
"""Wait untli task has completed. Returns True if it completed successfully,
77+
False if it raised an exception
78+
"""
5479
with atomic():
55-
if self.result:
56-
return
57-
e = Event()
58-
self.when_ready(e.set)
59-
e.wait(timeout)
80+
if not self._result:
81+
e = Event()
82+
self.when_ready(e.set)
83+
e.wait(timeout)
84+
return self._result[0]
6085

61-
def get_result(self, timeout=None):
62-
self.wait(timeout)
63-
if self.result[0]:
64-
return self.result[1]
65-
# Consider using "AggregateException" here
66-
raise self.result[1][0], self.result[1][1], self.result[1][2]
6786

68-
def get_exception(self, timeout=None):
87+
def reap(self, timeout=None):
88+
"""Wait for the execution of the task and return its result or raise
89+
its exception.
90+
"""
6991
self.wait(timeout)
70-
if self.result[0]:
71-
return None
72-
return self.result[1]
73-
92+
self.re_raise()
93+
return self.result
94+
7495
def post_result(self, result):
7596
assert self.result is None
76-
self.result = (True, result)
77-
self.on_ready()
97+
self._result = (True, result)
98+
self._on_ready()
7899

79100
def post_exception(self, exc, val=None, tb=None):
80101
assert self.result is None
81-
self.result = (False, (exc, val, tb))
82-
self.on_ready()
102+
self._result = (False, (exc, val, tb))
103+
self._on_ready()
104+
105+
@classmethod
106+
def reap_all(cls, tasks, timeout=None):
107+
"""Get results of all tasks. If one or more raise
108+
an exception, an arbitrary re-raised.
109+
"""
110+
with timeout_ctxt(timeout):
111+
return [t.get_result() for t in tasks]
83112

84113
@classmethod
85-
def wait_all(cls, tasks, delay=None):
86-
with timeout(delay):
114+
def reap_any(cls, tasks, timeout=None):
115+
"""Return the result of an arbitrary taskl. The result is returned
116+
as a tuple (i, result), where i is the index in the original list.
117+
Can re-raise an exception that was found on an arbitrary task.
118+
"""
119+
return cls.wait_any(tasks, timeout).reap()
120+
121+
@classmethod
122+
def wait_all(cls, tasks, timeout=None):
123+
with timeout_ctxt(timeout):
87124
for t in tasks:
88125
t.wait()
89126

90127
@classmethod
91-
def wait_any(cls, tasks, delay=None):
128+
def wait_any(cls, tasks, timeout=None):
129+
"""Wait until one of the tasks is ready. Returns the
130+
ready task.
131+
"""
92132
e = Event()
93133
with atomic():
94-
for i, t in enumerate(tasks):
134+
# See if anyone is ready, otherwise, add a callback to our event
135+
for t in tasks:
95136
if t.ready:
96-
return i
97-
def get_cb(i):
98-
def cb():
99-
e.set(i)
100-
return cb
101-
t.when_ready(get_cb(i))
102-
return e.wait(delay)
103-
137+
return t
138+
def cb(t):
139+
e.set(t)
140+
t.when_ready(cb)
141+
# wait for our event
142+
return e.wait(timeout)
143+
144+
# class methpods returning tasks that wait/reap all/any
104145
@classmethod
105-
def when_all(cls, tasks):
146+
def waiter_all(cls, tasks):
106147
return create_task(cls.wait_all, (tasks,))
107148

108149
@classmethod
109-
def when_any(cls, tasks):
150+
def waiter_any(cls, tasks):
110151
return create_task(cls.wait_any, (tasks,))
111152

112153
@classmethod
113-
def get_results(cls, tasks):
114-
try:
115-
return [t.get_result() for t in tasks]
116-
except:
117-
e = sys.exc_info()
118-
try:
119-
for t in tasks:
120-
t.forget()
121-
raise e[0], e[1], e[2]
122-
finally:
123-
e = None #clear traceback cycle
124-
154+
def reaper_all(cls, tasks):
155+
return create_task(cls.reap_all, (tasks,))
156+
157+
@classmethod
158+
def reaper_any(cls, tasks):
159+
return create_task(cls.reap_any, (tasks,))
160+
125161
class DummyTask(Task):
126-
"""A dummy task object"""
162+
"""A dummy task object. Work performed on a dummy is
163+
simply executed directly as a function call.
164+
"""
127165

128166
class TaskletTask(Task):
129-
""" A Task object which work performed by a tasklet """
167+
""" A Task object which work is performed by a tasklet """
130168
def __init__(self, tasklet):
131169
super(TaskletTask, self).__init__()
132170
self.tasklet = tasklet
133171

134172
class ThreadTask(Task):
135-
""" A task object that runs on a thread"""
173+
""" A task object that runs on a separate thread"""
136174
def __init__(self, thread):
137175
super(ThreadTask, self).__init__()
138176
self.thread = thread
139177

140178

141-
class AwaitManager(object):
142-
"""This class provides the "await" method and is used by
143-
Async functions to yield to their caller
144-
"""
145-
def __init__(self, caller):
146-
self.caller = caller
147-
self.caller_continued = False
148-
149-
def _continue_caller(self):
150-
if not self.caller_continued:
151-
self.caller_continued = True
152-
self.caller.run()
153-
154-
def await(self, task, delay=None):
155-
with atomic():
156-
if task.ready:
157-
# get the result without switching
158-
return task.get_result()
159-
# optionally continue the caller
160-
self._continue_caller()
161-
# wait for the result, possibly tasklet blocking
162-
return task.get_result(delay)
163-
164-
165-
def async_call_helper(task, await_manager, callable, args, kwargs):
166-
# remove the caller from the runnables queue. There is a window here where other tasklets
167-
# might run, we need perhaps a primitive to perform this task
168-
try:
169-
await_manager.caller.remove()
170-
try:
171-
result = callable(await_manager, *args, **kwargs)
172-
except:
173-
task.post_exception(*sys.exc_info())
174-
else:
175-
task.post_result(result)
176-
finally:
177-
with atomic():
178-
await_manager._continue_caller()
179-
except:
180-
print >> sys.stderr, "Unhandled exception in ", callable
181-
traceback.print_exc()
182-
183-
def call_async(callable, args=(), kwargs={}):
184-
await_manager = AwaitManager(stackless.getcurrent())
185-
callee = stackless.tasklet(async_call_helper)
186-
task = TaskletTask(callee)
187-
with atomic():
188-
callee(task, await_manager, callable, args, kwargs)
189-
try:
190-
# here, a run(remove=True) or a switch() primitive would be useful
191-
callee.run()
192-
finally:
193-
# need this here, in case caller gets awoken by other means, e.g. exception
194-
await_manager.caller_continued = True
195-
return task
196179

197180
class TaskFactory(object):
198181
"""Base class for TaskFactories"""
@@ -244,19 +227,83 @@ def create_task(callable, args=(), kwargs={}, factory=taskletTaskFactory):
244227
"""Create a task, given a function and arguments"""
245228
return factory.create_task(callable, args, kwargs)
246229

247-
def async(func):
248-
"""Wraps a function as an async function, taking an AwaitHelper as the first argument"""
249-
@contextlib.wraps(func)
250-
def helper(*args, **kwargs):
251-
return call_async(func, args, kwargs)
252-
return helper
253-
254230
def task(factory=taskletTaskFactory):
255231
"""Wraps a function so that it will be executed as a task"""
256232
def decorator(func):
257-
"""Wraps a function as a task"""
258233
@contextlib.wraps(func)
259234
def helper(*args, **kwargs):
260235
return create_task(func, args, kwargs, factory)
261236
return helper
262237
return decorator
238+
239+
# Now comes the "await" functionality. This is a special kind of task
240+
# that has an Awaiter instance in its first argument which contols
241+
# execution flow with its caller
242+
243+
class Awaiter(object):
244+
"""This class provides the "await" method and is used by
245+
Async functions to yield to their caller
246+
"""
247+
def __init__(self, caller):
248+
self.caller = caller
249+
self.caller_continued = False
250+
251+
def _continue_caller(self):
252+
if not self.caller_continued:
253+
self.caller.run()
254+
self.caller_continued = True
255+
256+
def await(self, task, timeout=None):
257+
with atomic():
258+
if task.ready:
259+
# get the result without switching
260+
return task.reap()
261+
# optionally continue the caller
262+
self._continue_caller()
263+
# wait for the result, possibly tasklet blocking
264+
return task.reap(timeout)
265+
266+
267+
def async_call_helper(task, awaiter, callable, args, kwargs):
268+
# remove the caller from the runnables queue. There is a window here where other tasklets
269+
# might run, we need perhaps a primitive to perform this task
270+
try:
271+
awaiter.caller.remove()
272+
try:
273+
result = callable(awaiter, *args, **kwargs)
274+
except:
275+
task.post_exception(*sys.exc_info())
276+
else:
277+
task.post_result(result)
278+
finally:
279+
with atomic(): #for _continue_caller
280+
awaiter._continue_caller()
281+
except:
282+
print >> sys.stderr, "Unhandled exception in ", callable
283+
traceback.print_exc()
284+
285+
class AsyncTask(TaskletTask):
286+
"""A task representing the Async function"""
287+
288+
def call_async(callable, args=(), kwargs={}):
289+
awaiter = Awaiter(stackless.getcurrent())
290+
callee = stackless.tasklet(async_call_helper)
291+
task = AsyncTask(callee)
292+
with atomic():
293+
callee(task, awaiter, callable, args, kwargs)
294+
try:
295+
# here, a run(remove=True) or a switch() primitive would be useful
296+
callee.run()
297+
finally:
298+
# need this here, in case caller gets awoken by other means, e.g. exception
299+
awaiter.caller_continued = True
300+
return task
301+
302+
303+
def async(func):
304+
"""Wraps a function as an async function, taking an Awaiter as the first argument"""
305+
@contextlib.wraps(func)
306+
def helper(*args, **kwargs):
307+
return call_async(func, args, kwargs)
308+
return helper
309+

0 commit comments

Comments
 (0)