Skip to content

Commit 8b12b16

Browse files
authored
Add InterpreterPoolExecutor (3.14) (#14008)
1 parent 81fc4a7 commit 8b12b16

6 files changed

Lines changed: 249 additions & 29 deletions

File tree

stdlib/@tests/stubtest_allowlists/py314.txt

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,6 @@ compression.gzip.GzipFile.readinto1
6060
compression.gzip.GzipFile.readinto1
6161
compression.gzip.compress
6262
compression.zstd
63-
concurrent.futures.__all__
64-
concurrent.futures.InterpreterPoolExecutor
65-
concurrent.futures.ThreadPoolExecutor.BROKEN
66-
concurrent.futures.ThreadPoolExecutor.prepare_context
67-
concurrent.futures.interpreter
68-
concurrent.futures.thread.ThreadPoolExecutor.BROKEN
69-
concurrent.futures.thread.ThreadPoolExecutor.prepare_context
70-
concurrent.futures.thread.WorkerContext
71-
concurrent.futures.thread._WorkItem.__init__
72-
concurrent.futures.thread._WorkItem.run
73-
concurrent.futures.thread._worker
7463
ctypes.POINTER
7564
ctypes.byref
7665
ctypes.memoryview_at

stdlib/@tests/test_cases/check_concurrent_futures.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from __future__ import annotations
22

3+
import sys
34
from collections.abc import Callable, Iterator
45
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
6+
from typing import Literal
57
from typing_extensions import assert_type
68

79

@@ -28,3 +30,49 @@ def execute_callback(callback: Callable[[], Parent], future: Future[Parent]) ->
2830
fut: Future[Child] = Future()
2931
execute_callback(lambda: Parent(), fut) # type: ignore
3032
assert isinstance(fut.result(), Child)
33+
34+
35+
if sys.version_info >= (3, 14):
36+
37+
def _initializer(x: int) -> None:
38+
pass
39+
40+
def check_interpreter_pool_executor() -> None:
41+
import concurrent.futures.interpreter
42+
from concurrent.futures import InterpreterPoolExecutor
43+
44+
with InterpreterPoolExecutor(initializer=_initializer, initargs=(1,)):
45+
...
46+
47+
with InterpreterPoolExecutor(initializer=_initializer, initargs=("x",)): # type: ignore
48+
...
49+
50+
context = InterpreterPoolExecutor.prepare_context(initializer=_initializer, initargs=(1,), shared={})
51+
worker_context = context[0]()
52+
assert_type(worker_context, concurrent.futures.interpreter.WorkerContext)
53+
resolve_task = context[1]
54+
# Function should enfore that the arguments are correct.
55+
res = resolve_task(_initializer, 1)
56+
assert_type(res, tuple[bytes, Literal["function"]])
57+
# When the function is a script, the arguments should be a string.
58+
str_res = resolve_task("print('Hello, world!')")
59+
assert_type(str_res, tuple[bytes, Literal["script"]])
60+
# When a script is passed, no arguments should be provided.
61+
resolve_task("print('Hello, world!')", 1) # type: ignore
62+
63+
# `WorkerContext.__init__` should accept the result of a resolved task.
64+
concurrent.futures.interpreter.WorkerContext(initdata=res)
65+
66+
# Run should also accept the result of a resolved task.
67+
worker_context.run(res)
68+
69+
def check_thread_worker_context() -> None:
70+
import concurrent.futures.thread
71+
72+
context = concurrent.futures.thread.WorkerContext.prepare(initializer=_initializer, initargs=(1,))
73+
worker_context = context[0]()
74+
assert_type(worker_context, concurrent.futures.thread.WorkerContext)
75+
resolve_task = context[1]
76+
res = resolve_task(_initializer, (1,), {"test": 1})
77+
assert_type(res[1], tuple[int])
78+
assert_type(res[2], dict[str, int])

stdlib/VERSIONS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ colorsys: 3.0-
121121
compileall: 3.0-
122122
compression: 3.14-
123123
concurrent: 3.2-
124+
concurrent.futures.interpreter: 3.14-
124125
configparser: 3.0-
125126
contextlib: 3.0-
126127
contextvars: 3.7-

stdlib/concurrent/futures/__init__.pyi

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,27 @@ from ._base import (
1616
from .process import ProcessPoolExecutor as ProcessPoolExecutor
1717
from .thread import ThreadPoolExecutor as ThreadPoolExecutor
1818

19-
if sys.version_info >= (3, 13):
19+
if sys.version_info >= (3, 14):
20+
from .interpreter import InterpreterPoolExecutor as InterpreterPoolExecutor
21+
22+
__all__ = (
23+
"FIRST_COMPLETED",
24+
"FIRST_EXCEPTION",
25+
"ALL_COMPLETED",
26+
"CancelledError",
27+
"TimeoutError",
28+
"InvalidStateError",
29+
"BrokenExecutor",
30+
"Future",
31+
"Executor",
32+
"wait",
33+
"as_completed",
34+
"ProcessPoolExecutor",
35+
"ThreadPoolExecutor",
36+
"InterpreterPoolExecutor",
37+
)
38+
39+
elif sys.version_info >= (3, 13):
2040
__all__ = (
2141
"FIRST_COMPLETED",
2242
"FIRST_EXCEPTION",
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import sys
2+
from collections.abc import Callable, Mapping
3+
from concurrent.futures import ThreadPoolExecutor
4+
from typing import Final, Literal, Protocol, overload, type_check_only
5+
from typing_extensions import ParamSpec, Self, TypeAlias, TypeVar, TypeVarTuple, Unpack
6+
7+
_Task: TypeAlias = tuple[bytes, Literal["function", "script"]]
8+
9+
@type_check_only
10+
class _TaskFunc(Protocol):
11+
@overload
12+
def __call__(self, fn: Callable[_P, _R], *args: _P.args, **kwargs: _P.kwargs) -> tuple[bytes, Literal["function"]]: ...
13+
@overload
14+
def __call__(self, fn: str) -> tuple[bytes, Literal["script"]]: ...
15+
16+
_Ts = TypeVarTuple("_Ts")
17+
_P = ParamSpec("_P")
18+
_R = TypeVar("_R")
19+
20+
# A `type.simplenamespace` with `__name__` attribute.
21+
@type_check_only
22+
class _HasName(Protocol):
23+
__name__: str
24+
25+
# `_interpreters.exec` technically gives us a simple namespace.
26+
@type_check_only
27+
class _ExcInfo(Protocol):
28+
formatted: str
29+
msg: str
30+
type: _HasName
31+
32+
if sys.version_info >= (3, 14):
33+
from concurrent.futures.thread import BrokenThreadPool, WorkerContext as ThreadWorkerContext
34+
35+
from _interpreters import InterpreterError
36+
37+
class ExecutionFailed(InterpreterError):
38+
def __init__(self, excinfo: _ExcInfo) -> None: ... # type: ignore[override]
39+
40+
UNBOUND: Final = 2
41+
42+
class WorkerContext(ThreadWorkerContext):
43+
# Parent class doesn't have `shared` argument,
44+
@overload # type: ignore[override]
45+
@classmethod
46+
def prepare(
47+
cls, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]], shared: Mapping[str, object]
48+
) -> tuple[Callable[[], Self], _TaskFunc]: ...
49+
@overload # type: ignore[override]
50+
@classmethod
51+
def prepare(
52+
cls, initializer: Callable[[], object], initargs: tuple[()], shared: Mapping[str, object]
53+
) -> tuple[Callable[[], Self], _TaskFunc]: ...
54+
def __init__(
55+
self, initdata: tuple[bytes, Literal["function", "script"]], shared: Mapping[str, object] | None = None
56+
) -> None: ... # type: ignore[override]
57+
def __del__(self) -> None: ...
58+
def run(self, task: _Task) -> None: ... # type: ignore[override]
59+
60+
class BrokenInterpreterPool(BrokenThreadPool): ...
61+
62+
class InterpreterPoolExecutor(ThreadPoolExecutor):
63+
BROKEN: type[BrokenInterpreterPool]
64+
65+
@overload # type: ignore[override]
66+
@classmethod
67+
def prepare_context(
68+
cls, initializer: Callable[[], object], initargs: tuple[()], shared: Mapping[str, object]
69+
) -> tuple[Callable[[], WorkerContext], _TaskFunc]: ...
70+
@overload # type: ignore[override]
71+
@classmethod
72+
def prepare_context(
73+
cls, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]], shared: Mapping[str, object]
74+
) -> tuple[Callable[[], WorkerContext], _TaskFunc]: ...
75+
@overload
76+
def __init__(
77+
self,
78+
max_workers: int | None = None,
79+
thread_name_prefix: str = "",
80+
initializer: Callable[[], object] | None = None,
81+
initargs: tuple[()] = (),
82+
shared: Mapping[str, object] | None = None,
83+
) -> None: ...
84+
@overload
85+
def __init__(
86+
self,
87+
max_workers: int | None = None,
88+
thread_name_prefix: str = "",
89+
*,
90+
initializer: Callable[[Unpack[_Ts]], object],
91+
initargs: tuple[Unpack[_Ts]],
92+
shared: Mapping[str, object] | None = None,
93+
) -> None: ...
94+
@overload
95+
def __init__(
96+
self,
97+
max_workers: int | None,
98+
thread_name_prefix: str,
99+
initializer: Callable[[Unpack[_Ts]], object],
100+
initargs: tuple[Unpack[_Ts]],
101+
shared: Mapping[str, object] | None = None,
102+
) -> None: ...

