Skip to content

Commit e0a1c8a

Browse files
committed
Refresh multiprocessing from CPython-2.7.11
1 parent 9a19060 commit e0a1c8a

File tree

10 files changed

+104
-44
lines changed

10 files changed

+104
-44
lines changed

lib-python/2.7/multiprocessing/connection.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def arbitrary_address(family):
9090
return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
9191
elif family == 'AF_PIPE':
9292
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
93-
(os.getpid(), _mmap_counter.next()))
93+
(os.getpid(), _mmap_counter.next()), dir="")
9494
else:
9595
raise ValueError('unrecognized family')
9696

@@ -270,32 +270,44 @@ def __init__(self, address, family, backlog=1):
270270
self._unlink = None
271271

272272
def accept(self):
273-
s, self._last_accepted = self._socket.accept()
273+
while True:
274+
try:
275+
s, self._last_accepted = self._socket.accept()
276+
except socket.error as e:
277+
if e.args[0] != errno.EINTR:
278+
raise
279+
else:
280+
break
274281
s.setblocking(True)
275282
fd = duplicate(s.fileno())
276283
conn = _multiprocessing.Connection(fd)
277284
s.close()
278285
return conn
279286

280287
def close(self):
281-
self._socket.close()
282-
if self._unlink is not None:
283-
self._unlink()
288+
try:
289+
self._socket.close()
290+
finally:
291+
unlink = self._unlink
292+
if unlink is not None:
293+
self._unlink = None
294+
unlink()
284295

285296

286297
def SocketClient(address):
287298
'''
288299
Return a connection object connected to the socket given by `address`
289300
'''
290-
family = address_type(address)
291-
s = socket.socket( getattr(socket, family) )
292-
s.setblocking(True)
301+
family = getattr(socket, address_type(address))
293302
t = _init_timeout()
294303

295304
while 1:
305+
s = socket.socket(family)
306+
s.setblocking(True)
296307
try:
297308
s.connect(address)
298309
except socket.error, e:
310+
s.close()
299311
if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
300312
debug('failed to connect to address %s', address)
301313
raise
@@ -446,10 +458,10 @@ def recv(self):
446458
return self._loads(s)
447459

448460
def _xml_dumps(obj):
449-
return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
461+
return xmlrpclib.dumps((obj,), None, None, None, 1)
450462

451463
def _xml_loads(s):
452-
(obj,), method = xmlrpclib.loads(s.decode('utf8'))
464+
(obj,), method = xmlrpclib.loads(s)
453465
return obj
454466

455467
class XmlListener(Listener):

lib-python/2.7/multiprocessing/dummy/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def _set(self, value):
138138
self._value = value
139139
value = property(_get, _set)
140140
def __repr__(self):
141-
return '<%r(%r, %r)>'%(type(self).__name__,self._typecode,self._value)
141+
return '<%s(%r, %r)>'%(type(self).__name__,self._typecode,self._value)
142142

143143
def Manager():
144144
return sys.modules[__name__]

lib-python/2.7/multiprocessing/forking.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -361,12 +361,13 @@ def get_command_line():
361361
return [sys.executable, '--multiprocessing-fork']
362362
else:
363363
prog = 'from multiprocessing.forking import main; main()'
364-
return [_python_exe, '-c', prog, '--multiprocessing-fork']
364+
opts = util._args_from_interpreter_flags()
365+
return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
365366

366367

367368
def main():
368369
'''
369-
Run code specifed by data received over pipe
370+
Run code specified by data received over pipe
370371
'''
371372
assert is_forking(sys.argv)
372373

@@ -469,12 +470,26 @@ def prepare(data):
469470
process.ORIGINAL_DIR = data['orig_dir']
470471

471472
if 'main_path' in data:
473+
# XXX (ncoghlan): The following code makes several bogus
474+
# assumptions regarding the relationship between __file__
475+
# and a module's real name. See PEP 302 and issue #10845
476+
# The problem is resolved properly in Python 3.4+, as
477+
# described in issue #19946
478+
472479
main_path = data['main_path']
473480
main_name = os.path.splitext(os.path.basename(main_path))[0]
474481
if main_name == '__init__':
475482
main_name = os.path.basename(os.path.dirname(main_path))
476483

