@@ -53,6 +53,10 @@ class TimeoutError(Error):
5353 """The operation exceeded the given deadline."""
5454 pass
5555
56+ class InvalidStateError (Error ):
57+ """The operation is not allowed in this state."""
58+ pass
59+
5660class _Waiter (object ):
5761 """Provides the event that wait() and as_completed() block on."""
5862 def __init__ (self ):
@@ -170,6 +174,29 @@ def _create_and_install_waiters(fs, return_when):
170174
171175 return waiter
172176
177+
178+ def _yield_finished_futures (fs , waiter , ref_collect ):
179+ """
180+ Iterate on the list *fs*, yielding finished futures one by one in
181+ reverse order.
182+ Before yielding a future, *waiter* is removed from its waiters
183+ and the future is removed from each set in the collection of sets
184+ *ref_collect*.
185+
186+ The aim of this function is to avoid keeping stale references after
187+ the future is yielded and before the iterator resumes.
188+ """
189+ while fs :
190+ f = fs [- 1 ]
191+ for futures_set in ref_collect :
192+ futures_set .remove (f )
193+ with f ._condition :
194+ f ._waiters .remove (waiter )
195+ del f
196+ # Careful not to keep a reference to the popped value
197+ yield fs .pop ()
198+
199+
173200def as_completed (fs , timeout = None ):
174201 """An iterator over the given futures that yields each as it completes.
175202
@@ -189,28 +216,30 @@ def as_completed(fs, timeout=None):
189216 before the given timeout.
190217 """
191218 if timeout is not None :
192- end_time = timeout + time .time ()
219+ end_time = timeout + time .monotonic ()
193220
194221 fs = set (fs )
222+ total_futures = len (fs )
195223 with _AcquireFutures (fs ):
196224 finished = set (
197225 f for f in fs
198226 if f ._state in [CANCELLED_AND_NOTIFIED , FINISHED ])
199227 pending = fs - finished
200228 waiter = _create_and_install_waiters (fs , _AS_COMPLETED )
201-
229+ finished = list ( finished )
202230 try :
203- yield from finished
231+ yield from _yield_finished_futures (finished , waiter ,
232+ ref_collect = (fs ,))
204233
205234 while pending :
206235 if timeout is None :
207236 wait_timeout = None
208237 else :
209- wait_timeout = end_time - time .time ()
238+ wait_timeout = end_time - time .monotonic ()
210239 if wait_timeout < 0 :
211240 raise TimeoutError (
212241 '%d (of %d) futures unfinished' % (
213- len (pending ), len ( fs ) ))
242+ len (pending ), total_futures ))
214243
215244 waiter .event .wait (wait_timeout )
216245
@@ -219,11 +248,13 @@ def as_completed(fs, timeout=None):
219248 waiter .finished_futures = []
220249 waiter .event .clear ()
221250
222- for future in finished :
223- yield future
224- pending .remove (future )
251+ # reverse to keep finishing order
252+ finished .reverse ()
253+ yield from _yield_finished_futures (finished , waiter ,
254+ ref_collect = (fs , pending ))
225255
226256 finally :
257+ # Remove waiter from unfinished futures
227258 for f in fs :
228259 with f ._condition :
229260 f ._waiters .remove (waiter )
@@ -373,7 +404,10 @@ def add_done_callback(self, fn):
373404 if self ._state not in [CANCELLED , CANCELLED_AND_NOTIFIED , FINISHED ]:
374405 self ._done_callbacks .append (fn )
375406 return
376- fn (self )
407+ try :
408+ fn (self )
409+ except Exception :
410+ LOGGER .exception ('exception calling callback for %r' , self )
377411
378412 def result (self , timeout = None ):
379413 """Return the result of the call that the future represents.
@@ -486,6 +520,8 @@ def set_result(self, result):
486520 Should only be used by Executor implementations and unit tests.
487521 """
488522 with self ._condition :
523+ if self ._state in {CANCELLED , CANCELLED_AND_NOTIFIED , FINISHED }:
524+ raise InvalidStateError ('{}: {!r}' .format (self ._state , self ))
489525 self ._result = result
490526 self ._state = FINISHED
491527 for waiter in self ._waiters :
@@ -499,6 +535,8 @@ def set_exception(self, exception):
499535 Should only be used by Executor implementations and unit tests.
500536 """
501537 with self ._condition :
538+ if self ._state in {CANCELLED , CANCELLED_AND_NOTIFIED , FINISHED }:
539+ raise InvalidStateError ('{}: {!r}' .format (self ._state , self ))
502540 self ._exception = exception
503541 self ._state = FINISHED
504542 for waiter in self ._waiters :
@@ -509,7 +547,7 @@ def set_exception(self, exception):
509547class Executor (object ):
510548 """This is an abstract base class for concrete asynchronous executors."""
511549
512- def submit (self , fn , * args , ** kwargs ):
550+ def submit (* args , ** kwargs ):
513551 """Submits a callable to be executed with the given arguments.
514552
515553 Schedules the callable to be executed as fn(*args, **kwargs) and returns
@@ -518,7 +556,21 @@ def submit(self, fn, *args, **kwargs):
518556 Returns:
519557 A Future representing the given call.
520558 """
559+ if len (args ) >= 2 :
560+ pass
561+ elif not args :
562+ raise TypeError ("descriptor 'submit' of 'Executor' object "
563+ "needs an argument" )
564+ elif 'fn' in kwargs :
565+ import warnings
566+ warnings .warn ("Passing 'fn' as keyword argument is deprecated" ,
567+ DeprecationWarning , stacklevel = 2 )
568+ else :
569+ raise TypeError ('submit expected at least 1 positional argument, '
570+ 'got %d' % (len (args )- 1 ))
571+
521572 raise NotImplementedError ()
573+ submit .__text_signature__ = '($self, fn, /, *args, **kwargs)'
522574
523575 def map (self , fn , * iterables , timeout = None , chunksize = 1 ):
524576 """Returns an iterator equivalent to map(fn, iter).
@@ -543,19 +595,22 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
543595 Exception: If fn(*args) raises for any values.
544596 """
545597 if timeout is not None :
546- end_time = timeout + time .time ()
598+ end_time = timeout + time .monotonic ()
547599
548600 fs = [self .submit (fn , * args ) for args in zip (* iterables )]
549601
550602 # Yield must be hidden in closure so that the futures are submitted
551603 # before the first iterator value is required.
552604 def result_iterator ():
553605 try :
554- for future in fs :
606+ # reverse to keep finishing order
607+ fs .reverse ()
608+ while fs :
609+ # Careful not to keep a reference to the popped future
555610 if timeout is None :
556- yield future .result ()
611+ yield fs . pop () .result ()
557612 else :
558- yield future . result (end_time - time .time ())
613+ yield fs . pop (). result (end_time - time .monotonic ())
559614 finally :
560615 for future in fs :
561616 future .cancel ()
@@ -580,3 +635,9 @@ def __enter__(self):
580635 def __exit__ (self , exc_type , exc_val , exc_tb ):
581636 self .shutdown (wait = True )
582637 return False
638+
639+
640+ class BrokenExecutor (RuntimeError ):
641+ """
642+ Raised when a executor has become non-functional after a severe failure.
643+ """
0 commit comments