55import traceback
66import stackless
77import contextlib
8- from .util import atomic , timeout
8+ from .util import atomic
9+ from .util import timeout as timeout_ctxt
910
1011try :
1112 import threading
@@ -21,178 +22,160 @@ def set(self, val=None):
2122 with atomic ():
2223 if self .balance < 0 :
2324 self .send (val )
24- def wait (self , delay = None ):
25- if delay is None :
25+ def wait (self , timeout = None ):
26+ if timeout is None :
2627 return self .receive ()
27- with timeout ( delay ):
28+ with timeout_ctxt ( timeout ):
2829 return self .receive ()
2930
3031class Task (object ):
3132 """Base for all tasks"""
3233
3334 def __init__ (self ):
34- self .result = None
35+ self ._result = None
3536 self .callbacks = []
3637
3738 @property
3839 def ready (self ):
39- return self .result is not None
40+ """True if the task has completed execution"""
41+ return self ._result is not None
42+
43+ @property
44+ def result (self ):
45+ """The task successful result or None"""
46+ return self ._result [1 ] if self ._result and self ._result [0 ] else None
4047
4148 @property
42- def is_exception (self ):
43- return self .result is not None and not self .result [0 ]
49+ def exception (self ):
50+ """The task exception tuple or None"""
51+ return self ._result [1 ] if self ._result and not self ._result [0 ] else None
52+
53+ def re_raise (self ):
54+ """Re-raise the exception raised by the task, or return None"""
55+ e = self .exception
56+ if e :
57+ try :
58+ raise e [0 ], e [1 ], e [2 ]
59+ finally :
60+ # Clear the traceback to reduce cyclic links
61+ self .clear_exception ()
62+
63+ def clear_exception (self ):
64+ if self .exception :
65+ self ._result = (False , (None , None , None ))
4466
4567 def when_ready (self , cb ):
4668 """Append a callback when the event is ready"""
4769 self .callbacks .append (cb )
4870
49- def on_ready (self ):
71+ def _on_ready (self ):
5072 for cb in self .callbacks :
51- cb ()
73+ cb (self )
5274
5375 def wait (self , timeout = None ):
76+ """Wait untli task has completed. Returns True if it completed successfully,
77+ False if it raised an exception
78+ """
5479 with atomic ():
55- if self .result :
56- return
57- e = Event ( )
58- self . when_ready ( e . set )
59- e . wait ( timeout )
80+ if not self ._result :
81+ e = Event ()
82+ self . when_ready ( e . set )
83+ e . wait ( timeout )
84+ return self . _result [ 0 ]
6085
61- def get_result (self , timeout = None ):
62- self .wait (timeout )
63- if self .result [0 ]:
64- return self .result [1 ]
65- # Consider using "AggregateException" here
66- raise self .result [1 ][0 ], self .result [1 ][1 ], self .result [1 ][2 ]
6786
68- def get_exception (self , timeout = None ):
87+ def reap (self , timeout = None ):
88+ """Wait for the execution of the task and return its result or raise
89+ its exception.
90+ """
6991 self .wait (timeout )
70- if self .result [0 ]:
71- return None
72- return self .result [1 ]
73-
92+ self .re_raise ()
93+ return self .result
94+
7495 def post_result (self , result ):
7596 assert self .result is None
76- self .result = (True , result )
77- self .on_ready ()
97+ self ._result = (True , result )
98+ self ._on_ready ()
7899
79100 def post_exception (self , exc , val = None , tb = None ):
80101 assert self .result is None
81- self .result = (False , (exc , val , tb ))
82- self .on_ready ()
102+ self ._result = (False , (exc , val , tb ))
103+ self ._on_ready ()
104+
105+ @classmethod
106+ def reap_all (cls , tasks , timeout = None ):
107+ """Get results of all tasks. If one or more raise
108+ an exception, an arbitrary re-raised.
109+ """
110+ with timeout_ctxt (timeout ):
111+ return [t .get_result () for t in tasks ]
83112
84113 @classmethod
85- def wait_all (cls , tasks , delay = None ):
86- with timeout (delay ):
114+ def reap_any (cls , tasks , timeout = None ):
115+ """Return the result of an arbitrary taskl. The result is returned
116+ as a tuple (i, result), where i is the index in the original list.
117+ Can re-raise an exception that was found on an arbitrary task.
118+ """
119+ return cls .wait_any (tasks , timeout ).reap ()
120+
121+ @classmethod
122+ def wait_all (cls , tasks , timeout = None ):
123+ with timeout_ctxt (timeout ):
87124 for t in tasks :
88125 t .wait ()
89126
90127 @classmethod
91- def wait_any (cls , tasks , delay = None ):
128+ def wait_any (cls , tasks , timeout = None ):
129+ """Wait until one of the tasks is ready. Returns the
130+ ready task.
131+ """
92132 e = Event ()
93133 with atomic ():
94- for i , t in enumerate (tasks ):
134+ # See if anyone is ready, otherwise, add a callback to our event
135+ for t in tasks :
95136 if t .ready :
96- return i
97- def get_cb ( i ):
98- def cb ():
99- e . set ( i )
100- return cb
101- t . when_ready ( get_cb ( i ) )
102- return e . wait ( delay )
103-
137+ return t
138+ def cb ( t ):
139+ e . set ( t )
140+ t . when_ready ( cb )
141+ # wait for our event
142+ return e . wait ( timeout )
143+
144+ # class methpods returning tasks that wait/reap all/any
104145 @classmethod
105- def when_all (cls , tasks ):
146+ def waiter_all (cls , tasks ):
106147 return create_task (cls .wait_all , (tasks ,))
107148
108149 @classmethod
109- def when_any (cls , tasks ):
150+ def waiter_any (cls , tasks ):
110151 return create_task (cls .wait_any , (tasks ,))
111152
112153 @classmethod
113- def get_results (cls , tasks ):
114- try :
115- return [t .get_result () for t in tasks ]
116- except :
117- e = sys .exc_info ()
118- try :
119- for t in tasks :
120- t .forget ()
121- raise e [0 ], e [1 ], e [2 ]
122- finally :
123- e = None #clear traceback cycle
124-
154+ def reaper_all (cls , tasks ):
155+ return create_task (cls .reap_all , (tasks ,))
156+
157+ @classmethod
158+ def reaper_any (cls , tasks ):
159+ return create_task (cls .reap_any , (tasks ,))
160+
125161class DummyTask (Task ):
126- """A dummy task object"""
162+ """A dummy task object. Work performed on a dummy is
163+ simply executed directly as a function call.
164+ """
127165
128166class TaskletTask (Task ):
129- """ A Task object which work performed by a tasklet """
167+ """ A Task object which work is performed by a tasklet """
130168 def __init__ (self , tasklet ):
131169 super (TaskletTask , self ).__init__ ()
132170 self .tasklet = tasklet
133171
134172class ThreadTask (Task ):
135- """ A task object that runs on a thread"""
173+ """ A task object that runs on a separate thread"""
136174 def __init__ (self , thread ):
137175 super (ThreadTask , self ).__init__ ()
138176 self .thread = thread
139177
140178
141- class AwaitManager (object ):
142- """This class provides the "await" method and is used by
143- Async functions to yield to their caller
144- """
145- def __init__ (self , caller ):
146- self .caller = caller
147- self .caller_continued = False
148-
149- def _continue_caller (self ):
150- if not self .caller_continued :
151- self .caller_continued = True
152- self .caller .run ()
153-
154- def await (self , task , delay = None ):
155- with atomic ():
156- if task .ready :
157- # get the result without switching
158- return task .get_result ()
159- # optionally continue the caller
160- self ._continue_caller ()
161- # wait for the result, possibly tasklet blocking
162- return task .get_result (delay )
163-
164-
165- def async_call_helper (task , await_manager , callable , args , kwargs ):
166- # remove the caller from the runnables queue. There is a window here where other tasklets
167- # might run, we need perhaps a primitive to perform this task
168- try :
169- await_manager .caller .remove ()
170- try :
171- result = callable (await_manager , * args , ** kwargs )
172- except :
173- task .post_exception (* sys .exc_info ())
174- else :
175- task .post_result (result )
176- finally :
177- with atomic ():
178- await_manager ._continue_caller ()
179- except :
180- print >> sys .stderr , "Unhandled exception in " , callable
181- traceback .print_exc ()
182-
183- def call_async (callable , args = (), kwargs = {}):
184- await_manager = AwaitManager (stackless .getcurrent ())
185- callee = stackless .tasklet (async_call_helper )
186- task = TaskletTask (callee )
187- with atomic ():
188- callee (task , await_manager , callable , args , kwargs )
189- try :
190- # here, a run(remove=True) or a switch() primitive would be useful
191- callee .run ()
192- finally :
193- # need this here, in case caller gets awoken by other means, e.g. exception
194- await_manager .caller_continued = True
195- return task
196179
197180class TaskFactory (object ):
198181 """Base class for TaskFactories"""
@@ -244,19 +227,83 @@ def create_task(callable, args=(), kwargs={}, factory=taskletTaskFactory):
244227 """Create a task, given a function and arguments"""
245228 return factory .create_task (callable , args , kwargs )
246229
247- def async (func ):
248- """Wraps a function as an async function, taking an AwaitHelper as the first argument"""
249- @contextlib .wraps (func )
250- def helper (* args , ** kwargs ):
251- return call_async (func , args , kwargs )
252- return helper
253-
254230def task (factory = taskletTaskFactory ):
255231 """Wraps a function so that it will be executed as a task"""
256232 def decorator (func ):
257- """Wraps a function as a task"""
258233 @contextlib .wraps (func )
259234 def helper (* args , ** kwargs ):
260235 return create_task (func , args , kwargs , factory )
261236 return helper
262237 return decorator
238+
239+ # Now comes the "await" functionality. This is a special kind of task
240+ # that has an Awaiter instance in its first argument which contols
241+ # execution flow with its caller
242+
243+ class Awaiter (object ):
244+ """This class provides the "await" method and is used by
245+ Async functions to yield to their caller
246+ """
247+ def __init__ (self , caller ):
248+ self .caller = caller
249+ self .caller_continued = False
250+
251+ def _continue_caller (self ):
252+ if not self .caller_continued :
253+ self .caller .run ()
254+ self .caller_continued = True
255+
256+ def await (self , task , timeout = None ):
257+ with atomic ():
258+ if task .ready :
259+ # get the result without switching
260+ return task .reap ()
261+ # optionally continue the caller
262+ self ._continue_caller ()
263+ # wait for the result, possibly tasklet blocking
264+ return task .reap (timeout )
265+
266+
267+ def async_call_helper (task , awaiter , callable , args , kwargs ):
268+ # remove the caller from the runnables queue. There is a window here where other tasklets
269+ # might run, we need perhaps a primitive to perform this task
270+ try :
271+ awaiter .caller .remove ()
272+ try :
273+ result = callable (awaiter , * args , ** kwargs )
274+ except :
275+ task .post_exception (* sys .exc_info ())
276+ else :
277+ task .post_result (result )
278+ finally :
279+ with atomic (): #for _continue_caller
280+ awaiter ._continue_caller ()
281+ except :
282+ print >> sys .stderr , "Unhandled exception in " , callable
283+ traceback .print_exc ()
284+
285+ class AsyncTask (TaskletTask ):
286+ """A task representing the Async function"""
287+
288+ def call_async (callable , args = (), kwargs = {}):
289+ awaiter = Awaiter (stackless .getcurrent ())
290+ callee = stackless .tasklet (async_call_helper )
291+ task = AsyncTask (callee )
292+ with atomic ():
293+ callee (task , awaiter , callable , args , kwargs )
294+ try :
295+ # here, a run(remove=True) or a switch() primitive would be useful
296+ callee .run ()
297+ finally :
298+ # need this here, in case caller gets awoken by other means, e.g. exception
299+ awaiter .caller_continued = True
300+ return task
301+
302+
303+ def async (func ):
304+ """Wraps a function as an async function, taking an Awaiter as the first argument"""
305+ @contextlib .wraps (func )
306+ def helper (* args , ** kwargs ):
307+ return call_async (func , args , kwargs )
308+ return helper
309+
0 commit comments