Skip to content

Commit 3cfa8b7

Browse files
committed
Futures cleanup. Add two convenience functions to wait for results.
stacklesslib.async is now based on futures.
1 parent 2d85b06 commit 3cfa8b7

File tree

4 files changed

+85
-411
lines changed

4 files changed

+85
-411
lines changed

stacklesslib/async.py

Lines changed: 16 additions & 236 deletions
Original file line numberDiff line numberDiff line change
@@ -6,237 +6,24 @@
66
import stackless
77
import contextlib
88
from .util import atomic
9-
from .util import timeout as timeout_ctxt
9+
from . import futures
1010

11-
try:
12-
import threading
13-
_threading = getattr(threading, "real_threading", threading)
14-
except ImportError:
15-
_threading = None
1611

17-
class Event(stackless.channel):
18-
"""A channel that sends an event if anyone is listening"""
19-
def __init__(self):
20-
self.preference = 0
21-
def set(self, val=None):
22-
with atomic():
23-
if self.balance < 0:
24-
self.send(val)
25-
def wait(self, timeout=None):
26-
if timeout is None:
27-
return self.receive()
28-
with timeout_ctxt(timeout):
29-
return self.receive()
30-
31-
class Task(object):
32-
"""Base for all tasks"""
33-
34-
def __init__(self):
35-
self._result = None
36-
self.callbacks = []
37-
38-
@property
39-
def ready(self):
40-
"""True if the task has completed execution"""
41-
return self._result is not None
42-
43-
@property
44-
def result(self):
45-
"""The task successful result or None"""
46-
return self._result[1] if self._result and self._result[0] else None
47-
48-
@property
49-
def exception(self):
50-
"""The task exception tuple or None"""
51-
return self._result[1] if self._result and not self._result[0] else None
52-
53-
def re_raise(self):
54-
"""Re-raise the exception raised by the task, or return None"""
55-
e = self.exception
56-
if e:
57-
try:
58-
raise e[0], e[1], e[2]
59-
finally:
60-
# Clear the traceback to reduce cyclic links
61-
self.clear_exception()
62-
63-
def clear_exception(self):
64-
if self.exception:
65-
self._result = (False, (None, None, None))
66-
67-
def when_ready(self, cb):
68-
"""Append a callback when the event is ready"""
69-
self.callbacks.append(cb)
70-
71-
def _on_ready(self):
72-
for cb in self.callbacks:
73-
cb(self)
74-
75-
def wait(self, timeout=None):
76-
"""Wait untli task has completed. Returns True if it completed successfully,
77-
False if it raised an exception
78-
"""
79-
with atomic():
80-
if not self._result:
81-
e = Event()
82-
self.when_ready(e.set)
83-
e.wait(timeout)
84-
return self._result[0]
85-
86-
87-
def reap(self, timeout=None):
88-
"""Wait for the execution of the task and return its result or raise
89-
its exception.
90-
"""
91-
self.wait(timeout)
92-
self.re_raise()
93-
return self.result
94-
95-
def post_result(self, result):
96-
assert self.result is None
97-
self._result = (True, result)
98-
self._on_ready()
99-
100-
def post_exception(self, exc, val=None, tb=None):
101-
assert self.result is None
102-
self._result = (False, (exc, val, tb))
103-
self._on_ready()
104-
105-
@classmethod
106-
def reap_all(cls, tasks, timeout=None):
107-
"""Get results of all tasks. If one or more raise
108-
an exception, an arbitrary re-raised.
109-
"""
110-
with timeout_ctxt(timeout):
111-
return [t.get_result() for t in tasks]
112-
113-
@classmethod
114-
def reap_any(cls, tasks, timeout=None):
115-
"""Return the result of an arbitrary taskl. The result is returned
116-
as a tuple (i, result), where i is the index in the original list.
117-
Can re-raise an exception that was found on an arbitrary task.
118-
"""
119-
return cls.wait_any(tasks, timeout).reap()
120-
121-
@classmethod
122-
def wait_all(cls, tasks, timeout=None):
123-
with timeout_ctxt(timeout):
124-
for t in tasks:
125-
t.wait()
126-
127-
@classmethod
128-
def wait_any(cls, tasks, timeout=None):
129-
"""Wait until one of the tasks is ready. Returns the
130-
ready task.
131-
"""
132-
e = Event()
133-
with atomic():
134-
# See if anyone is ready, otherwise, add a callback to our event
135-
for t in tasks:
136-
if t.ready:
137-
return t
138-
def cb(t):
139-
e.set(t)
140-
t.when_ready(cb)
141-
# wait for our event
142-
return e.wait(timeout)
143-
144-
# class methpods returning tasks that wait/reap all/any
145-
@classmethod
146-
def waiter_all(cls, tasks):
147-
return create_task(cls.wait_all, (tasks,))
148-
149-
@classmethod
150-
def waiter_any(cls, tasks):
151-
return create_task(cls.wait_any, (tasks,))
152-
153-
@classmethod
154-
def reaper_all(cls, tasks):
155-
return create_task(cls.reap_all, (tasks,))
156-
157-
@classmethod
158-
def reaper_any(cls, tasks):
159-
return create_task(cls.reap_any, (tasks,))
160-
161-
class DummyTask(Task):
162-
"""A dummy task object. Work performed on a dummy is
163-
simply executed directly as a function call.
164-
"""
165-
166-
class TaskletTask(Task):
167-
""" A Task object which work is performed by a tasklet """
168-
def __init__(self, tasklet):
169-
super(TaskletTask, self).__init__()
170-
self.tasklet = tasklet
171-
172-
class ThreadTask(Task):
173-
""" A task object that runs on a separate thread"""
174-
def __init__(self, thread):
175-
super(ThreadTask, self).__init__()
176-
self.thread = thread
177-
178-
179-
180-
class TaskFactory(object):
181-
"""Base class for TaskFactories"""
182-
def task_wrapper(self, task, callable, args, kwargs):
183-
try:
184-
try:
185-
result = callable(*args, **kwargs)
186-
except:
187-
task.post_exception(*sys.exc_info())
188-
else:
189-
task.post_result(result)
190-
except:
191-
print >> sys.stderr, "Unhandled exception in ", callable
192-
traceback.print_exc()
193-
194-
class DummyTaskFactory(TaskFactory):
195-
"""Just executes the task, and wraps the result"""
196-
def create_task(self, callable, args=(), kwargs={}):
197-
task = DummyTask()
198-
self.task_wrapper(task, callable, args, kwargs)
199-
return task
200-
201-
class TaskletTaskFactory(TaskFactory):
202-
"""Creates a tasklet to run the task"""
203-
def create_task(self, callable, args=(), kwargs={}):
204-
callee = stackless.tasklet(self.task_wrapper)
205-
task = TaskletTask(callee)
206-
callee(task, callable, args, kwargs)
207-
return task
208-
209-
class ThreadTaskFactory(TaskFactory):
210-
"""Creates tasks that run on a new thread"""
211-
def create_task(self, callable, args=(), kwargs={}):
212-
task = None
213-
def helper():
214-
"""helper to pass "task" into the target"""
215-
self.task_wrapper(task, callable, args, kwargs)
216-
callee = _threading.Thread(target=helper)
217-
task = ThreadTask(callee)
218-
callee.start()
219-
return task
220-
221-
dummyTaskFactory = DummyTaskFactory()
222-
taskletTaskFactory = TaskletTaskFactory()
223-
threadTaskFactory = ThreadTaskFactory()
224-
225-
226-
def create_task(callable, args=(), kwargs={}, factory=taskletTaskFactory):
12+
# Helpers to run create futures and decorate functions as being Future functions
13+
def create_future(callable, args=(), kwargs={}, executor=futures.tasklet_executor):
22714
"""Create a task, given a function and arguments"""
228-
return factory.create_task(callable, args, kwargs)
15+
return executor.submit_args(callable, args, kwargs)
22916