stdlib/concurrent/futures/thread.pyi

Lines changed: 77 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import queue
2+
import sys
23
from collections.abc import Callable, Iterable, Mapping, Set as AbstractSet
34
from threading import Lock, Semaphore, Thread
45
from types import GenericAlias
5-
from typing import Any, Generic, TypeVar, overload
6-
from typing_extensions import TypeVarTuple, Unpack
6+
from typing import Any, Generic, Protocol, TypeVar, overload, type_check_only
7+
from typing_extensions import Self, TypeAlias, TypeVarTuple, Unpack
78
from weakref import ref
89

910
from ._base import BrokenExecutor, Executor, Future
@@ -18,25 +19,71 @@ def _python_exit() -> None: ...
1819

1920
_S = TypeVar("_S")
2021

21-
class _WorkItem(Generic[_S]):
22-
future: Future[_S]
23-
fn: Callable[..., _S]
24-
args: Iterable[Any]
25-
kwargs: Mapping[str, Any]
26-
def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...
27-
def run(self) -> None: ...
28-
def __class_getitem__(cls, item: Any, /) -> GenericAlias: ...
29-
30-
def _worker(
31-
executor_reference: ref[Any],
32-
work_queue: queue.SimpleQueue[Any],
33-
initializer: Callable[[Unpack[_Ts]], object],
34-
initargs: tuple[Unpack[_Ts]],
35-
) -> None: ...
22+
_Task: TypeAlias = tuple[Callable[..., Any], tuple[Any, ...], dict[str, Any]]
23+
24+
_C = TypeVar("_C", bound=Callable[..., object])
25+
_KT = TypeVar("_KT", bound=str)
26+
_VT = TypeVar("_VT")
27+
28+
@type_check_only
29+
class _ResolveTaskFunc(Protocol):
30+
def __call__(
31+
self, func: _C, args: tuple[Unpack[_Ts]], kwargs: dict[_KT, _VT]
32+
) -> tuple[_C, tuple[Unpack[_Ts]], dict[_KT, _VT]]: ...
33+
34+
if sys.version_info >= (3, 14):
35+
class WorkerContext:
36+
@overload
37+
@classmethod
38+
def prepare(
39+
cls, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]]
40+
) -> tuple[Callable[[], Self], _ResolveTaskFunc]: ...
41+
@overload
42+
@classmethod
43+
def prepare(
44+
cls, initializer: Callable[[], object], initargs: tuple[()]
45+
) -> tuple[Callable[[], Self], _ResolveTaskFunc]: ...
46+
@overload
47+
def __init__(self, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]]) -> None: ...
48+
@overload
49+
def __init__(self, initializer: Callable[[], object], initargs: tuple[()]) -> None: ...
50+
def initialize(self) -> None: ...
51+
def finalize(self) -> None: ...
52+
def run(self, task: _Task) -> None: ...
53+
54+
if sys.version_info >= (3, 14):
55+
class _WorkItem(Generic[_S]):
56+
future: Future[Any]
57+
task: _Task
58+
def __init__(self, future: Future[Any], task: _Task) -> None: ...
59+
def run(self, ctx: WorkerContext) -> None: ...
60+
def __class_getitem__(cls, item: Any, /) -> GenericAlias: ...
61+
62+
def _worker(executor_reference: ref[Any], ctx: WorkerContext, work_queue: queue.SimpleQueue[Any]) -> None: ...
63+
64+
else:
65+
class _WorkItem(Generic[_S]):
66+
future: Future[_S]
67+
fn: Callable[..., _S]
68+
args: Iterable[Any]
69+
kwargs: Mapping[str, Any]
70+
def __init__(self, future: Future[_S], fn: Callable[..., _S], args: Iterable[Any], kwargs: Mapping[str, Any]) -> None: ...
71+
def run(self) -> None: ...
72+
def __class_getitem__(cls, item: Any, /) -> GenericAlias: ...
73+
74+
def _worker(
75+
executor_reference: ref[Any],
76+
work_queue: queue.SimpleQueue[Any],
77+
initializer: Callable[[Unpack[_Ts]], object],
78+
initargs: tuple[Unpack[_Ts]],
79+
) -> None: ...
3680

3781
class BrokenThreadPool(BrokenExecutor): ...
3882

3983
class ThreadPoolExecutor(Executor):
84+
if sys.version_info >= (3, 14):
85+
BROKEN: type[BrokenThreadPool]
86+
4087
_max_workers: int
4188
_idle_semaphore: Semaphore
4289
_threads: AbstractSet[Thread]
@@ -47,6 +94,19 @@ class ThreadPoolExecutor(Executor):
4794
_initializer: Callable[..., None] | None
4895
_initargs: tuple[Any, ...]
4996
_work_queue: queue.SimpleQueue[_WorkItem[Any]]
97+
98+
if sys.version_info >= (3, 14):
99+
@overload
100+
@classmethod
101+
def prepare_context(
102+
cls, initializer: Callable[[], object], initargs: tuple[()]
103+
) -> tuple[Callable[[], Self], _ResolveTaskFunc]: ...
104+
@overload
105+
@classmethod
106+
def prepare_context(
107+
cls, initializer: Callable[[Unpack[_Ts]], object], initargs: tuple[Unpack[_Ts]]
108+
) -> tuple[Callable[[], Self], _ResolveTaskFunc]: ...
109+
50110
@overload
51111
def __init__(
52112
self,

0 commit comments

Comments
 (0)