477-
if main_name != 'ipython':
484+
if main_name == '__main__':
485+
# For directory and zipfile execution, we assume an implicit
486+
# "if __name__ == '__main__':" around the module, and don't
487+
# rerun the main module code in spawned processes
488+
main_module = sys.modules['__main__']
489+
main_module.__file__ = main_path
490+
elif main_name != 'ipython':
491+
# Main modules not actually called __main__.py may
492+
# contain additional code that should still be executed
478493
import imp
479494

480495
if main_path is None:

lib-python/2.7/multiprocessing/managers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,7 @@ def _callmethod(self, methodname, args=(), kwds={}):
763763
elif kind == '#PROXY':
764764
exposed, token = result
765765
proxytype = self._manager._registry[token.typeid][-1]
766+
token.address = self._token.address
766767
proxy = proxytype(
767768
token, self._serializer, manager=self._manager,
768769
authkey=self._authkey, exposed=exposed

lib-python/2.7/multiprocessing/pool.py

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ def __init__(self, processes=None, initializer=None, initargs=(),
169169

170170
self._task_handler = threading.Thread(
171171
target=Pool._handle_tasks,
172-
args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
172+
args=(self._taskqueue, self._quick_put, self._outqueue,
173+
self._pool, self._cache)
173174
)
174175
self._task_handler.daemon = True
175176
self._task_handler._state = RUN
@@ -329,26 +330,38 @@ def _handle_workers(pool):
329330
debug('worker handler exiting')
330331

331332
@staticmethod
332-
def _handle_tasks(taskqueue, put, outqueue, pool):
333+
def _handle_tasks(taskqueue, put, outqueue, pool, cache):
333334
thread = threading.current_thread()
334335

335336
for taskseq, set_length in iter(taskqueue.get, None):
337+
task = None
336338
i = -1
337-
for i, task in enumerate(taskseq):
338-
if thread._state:
339-
debug('task handler found thread._state != RUN')
340-
break
341-
try:
342-
put(task)
343-
except IOError:
344-
debug('could not put task on queue')
345-
break
346-
else:
339+
try:
340+
for i, task in enumerate(taskseq):
341+
if thread._state:
342+
debug('task handler found thread._state != RUN')
343+
break
344+
try:
345+
put(task)
346+
except Exception as e:
347+
job, ind = task[:2]
348+
try:
349+
cache[job]._set(ind, (False, e))
350+
except KeyError:
351+
pass
352+
else:
353+
if set_length:
354+
debug('doing set_length()')
355+
set_length(i+1)
356+
continue
357+
break
358+
except Exception as ex:
359+
job, ind = task[:2] if task else (0, 0)
360+
if job in cache:
361+
cache[job]._set(ind + 1, (False, ex))
347362
if set_length:
348363
debug('doing set_length()')
349364
set_length(i+1)
350-
continue
351-
break
352365
else:
353366
debug('task handler got sentinel')
354367

@@ -565,6 +578,8 @@ def _set(self, i, obj):
565578
self._cond.release()
566579
del self._cache[self._job]
567580

581+
AsyncResult = ApplyResult # create alias -- see #17805
582+
568583
#
569584
# Class whose instances are returned by `Pool.map_async()`
570585
#

lib-python/2.7/multiprocessing/process.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ def _bootstrap(self):
267267
else:
268268
sys.stderr.write(str(e.args[0]) + '\n')
269269
sys.stderr.flush()
270-
exitcode = 0 if isinstance(e.args[0], str) else 1
270+
exitcode = 1
271271
except:
272272
exitcode = 1
273273
import traceback

lib-python/2.7/multiprocessing/queues.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@
4444

4545
from Queue import Empty, Full
4646
import _multiprocessing
47-
from multiprocessing import Pipe
48-
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
49-
from multiprocessing.util import debug, info, Finalize, register_after_fork
50-
from multiprocessing.forking import assert_spawning
47+
from . import Pipe
48+
from .synchronize import Lock, BoundedSemaphore, Semaphore, Condition
49+
from .util import debug, info, Finalize, register_after_fork, is_exiting
50+
from .forking import assert_spawning
5151

5252
#
5353
# Queue type using a pipe, buffer and thread
@@ -156,9 +156,13 @@ def put_nowait(self, obj):
156156

157157
def close(self):
158158
self._closed = True
159-
self._reader.close()
160-
if self._close:
161-
self._close()
159+
try:
160+
self._reader.close()
161+
finally:
162+
close = self._close
163+
if close:
164+
self._close = None
165+
close()
162166

163167
def join_thread(self):
164168
debug('Queue.join_thread()')
@@ -229,8 +233,6 @@ def _finalize_close(buffer, notempty):
229233
@staticmethod
230234
def _feed(buffer, notempty, send, writelock, close):
231235
debug('starting thread to feed data to pipe')
232-
from .util import is_exiting
233-
234236
nacquire = notempty.acquire
235237
nrelease = notempty.release
236238
nwait = notempty.wait

lib-python/2.7/multiprocessing/sharedctypes.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,18 @@
4646
#
4747

4848
typecode_to_type = {
49-
'c': ctypes.c_char, 'u': ctypes.c_wchar,
49+
'c': ctypes.c_char,
5050
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
5151
'h': ctypes.c_short, 'H': ctypes.c_ushort,
5252
'i': ctypes.c_int, 'I': ctypes.c_uint,
5353
'l': ctypes.c_long, 'L': ctypes.c_ulong,
5454
'f': ctypes.c_float, 'd': ctypes.c_double
5555
}
56+
try:
57+
typecode_to_type['u'] = ctypes.c_wchar
58+
except AttributeError:
59+
pass
60+
5661

5762
#
5863
#

lib-python/2.7/multiprocessing/synchronize.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ def __repr__(self):
226226
num_waiters = (self._sleeping_count._semlock._get_value() -
227227
self._woken_count._semlock._get_value())
228228
except Exception:
229-
num_waiters = 'unkown'
229+
num_waiters = 'unknown'
230230
return '<Condition(%s, %s)>' % (self._lock, num_waiters)
231231

232232
def wait(self, timeout=None):

lib-python/2.7/multiprocessing/util.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@
3232
# SUCH DAMAGE.
3333
#
3434

35+
import os
3536
import itertools
3637
import weakref
3738
import atexit
3839
import threading # we want threading to install it's
3940
# cleanup function before multiprocessing does
41+
from subprocess import _args_from_interpreter_flags
4042

4143
from multiprocessing.process import current_process, active_children
4244

@@ -183,6 +185,7 @@ def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
183185
self._args = args
184186
self._kwargs = kwargs or {}
185187
self._key = (exitpriority, _finalizer_counter.next())
188+
self._pid = os.getpid()
186189

187190
_finalizer_registry[self._key] = self
188191

@@ -195,9 +198,13 @@ def __call__(self, wr=None):
195198
except KeyError:
196199
sub_debug('finalizer no longer registered')
197200
else:
198-
sub_debug('finalizer calling %s with args %s and kwargs %s',
199-
self._callback, self._args, self._kwargs)
200-
res = self._callback(*self._args, **self._kwargs)
201+
if self._pid != os.getpid():
202+
sub_debug('finalizer ignored because different process')
203+
res = None
204+
else:
205+
sub_debug('finalizer calling %s with args %s and kwargs %s',
206+
self._callback, self._args, self._kwargs)
207+
res = self._callback(*self._args, **self._kwargs)
201208
self._weakref = self._callback = self._args = \
202209
self._kwargs = self._key = None
203210
return res
@@ -328,10 +335,13 @@ def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
328335

329336
class ForkAwareThreadLock(object):
330337
def __init__(self):
338+
self._reset()
339+
register_after_fork(self, ForkAwareThreadLock._reset)
340+
341+
def _reset(self):
331342
self._lock = threading.Lock()
332343
self.acquire = self._lock.acquire
333344
self.release = self._lock.release
334-
register_after_fork(self, ForkAwareThreadLock.__init__)
335345

336346
class ForkAwareLocal(threading.local):
337347
def __init__(self):

0 commit comments

Comments
 (0)