230-
def task(factory=taskletTaskFactory):
17+
def future(executor=futures.tasklet_executor):
23118
"""Wraps a function so that it will be executed as a task"""
23219
def decorator(func):
23320
@contextlib.wraps(func)
23421
def helper(*args, **kwargs):
235-
return create_task(func, args, kwargs, factory)
22+
return create_future(func, args, kwargs, executor)
23623
return helper
23724
return decorator
23825

239-
# Now comes the "await" functionality. This is a special kind of task
26+
# Now comes the "await" functionality. This is a special kind of future method
24027
# that has an Awaiter instance in its first argument which contols
24128
# execution flow with its caller
24229

@@ -253,51 +40,44 @@ def _continue_caller(self):
25340
self.caller.run()
25441
self.caller_continued = True
25542

256-
def await(self, task, timeout=None):
43+
def await(self, future, timeout=None):
25744
with atomic():
258-
if task.ready:
45+
if future.done():
25946
# get the result without switching
260-
return task.reap()
47+
return future.result()
26148
# optionally continue the caller
26249
self._continue_caller()
26350
# wait for the result, possibly tasklet blocking
264-
return task.reap(timeout)
51+
return future.result(timeout)
26552

26653

267-
def async_call_helper(task, awaiter, callable, args, kwargs):
54+
def async_call_helper(future, awaiter, callable, args, kwargs):
26855
# remove the caller from the runnables queue. There is a window here where other tasklets
26956
# might run, we need perhaps a primitive to perform this task
27057
try:
27158
awaiter.caller.remove()
27259
try:
273-
result = callable(awaiter, *args, **kwargs)
274-
except:
275-
task.post_exception(*sys.exc_info())
276-
else:
277-
task.post_result(result)
60+
future.execute(callable, (awaiter,) + args, kwargs)
27861
finally:
27962
with atomic(): #for _continue_caller
28063
awaiter._continue_caller()
28164
except:
28265
print >> sys.stderr, "Unhandled exception in ", callable
28366
traceback.print_exc()
28467

285-
class AsyncTask(TaskletTask):
286-
"""A task representing the Async function"""
287-
28868
def call_async(callable, args=(), kwargs={}):
28969
awaiter = Awaiter(stackless.getcurrent())
29070
callee = stackless.tasklet(async_call_helper)
291-
task = AsyncTask(callee)
71+
future = futures.Future()
29272
with atomic():
293-
callee(task, awaiter, callable, args, kwargs)
73+
callee(future, awaiter, callable, args, kwargs)
29474
try:
29575
# here, a run(remove=True) or a switch() primitive would be useful
29676
callee.run()
29777
finally:
29878
# need this here, in case caller gets awoken by other means, e.g. exception
29979
awaiter.caller_continued = True
300-
return task
80+
return future
30181

30282

30383
def async(func):

0 commit comments

Comments
 (0)