forked from apache/datafusion-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathipc.py
More file actions
114 lines (89 loc) · 4.03 KB
/
ipc.py
File metadata and controls
114 lines (89 loc) · 4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Worker-side setup for distributing DataFusion expressions.

When an :class:`Expr` is shipped to a worker process (e.g. through
:func:`multiprocessing.Pool` or a Ray actor), the worker reconstructs the
expression against a :class:`SessionContext`. If the expression references
UDFs imported via the FFI capsule protocol — or any UDF the worker would
otherwise resolve from its registered functions rather than from inside
the shipped expression — install a configured :class:`SessionContext`
once per worker:

.. code-block:: python

    from datafusion import SessionContext
    from datafusion.ipc import set_worker_ctx

    def init_worker():
        ctx = SessionContext()
        ctx.register_udaf(my_ffi_aggregate)
        set_worker_ctx(ctx)

The installed context is kept in thread-local storage, so it is visible
only on the thread that installed it. If a worker reconstructs expressions
on other threads as well, install the context on each of those threads.
When no context is installed (and none is passed explicitly), expressions
are reconstructed against the global :class:`SessionContext`.

Built-in functions and Python UDFs (scalar, aggregate, window) travel
inside the shipped expression itself and do not need pre-registration
on the worker.
"""
from __future__ import annotations

import threading
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Needed only for annotations (TYPE_CHECKING is False at runtime); the
    # runtime import is deferred to ``_resolve_ctx`` so this module does not
    # depend on ``datafusion.context`` having been imported first.
    from datafusion.context import SessionContext

__all__ = [
    "clear_worker_ctx",
    "get_worker_ctx",
    "set_worker_ctx",
]

# Per-thread slot holding the installed worker context in its ``ctx``
# attribute. ``threading.local`` gives each thread its own attribute
# namespace, so a context set on one thread is not visible on another.
_local = threading.local()
def set_worker_ctx(ctx: SessionContext) -> None:
    """Register ``ctx`` as the context used to rebuild shipped expressions.

    Meant to be called a single time per worker, for example from a
    ``multiprocessing.Pool`` initializer or from a Ray actor's ``__init__``.
    Calling it again simply replaces whatever context was installed before,
    so repeated calls are harmless.

    The context is held in thread-local storage: only the calling thread
    sees it, and other threads of the same worker are free to register a
    different context of their own.
    """
    # Plain attribute assignment on the thread-local object; an existing
    # value for this thread is overwritten.
    _local.ctx = ctx
def clear_worker_ctx() -> None:
    """Uninstall the :class:`SessionContext` registered for this worker.

    Only the calling thread's context is removed; calling this when nothing
    is installed does nothing. Afterwards, expressions reconstructed in this
    worker fall back to the global :class:`SessionContext`. That is enough
    for built-ins and Python UDFs (scalar, aggregate, window), but any UDF
    imported through the FFI capsule protocol will only resolve if it has
    been registered on the global context.
    """
    try:
        del _local.ctx
    except AttributeError:
        # No context was installed on this thread, so there is nothing to remove.
        pass
def get_worker_ctx() -> SessionContext | None:
    """Return the :class:`SessionContext` installed on this thread, if any.

    Yields ``None`` when :func:`set_worker_ctx` has not been called on this
    thread, or when the context was removed with :func:`clear_worker_ctx`.
    """
    try:
        return _local.ctx
    except AttributeError:
        return None
def _resolve_ctx(
    explicit_ctx: SessionContext | None = None,
) -> SessionContext:
    """Choose the :class:`SessionContext` an Expr should be rebuilt against.

    The first available of these wins: the caller-supplied
    ``explicit_ctx``, then the context installed for this worker via
    :func:`set_worker_ctx`, and finally the global :class:`SessionContext`.
    The global context is used instead of a brand-new one so that anything
    the user has already registered on it stays resolvable.
    """
    # The worker lookup only happens when no explicit context was given.
    chosen = explicit_ctx if explicit_ctx is not None else get_worker_ctx()
    if chosen is not None:
        return chosen

    # Lazy import: `datafusion/__init__.py` imports `datafusion.ipc`
    # before `datafusion.context`, so a module-top import would force
    # `datafusion.context` to load mid-init of `datafusion.ipc`. The
    # cycle is benign today (context.py only pulls expr.py at module
    # scope, neither pulls ipc.py back), but a single new import in
    # context.py's transitive deps could turn it into a real cycle.
    # Deferring keeps `datafusion.ipc` import-order-independent.
    from datafusion.context import SessionContext  # noqa: PLC0415

    return SessionContext.global_ctx()