-
-
Notifications
You must be signed in to change notification settings - Fork 34.5k
Expand file tree
/
Copy pathinterpreter.py
More file actions
123 lines (102 loc) · 3.93 KB
/
interpreter.py
File metadata and controls
123 lines (102 loc) · 3.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""Implements InterpreterPoolExecutor."""
from concurrent import interpreters
import sys
from . import thread as _thread
import traceback
def do_call(results, func, args, kwargs):
try:
return func(*args, **kwargs)
except BaseException as exc:
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
try:
results.put(exc)
except interpreters.NotShareableError:
# The exception is not shareable.
print('exception is not shareable:', file=sys.stderr)
traceback.print_exception(exc)
results.put(None)
raise # re-raise
class WorkerContext(_thread.WorkerContext):
@classmethod
def prepare(cls, initializer, initargs):
def resolve_task(fn, args, kwargs):
if isinstance(fn, str):
# XXX Circle back to this later.
raise TypeError('scripts not supported')
else:
task = (fn, args, kwargs)
return task
if initializer is not None:
try:
initdata = resolve_task(initializer, initargs, {})
except ValueError:
if isinstance(initializer, str) and initargs:
raise ValueError(f'an initializer script does not take args, got {initargs!r}')
raise # re-raise
else:
initdata = None
def create_context():
return cls(initdata)
return create_context, resolve_task
def __init__(self, initdata):
self.initdata = initdata
self.interp = None
self.results = None
def __del__(self):
if self.interp is not None:
self.finalize()
def initialize(self):
assert self.interp is None, self.interp
self.interp = interpreters.create()
try:
maxsize = 0
self.results = interpreters.create_queue(maxsize)
if self.initdata:
self.run(self.initdata)
except BaseException:
self.finalize()
raise # re-raise
def finalize(self):
interp = self.interp
results = self.results
self.results = None
self.interp = None
if results is not None:
del results
if interp is not None:
interp.close()
def run(self, task):
try:
return self.interp.call(do_call, self.results, *task)
except interpreters.ExecutionFailed as wrapper:
# Wait for the exception data to show up.
exc = self.results.get()
if exc is None:
# The exception must have been not shareable.
raise # re-raise
raise exc from wrapper
class BrokenInterpreterPool(_thread.BrokenThreadPool):
"""
Raised when a worker thread in an InterpreterPoolExecutor failed initializing.
"""
class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
BROKEN = BrokenInterpreterPool
@classmethod
def prepare_context(cls, initializer, initargs):
return WorkerContext.prepare(initializer, initargs)
def __init__(self, max_workers=None, thread_name_prefix='',
initializer=None, initargs=()):
"""Initializes a new InterpreterPoolExecutor instance.
Args:
max_workers: The maximum number of interpreters that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
initializer: A callable or script used to initialize
each worker interpreter.
initargs: A tuple of arguments to pass to the initializer.
"""
thread_name_prefix = (thread_name_prefix or
(f"InterpreterPoolExecutor-{self._counter()}"))
super().__init__(max_workers, thread_name_prefix,
initializer, initargs)