feat: per-session Python UDF inlining toggle + sender ctx + strict refusal (3/4)#1546
Draft
timsaucer wants to merge 3 commits into
Adds Python-aware encoding to PythonLogicalCodec/PythonPhysicalCodec so a ScalarUDF defined in Python travels inside the serialized expression (cloudpickled into fun_definition) instead of needing a matching registration on the receiver.
With that in place, Expr gains __reduce__ + classmethod from_bytes(buf, ctx=None) so pickle.dumps / pickle.loads work end-to-end on expressions built from col, lit, built-in functions, and Python scalar UDFs.
Wire format is framed as <DFPYUDF magic, version byte, cloudpickle tuple>; the version byte lets a too-new/too-old payload surface a clean Execution error instead of an opaque cloudpickle unpack failure. Schema serde is via arrow-rs's native IPC (no pyarrow round-trip). Cloudpickle module handle is cached per-interpreter through PyOnceLock.
Worker-side context resolution lives in a new datafusion.ipc module: set_worker_ctx / get_worker_ctx / clear_worker_ctx plus a private _resolve_ctx helper consulted by Expr.from_bytes. Priority is explicit ctx > worker ctx > global SessionContext.
FFI UDFs still travel by name and require the matching registration on the receiver's context.
Aggregate and window UDF inline encoding, the per-session with_python_udf_inlining toggle, sender-side context, and the user-guide docs land in follow-on PRs.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extends the PythonLogicalCodec / PythonPhysicalCodec inline encoding
introduced for scalar UDFs to also cover Python-defined aggregate and
window UDFs. The cloudpickle tuple shape per family is:
  DFPYUDA (agg)     (name, accumulator_factory, input_schema_bytes,
                     return_schema_bytes, state_schema_bytes,
                     volatility_str)
  DFPYUDW (window)  (name, evaluator_factory, input_schema_bytes,
                     return_schema_bytes, volatility_str)
Same wire-framing as scalar (family magic + version byte + cloudpickle
blob), same schema serde (arrow-rs native IPC), same cached cloudpickle
handle. The agg state schema is encoded as a full IPC schema so the
post-decode UDF reports the same names + nullability + metadata as the
sender — relevant for accumulators whose StateFieldsArgs consumers key
off names rather than positional DataType.
Required restructuring two existing UDF impls so the codec can grab
the Python callable directly:
* udaf.rs: replaces create_udaf + AccumulatorFactoryFunction closure
with a named PythonFunctionAggregateUDF that stores the Py<PyAny>
accumulator factory. Synthesizes state_{i} field names when the
Python constructor passes only Vec<DataType>; from_parts preserves
the full state schema on the decode side.
* udwf.rs: renames MultiColumnWindowUDF -> PythonFunctionWindowUDF,
drops the PartitionEvaluatorFactory PtrEq wrapper, stores the
Py<PyAny> evaluator directly. PartialEq and Hash get the same
pointer-identity fast path + debug-log exception handling already
on PythonFunctionScalarUDF.
User-facing surface:
* AggregateUDF.name and WindowUDF.name properties (parallel to the
ScalarUDF.name shipped in PR1).
* Existing UDAF/UDWF construction paths are unchanged.
The per-session with_python_udf_inlining toggle, sender-side context,
strict refusal, and user-guide docs land in PRs 3-4 of this series.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…fusal
Adds a per-session toggle that turns inline Python UDF encoding on or
off, plus the supporting plumbing to make it usable through
pickle.dumps.
Codec layer:
* PythonLogicalCodec / PythonPhysicalCodec gain a python_udf_inlining
bool (default true) and a with_python_udf_inlining(enabled) builder.
Each try_encode_udf{,af,wf} short-circuits to inner when the toggle
is off; each try_decode_udf{,af,wf} that recognizes a DFPY* magic
on a strict codec returns a clean Execution error instead of
invoking cloudpickle.loads. The refusal message names the UDF and
the wire family so an operator can see at a glance whether to
re-encode the bytes or register the UDF on the receiver.
Session layer:
* PySessionContext::with_python_udf_inlining(enabled) returns a new
session whose stacked logical + physical codecs both carry the
toggle. The Arc<SessionState> is cloned (cheap), only the codec
pair is rebuilt, so registrations and config stay attached.
* SessionContext.with_python_udf_inlining(*, enabled) is the Python
wrapper. enabled is keyword-only because positional booleans at
the call site read as opaque.
Sender-side context:
* datafusion.ipc gains set_sender_ctx / get_sender_ctx /
clear_sender_ctx thread-locals. Expr.__reduce__ now consults
get_sender_ctx() to pick the codec for outbound pickles, which is
the only path through which a strict session affects pickle.dumps
(the protocol calls __reduce__ with no arguments). Without a
sender context the default codec is used.
Tests:
* test_pickle_expr.py picks up TestPythonUdfInliningToggle (covers
both directions of the toggle plus the explicit-ctx fast path),
TestWorkerCtxLifecycle (set/clear/threading), and
TestSenderCtxLifecycle.
* New test_pickle_multiprocessing.py + helpers exercise the full
driver -> worker round-trip on a multiprocessing.Pool with set_*_ctx
installed in the worker initializer.
* CI workflow gets a 30-minute timeout-minutes backstop so a hung
pickle worker can't block the matrix indefinitely.
User-guide docs and the runnable examples land in PR4 of this series.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This was referenced May 15, 2026
Which issue does this PR close?
No associated issue. PR 3 of 4 stacked on #1545 (which is itself stacked on #1544). The diff against `main` is cumulative until the prior PRs merge — review the commits on `pr3-toggle-sender-strict` directly for the PR3 delta.
Rationale for this change
PRs 1 and 2 ship Python UDFs inline through the codec. Two follow-on needs:
Both needs are served by the same on/off switch at the codec level. The codec already sits on every session, so the toggle is naturally per-session.
What changes are included in this PR?
Codec layer. `PythonLogicalCodec` and `PythonPhysicalCodec` gain a `python_udf_inlining: bool` (default `true`) plus a `with_python_udf_inlining(enabled)` builder. Encode paths short-circuit to `inner` when the toggle is off (UDFs travel by name); decode paths return an `Execution` error instead of invoking `cloudpickle.loads` if they recognize a `DFPY*` family magic on a strict codec. The refusal message names both the UDF and the wire family so an operator can immediately see whether to re-encode the bytes upstream or register the UDF on the receiver.
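The effect of the toggle on both directions can be sketched with a toy codec. This is a minimal illustration under stated assumptions, not the PR's Rust implementation: `ToyCodec`, the `(name, payload)` pair, the `MAGIC` / `VERSION` values, the `registry` dict, and the error wording are all hypothetical, and stdlib `pickle` stands in for cloudpickle.

```python
import math
import pickle

# Hypothetical framing constants, used by this sketch only.
MAGIC = b"DFPYUDF"
VERSION = 1


class ToyCodec:
    """Toy stand-in for a codec carrying a python_udf_inlining flag."""

    def __init__(self, python_udf_inlining=True):
        self.python_udf_inlining = python_udf_inlining

    def with_python_udf_inlining(self, enabled):
        # Builder style: return a new codec instead of mutating this one.
        return ToyCodec(python_udf_inlining=enabled)

    def encode(self, name, func):
        """Return (name, payload); an empty payload means 'travel by name'."""
        if not self.python_udf_inlining:
            return name, b""
        return name, MAGIC + bytes([VERSION]) + pickle.dumps(func)

    def decode(self, name, payload, registry):
        """Resolve a UDF from an inline payload or from the receiver's registry."""
        if payload.startswith(MAGIC):
            if not self.python_udf_inlining:
                # Strict refusal: raise before any unpickling happens.
                raise RuntimeError(
                    f"refusing inline Python UDF '{name}' "
                    f"(family {MAGIC.decode()}): re-encode by name "
                    "or register it on the receiver"
                )
            version = payload[len(MAGIC)]
            if version != VERSION:
                raise RuntimeError(f"unsupported payload version {version}")
            return pickle.loads(payload[len(MAGIC) + 1:])
        return registry[name]


default = ToyCodec()
strict = default.with_python_udf_inlining(False)

# Inlining on: the callable rides inside the payload.
name, inline_payload = default.encode("my_sqrt", math.sqrt)
inline_func = default.decode(name, inline_payload, registry={})

# Inlining off: only the name travels, so the receiver needs a registration.
name, by_name_payload = strict.encode("my_sqrt", math.sqrt)
named_func = strict.decode(name, by_name_payload, registry={"my_sqrt": math.sqrt})

# A strict codec that sees an inline payload refuses it.
refusal = None
try:
    strict.decode(name, inline_payload, registry={})
except RuntimeError as err:
    refusal = str(err)
```

In this sketch the UDF name travels next to the payload rather than inside it, which is what lets the refusal message name the UDF without unpickling anything.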
Session layer. `PySessionContext::with_python_udf_inlining(enabled)` returns a new session whose stacked logical + physical codecs both carry the toggle. The `Arc<SessionState>` is cloned (cheap) and only the codec pair is rebuilt, so registrations and config stay attached. `SessionContext.with_python_udf_inlining(*, enabled)` is the Python wrapper; `enabled` is keyword-only because a positional boolean is opaque at the call site.
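The "new session, same state" shape and the keyword-only argument can be illustrated in plain Python. `ToySession` and its `state` dict are hypothetical stand-ins for the real session and its shared state; sharing one dict is a simplification chosen for the sketch, not a claim about how the Rust side behaves after the clone.

```python
class ToySession:
    """Toy stand-in for a session: shared state plus a per-session codec flag."""

    def __init__(self, state=None, python_udf_inlining=True):
        # `state` plays the role of the session state (registrations, config).
        self.state = state if state is not None else {"udfs": {}, "config": {}}
        self.python_udf_inlining = python_udf_inlining

    def register_udf(self, name, func):
        self.state["udfs"][name] = func

    def with_python_udf_inlining(self, *, enabled):
        # Reuse the existing state; only the flag differs on the new session.
        return ToySession(state=self.state, python_udf_inlining=enabled)


ctx = ToySession()
ctx.register_udf("double", lambda x: x * 2)

# The keyword makes the intent readable at the call site.
strict = ctx.with_python_udf_inlining(enabled=False)

# A bare positional boolean is rejected by the `*` in the signature.
try:
    ctx.with_python_udf_inlining(False)
    positional_rejected = False
except TypeError:
    positional_rejected = True
```

The original session is left untouched, so a caller can keep a default session and a strict one side by side.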
Sender-side context. `datafusion.ipc` gains `set_sender_ctx` / `get_sender_ctx` / `clear_sender_ctx` thread-locals. `Expr.__reduce__` now consults `get_sender_ctx()` to pick the codec for outbound pickles. The pickle protocol calls `__reduce__` with no arguments, so without that hook `pickle.dumps` would always reach `Expr.to_bytes()` with no context and a strict session would never affect the pickle path. `Expr.to_bytes(ctx)` calls with an explicit `ctx` are unaffected.
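Why a thread-local is needed here can be shown with a small stand-in. The functions below are toy re-implementations that borrow the names from the description; they are not the `datafusion.ipc` functions, and `ToyCtx`, `ToyExpr`, and the `inline:` / `by-name:` byte prefixes are hypothetical. The sketch calls `__reduce__()` directly, which is the same argument-free call that pickle makes.

```python
import threading

_local = threading.local()


def set_sender_ctx(ctx):
    _local.sender_ctx = ctx


def get_sender_ctx():
    # Threads that never called set_sender_ctx see None.
    return getattr(_local, "sender_ctx", None)


def clear_sender_ctx():
    _local.sender_ctx = None


class ToyCtx:
    def __init__(self, python_udf_inlining=True):
        self.python_udf_inlining = python_udf_inlining


class ToyExpr:
    def __init__(self, udf_name):
        self.udf_name = udf_name

    def to_bytes(self, ctx=None):
        # No context means the default codec, which inlines.
        inline = ctx is None or ctx.python_udf_inlining
        prefix = b"inline:" if inline else b"by-name:"
        return prefix + self.udf_name.encode("utf-8")

    @classmethod
    def from_bytes(cls, buf):
        return cls(buf.split(b":", 1)[1].decode("utf-8"))

    def __reduce__(self):
        # Called with no arguments, so the context has to come from the thread-local.
        return (ToyExpr.from_bytes, (self.to_bytes(get_sender_ctx()),))


expr = ToyExpr("my_udf")

# No sender context installed: default encoding.
_, (default_bytes,) = expr.__reduce__()

# Strict sender context installed on this thread: encoding switches to by-name.
set_sender_ctx(ToyCtx(python_udf_inlining=False))
rebuild, (strict_bytes,) = expr.__reduce__()

# Another thread does not observe this thread's sender context.
seen_in_other_thread = []
worker = threading.Thread(
    target=lambda: seen_in_other_thread.append(get_sender_ctx())
)
worker.start()
worker.join()

clear_sender_ctx()
restored = rebuild(strict_bytes)
```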
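Tests. `test_pickle_expr.py` picks up `TestPythonUdfInliningToggle` (both directions of the toggle plus the explicit-ctx fast path), `TestWorkerCtxLifecycle` (set/clear/threading), and `TestSenderCtxLifecycle`.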
`test_pickle_multiprocessing.py` (new) plus `_pickle_multiprocessing_helpers.py` (new) exercise the full driver → worker round-trip on a `multiprocessing.Pool` with `set_worker_ctx` installed in the initializer and `set_sender_ctx` on the driver.
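The initializer pattern those tests rely on can be sketched without the library. Everything here is a stand-in under stated assumptions: the context functions are toy re-implementations (storing the worker context in a thread-local is an assumption of the sketch), `resolve_ctx` mirrors the "explicit ctx > worker ctx > global" priority from the first commit in the series, and `multiprocessing.pool.ThreadPool` replaces `multiprocessing.Pool` so the example runs without importable module-level functions. `ThreadPool` accepts the same `initializer` / `initargs` arguments.

```python
import threading
from multiprocessing.pool import ThreadPool

_local = threading.local()

# Hypothetical stand-in for the global SessionContext.
GLOBAL_CTX = "global-ctx"


def set_worker_ctx(ctx):
    _local.worker_ctx = ctx


def get_worker_ctx():
    return getattr(_local, "worker_ctx", None)


def resolve_ctx(explicit=None):
    """Priority: explicit ctx, then worker ctx, then the global fallback."""
    if explicit is not None:
        return explicit
    worker = get_worker_ctx()
    if worker is not None:
        return worker
    return GLOBAL_CTX


def init_worker(ctx):
    # Runs once in each worker before it takes any task.
    set_worker_ctx(ctx)


def decode_on_worker(payload):
    # No explicit ctx is passed, so the worker ctx is what gets resolved.
    return f"{payload} decoded with {resolve_ctx()}"


with ThreadPool(processes=2, initializer=init_worker, initargs=("worker-ctx",)) as pool:
    results = pool.map(decode_on_worker, ["expr-a", "expr-b"])

# The driver thread never installed a worker ctx, so it falls through to the global.
driver_default = resolve_ctx()
driver_explicit = resolve_ctx("explicit-ctx")
```

With a real process pool the worker function and initializer would have to be importable at module level, and the payloads would cross the process boundary as pickled bytes.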
CI. `.github/workflows/test.yml` gets a 30-minute `timeout-minutes` backstop so a hung pickle worker (e.g. during a future regression) cannot block the matrix indefinitely.
Are there any user-facing changes?
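Yes: `SessionContext.with_python_udf_inlining(*, enabled)` and the `datafusion.ipc` sender-context functions `set_sender_ctx` / `get_sender_ctx` / `clear_sender_ctx`.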
No breaking changes — the toggle defaults to `true`, matching pre-PR behavior. `api change` label not added.
The user-guide page documenting the full pattern (and the multiprocessing / Ray runnable examples) lands in PR 4 of this series.