@@ -169,7 +169,8 @@ def __init__(self, processes=None, initializer=None, initargs=(),
169169
170170 self ._task_handler = threading .Thread (
171171 target = Pool ._handle_tasks ,
172- args = (self ._taskqueue , self ._quick_put , self ._outqueue , self ._pool )
172+ args = (self ._taskqueue , self ._quick_put , self ._outqueue ,
173+ self ._pool , self ._cache )
173174 )
174175 self ._task_handler .daemon = True
175176 self ._task_handler ._state = RUN
@@ -329,26 +330,38 @@ def _handle_workers(pool):
329330 debug ('worker handler exiting' )
330331
331332 @staticmethod
332- def _handle_tasks (taskqueue , put , outqueue , pool ):
333+ def _handle_tasks (taskqueue , put , outqueue , pool , cache ):
333334 thread = threading .current_thread ()
334335
335336 for taskseq , set_length in iter (taskqueue .get , None ):
337+ task = None
336338 i = - 1
337- for i , task in enumerate (taskseq ):
338- if thread ._state :
339- debug ('task handler found thread._state != RUN' )
340- break
341- try :
342- put (task )
343- except IOError :
344- debug ('could not put task on queue' )
345- break
346- else :
339+ try :
340+ for i , task in enumerate (taskseq ):
341+ if thread ._state :
342+ debug ('task handler found thread._state != RUN' )
343+ break
344+ try :
345+ put (task )
346+ except Exception as e :
347+ job , ind = task [:2 ]
348+ try :
349+ cache [job ]._set (ind , (False , e ))
350+ except KeyError :
351+ pass
352+ else :
353+ if set_length :
354+ debug ('doing set_length()' )
355+ set_length (i + 1 )
356+ continue
357+ break
358+ except Exception as ex :
359+ job , ind = task [:2 ] if task else (0 , 0 )
360+ if job in cache :
361+ cache [job ]._set (ind + 1 , (False , ex ))
347362 if set_length :
348363 debug ('doing set_length()' )
349364 set_length (i + 1 )
350- continue
351- break
352365 else :
353366 debug ('task handler got sentinel' )
354367
@@ -565,6 +578,8 @@ def _set(self, i, obj):
565578 self ._cond .release ()
566579 del self ._cache [self ._job ]
567580
581+ AsyncResult = ApplyResult # create alias -- see #17805
582+
568583#
569584# Class whose instances are returned by `Pool.map_async()`
570585#
0 commit comments