66import stackless
77import contextlib
88from .util import atomic
9- from .util import timeout as timeout_ctxt
9+ from . import futures
1010
11- try :
12- import threading
13- _threading = getattr (threading , "real_threading" , threading )
14- except ImportError :
15- _threading = None
1611
17- class Event (stackless .channel ):
18- """A channel that sends an event if anyone is listening"""
19- def __init__ (self ):
20- self .preference = 0
21- def set (self , val = None ):
22- with atomic ():
23- if self .balance < 0 :
24- self .send (val )
25- def wait (self , timeout = None ):
26- if timeout is None :
27- return self .receive ()
28- with timeout_ctxt (timeout ):
29- return self .receive ()
30-
31- class Task (object ):
32- """Base for all tasks"""
33-
34- def __init__ (self ):
35- self ._result = None
36- self .callbacks = []
37-
38- @property
39- def ready (self ):
40- """True if the task has completed execution"""
41- return self ._result is not None
42-
43- @property
44- def result (self ):
45- """The task successful result or None"""
46- return self ._result [1 ] if self ._result and self ._result [0 ] else None
47-
48- @property
49- def exception (self ):
50- """The task exception tuple or None"""
51- return self ._result [1 ] if self ._result and not self ._result [0 ] else None
52-
53- def re_raise (self ):
54- """Re-raise the exception raised by the task, or return None"""
55- e = self .exception
56- if e :
57- try :
58- raise e [0 ], e [1 ], e [2 ]
59- finally :
60- # Clear the traceback to reduce cyclic links
61- self .clear_exception ()
62-
63- def clear_exception (self ):
64- if self .exception :
65- self ._result = (False , (None , None , None ))
66-
67- def when_ready (self , cb ):
68- """Append a callback when the event is ready"""
69- self .callbacks .append (cb )
70-
71- def _on_ready (self ):
72- for cb in self .callbacks :
73- cb (self )
74-
75- def wait (self , timeout = None ):
76- """Wait untli task has completed. Returns True if it completed successfully,
77- False if it raised an exception
78- """
79- with atomic ():
80- if not self ._result :
81- e = Event ()
82- self .when_ready (e .set )
83- e .wait (timeout )
84- return self ._result [0 ]
85-
86-
87- def reap (self , timeout = None ):
88- """Wait for the execution of the task and return its result or raise
89- its exception.
90- """
91- self .wait (timeout )
92- self .re_raise ()
93- return self .result
94-
95- def post_result (self , result ):
96- assert self .result is None
97- self ._result = (True , result )
98- self ._on_ready ()
99-
100- def post_exception (self , exc , val = None , tb = None ):
101- assert self .result is None
102- self ._result = (False , (exc , val , tb ))
103- self ._on_ready ()
104-
105- @classmethod
106- def reap_all (cls , tasks , timeout = None ):
107- """Get results of all tasks. If one or more raise
108- an exception, an arbitrary re-raised.
109- """
110- with timeout_ctxt (timeout ):
111- return [t .get_result () for t in tasks ]
112-
113- @classmethod
114- def reap_any (cls , tasks , timeout = None ):
115- """Return the result of an arbitrary taskl. The result is returned
116- as a tuple (i, result), where i is the index in the original list.
117- Can re-raise an exception that was found on an arbitrary task.
118- """
119- return cls .wait_any (tasks , timeout ).reap ()
120-
121- @classmethod
122- def wait_all (cls , tasks , timeout = None ):
123- with timeout_ctxt (timeout ):
124- for t in tasks :
125- t .wait ()
126-
127- @classmethod
128- def wait_any (cls , tasks , timeout = None ):
129- """Wait until one of the tasks is ready. Returns the
130- ready task.
131- """
132- e = Event ()
133- with atomic ():
134- # See if anyone is ready, otherwise, add a callback to our event
135- for t in tasks :
136- if t .ready :
137- return t
138- def cb (t ):
139- e .set (t )
140- t .when_ready (cb )
141- # wait for our event
142- return e .wait (timeout )
143-
144- # class methpods returning tasks that wait/reap all/any
145- @classmethod
146- def waiter_all (cls , tasks ):
147- return create_task (cls .wait_all , (tasks ,))
148-
149- @classmethod
150- def waiter_any (cls , tasks ):
151- return create_task (cls .wait_any , (tasks ,))
152-
153- @classmethod
154- def reaper_all (cls , tasks ):
155- return create_task (cls .reap_all , (tasks ,))
156-
157- @classmethod
158- def reaper_any (cls , tasks ):
159- return create_task (cls .reap_any , (tasks ,))
160-
161- class DummyTask (Task ):
162- """A dummy task object. Work performed on a dummy is
163- simply executed directly as a function call.
164- """
165-
166- class TaskletTask (Task ):
167- """ A Task object which work is performed by a tasklet """
168- def __init__ (self , tasklet ):
169- super (TaskletTask , self ).__init__ ()
170- self .tasklet = tasklet
171-
172- class ThreadTask (Task ):
173- """ A task object that runs on a separate thread"""
174- def __init__ (self , thread ):
175- super (ThreadTask , self ).__init__ ()
176- self .thread = thread
177-
178-
179-
180- class TaskFactory (object ):
181- """Base class for TaskFactories"""
182- def task_wrapper (self , task , callable , args , kwargs ):
183- try :
184- try :
185- result = callable (* args , ** kwargs )
186- except :
187- task .post_exception (* sys .exc_info ())
188- else :
189- task .post_result (result )
190- except :
191- print >> sys .stderr , "Unhandled exception in " , callable
192- traceback .print_exc ()
193-
194- class DummyTaskFactory (TaskFactory ):
195- """Just executes the task, and wraps the result"""
196- def create_task (self , callable , args = (), kwargs = {}):
197- task = DummyTask ()
198- self .task_wrapper (task , callable , args , kwargs )
199- return task
200-
201- class TaskletTaskFactory (TaskFactory ):
202- """Creates a tasklet to run the task"""
203- def create_task (self , callable , args = (), kwargs = {}):
204- callee = stackless .tasklet (self .task_wrapper )
205- task = TaskletTask (callee )
206- callee (task , callable , args , kwargs )
207- return task
208-
209- class ThreadTaskFactory (TaskFactory ):
210- """Creates tasks that run on a new thread"""
211- def create_task (self , callable , args = (), kwargs = {}):
212- task = None
213- def helper ():
214- """helper to pass "task" into the target"""
215- self .task_wrapper (task , callable , args , kwargs )
216- callee = _threading .Thread (target = helper )
217- task = ThreadTask (callee )
218- callee .start ()
219- return task
220-
221- dummyTaskFactory = DummyTaskFactory ()
222- taskletTaskFactory = TaskletTaskFactory ()
223- threadTaskFactory = ThreadTaskFactory ()
224-
225-
226- def create_task (callable , args = (), kwargs = {}, factory = taskletTaskFactory ):
12+ # Helpers to run create futures and decorate functions as being Future functions
13+ def create_future (callable , args = (), kwargs = {}, executor = futures .tasklet_executor ):
22714 """Create a task, given a function and arguments"""
228- return factory . create_task (callable , args , kwargs )
15+ return executor . submit_args (callable , args , kwargs )
22916
230- def task ( factory = taskletTaskFactory ):
17+ def future ( executor = futures . tasklet_executor ):
23118 """Wraps a function so that it will be executed as a task"""
23219 def decorator (func ):
23320 @contextlib .wraps (func )
23421 def helper (* args , ** kwargs ):
235- return create_task (func , args , kwargs , factory )
22+ return create_future (func , args , kwargs , executor )
23623 return helper
23724 return decorator
23825
239- # Now comes the "await" functionality. This is a special kind of task
26+ # Now comes the "await" functionality. This is a special kind of future method
24027# that has an Awaiter instance in its first argument which contols
24128# execution flow with its caller
24229
@@ -253,51 +40,44 @@ def _continue_caller(self):
25340 self .caller .run ()
25441 self .caller_continued = True
25542
256- def await (self , task , timeout = None ):
43+ def await (self , future , timeout = None ):
25744 with atomic ():
258- if task . ready :
45+ if future . done () :
25946 # get the result without switching
260- return task . reap ()
47+ return future . result ()
26148 # optionally continue the caller
26249 self ._continue_caller ()
26350 # wait for the result, possibly tasklet blocking
264- return task . reap (timeout )
51+ return future . result (timeout )
26552
26653
267- def async_call_helper (task , awaiter , callable , args , kwargs ):
54+ def async_call_helper (future , awaiter , callable , args , kwargs ):
26855 # remove the caller from the runnables queue. There is a window here where other tasklets
26956 # might run, we need perhaps a primitive to perform this task
27057 try :
27158 awaiter .caller .remove ()
27259 try :
273- result = callable (awaiter , * args , ** kwargs )
274- except :
275- task .post_exception (* sys .exc_info ())
276- else :
277- task .post_result (result )
60+ future .execute (callable , (awaiter ,) + args , kwargs )
27861 finally :
27962 with atomic (): #for _continue_caller
28063 awaiter ._continue_caller ()
28164 except :
28265 print >> sys .stderr , "Unhandled exception in " , callable
28366 traceback .print_exc ()
28467
285- class AsyncTask (TaskletTask ):
286- """A task representing the Async function"""
287-
28868def call_async (callable , args = (), kwargs = {}):
28969 awaiter = Awaiter (stackless .getcurrent ())
29070 callee = stackless .tasklet (async_call_helper )
291- task = AsyncTask ( callee )
71+ future = futures . Future ( )
29272 with atomic ():
293- callee (task , awaiter , callable , args , kwargs )
73+ callee (future , awaiter , callable , args , kwargs )
29474 try :
29575 # here, a run(remove=True) or a switch() primitive would be useful
29676 callee .run ()
29777 finally :
29878 # need this here, in case caller gets awoken by other means, e.g. exception
29979 awaiter .caller_continued = True
300- return task
80+ return future
30181
30282
30383def async (func ):
0 commit comments