
Feat/expr pickle#3

Closed
timsaucer wants to merge 19 commits into feat/proto-codecs from feat/expr-pickle

Conversation

@timsaucer
Owner

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

timsaucer and others added 19 commits May 15, 2026 09:00
* feat: unify logical + physical proto codec stack via SessionContext

Introduces a single composable codec layer that every serializer reads
from the session, replacing the hardcoded `DefaultLogicalExtensionCodec`
/ `DefaultPhysicalExtensionCodec` calls scattered across PyLogicalPlan,
PyExecutionPlan, and the Rust-wrapped Python provider plumbing.

Key changes:

* New `PythonLogicalCodec` and `PythonPhysicalCodec` (crates/core/src/codec.rs)
  wrap any inner `LogicalExtensionCodec` / `PhysicalExtensionCodec`. Both
  share a `DFPYUDF1` magic-prefix path for in-band cloudpickle encoding
  of Python scalar UDFs, so an `ExecutionPlan` / `PhysicalExpr`
  referencing a Python `ScalarUDF` round-trips through either layer.
  Magic-prefix registry table (DFPYUDF1 in use; DFPYUDA1 / DFPYUDW1 /
  DFPYPE1 reserved) documented in the module header.

* `PySessionContext` stores `Arc<PythonLogicalCodec>` and
  `Arc<PythonPhysicalCodec>` directly. FFI wrappers are built on demand
  via `ffi_logical_codec()` / `ffi_physical_codec()` for capsule export
  and downstream `RustWrappedPy*` consumers. Adds
  `__datafusion_physical_extension_codec__` getter +
  `with_physical_extension_codec` setter (symmetric with the logical
  pair).

* `PyLogicalPlan.to_proto` / `from_proto` renamed to `to_bytes` /
  `from_bytes`, now reading the session's logical codec. `to_proto` /
  `from_proto` survive as deprecated thin wrappers emitting
  `DeprecationWarning`.

* `PyExecutionPlan` gains the same `to_bytes` / `from_bytes` rename +
  deprecated aliases, plus `__datafusion_execution_plan__` capsule
  getter and `from_pycapsule` (ported from poc_ffi_query_planner).

* New `PyPhysicalExpr` class with `to_bytes` / `from_bytes` /
  `from_pycapsule` / `__datafusion_physical_expr__`. `from_bytes`
  takes an input pyarrow Schema for column-reference resolution.

* `datafusion-python-util` gains `from_pycapsule!` /
  `try_from_pycapsule!` macros + `physical_codec_from_pycapsule`,
  `task_context_from_pycapsule`, `create_physical_extension_capsule`
  (ported from poc_ffi_query_planner).

* `PythonFunctionScalarUDF` exposes `func()`, `input_fields()`,
  `return_field()`, `volatility()`, `from_parts()` accessors needed
  by the codec.

Python wrapper updates: `LogicalPlan` / `ExecutionPlan` add
`to_bytes` / `from_bytes` + deprecate `to_proto` / `from_proto`;
`ExecutionPlan` adds capsule getter + `from_pycapsule`; new
`PhysicalExpr` wrapper class exported from the top-level package;
`SessionContext` exposes the physical codec capsule + setter.

Test coverage in python/tests/test_plans.py: round-trip via new API,
deprecation warnings on old API, capsule protocol getters,
session-routed codec on both layers.

`PyLogicalPlan` PyCapsule protocol is intentionally not added —
`datafusion-ffi` does not expose `FFI_LogicalPlan`, so there is no
stable cross-crate shape to publish. Round-tripping a `LogicalPlan`
goes through `to_bytes` / `from_bytes` only.
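The deprecated thin wrappers follow a standard shim pattern: warn once, then forward to the renamed method. A minimal stdlib sketch (the class and the byte payload are toy stand-ins, not the real wrapper):

```python
import warnings


class LogicalPlan:
    """Toy stand-in for the Python wrapper class."""

    def to_bytes(self) -> bytes:
        # Placeholder for the codec-backed serialization path.
        return b"plan-bytes"

    def to_proto(self) -> bytes:
        """Deprecated alias: emit DeprecationWarning, forward to to_bytes."""
        warnings.warn(
            "to_proto is deprecated; use to_bytes",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.to_bytes()
```

Callers on the old API keep working while the warning points them at the rename.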

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test: FFI-example integration tests for codec + plan capsule APIs

Adds four downstream-crate fixtures in `datafusion-ffi-example` so the
new PR1 surface can be tested with the same FFI-handoff pattern used
for table providers, UDFs, etc. Existing tests prove the API exists;
these tests prove it composes with code that lives in another crate.

New Rust types in `examples/datafusion-ffi-example/src/`:

* `MyLogicalExtensionCodec` — delegates to
  `DefaultLogicalExtensionCodec` and bumps atomic counters on the UDF
  encode/decode entry points. Exported via
  `__datafusion_logical_extension_codec__`. Installed onto a session
  with `ctx.with_logical_extension_codec(my_codec)`.
* `MyPhysicalExtensionCodec` — mirror for `PhysicalExtensionCodec`.
* `MyExecutionPlan` — wraps a one-column `EmptyExec`, exposes
  `__datafusion_execution_plan__`. Lets the receiver consume an
  `ExecutionPlan` capsule that did not originate in
  datafusion-python.
* `MyPhysicalExpr` — wraps `Literal(Int32(42))`, exposes
  `__datafusion_physical_expr__`. Same FFI handoff for physical
  expressions.
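The counter-bumping delegation the fixtures use is a plain wrapper pattern; an illustrative Python stand-in for the Rust fixture (the real trait forwards many more methods, all the same way):

```python
class DefaultCodec:
    """Toy inner codec: UDFs travel by name only."""

    def try_encode_udf(self, udf_name: str) -> bytes:
        return udf_name.encode()


class CountingCodec:
    """Delegates every call to `inner`, bumping a counter on the way through."""

    def __init__(self, inner):
        self.inner = inner
        self.encode_udf_calls = 0

    def try_encode_udf(self, udf_name: str) -> bytes:
        self.encode_udf_calls += 1  # observable side effect the tests assert on
        return self.inner.try_encode_udf(udf_name)
```

The test then serializes a plan referencing a UDF and asserts the counter moved, proving the dispatch chain reached the user codec.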

New tests:

* `_test_logical_extension_codec.py` — codec installs cleanly, the
  session re-exports its capsule, and `try_encode_udf` fires on the
  user codec when serializing a plan that references a `ScalarUDF`.
  The decode counterpart is a round-trip check rather than a counter
  assertion: when the UDF is in the receiver's function registry,
  `parse_expr` resolves by name before consulting the codec.
* `_test_physical_extension_codec.py` — symmetric.
* `_test_execution_plan.py` — parametrized over typed-class vs
  raw-capsule input; verifies `ExecutionPlan.from_pycapsule` consumes
  the downstream capsule.
* `_test_physical_expr.py` — same for `PhysicalExpr.from_pycapsule`.

API changes forced by the new tests:

* `PyLogicalPlan.to_bytes`, `PyExecutionPlan.to_bytes`,
  `PyPhysicalExpr.to_bytes` now accept an optional `ctx` parameter.
  When supplied, encoding routes through the session's installed
  codec instead of a fresh default. `ctx=None` preserves the previous
  default-codec behavior used by the deprecated `to_proto` shims.
* The util `from_pycapsule!` / `try_from_pycapsule!` macros now
  validate the capsule name via `pointer_checked(Some(c"..."))`
  rather than `pointer_checked(None)`. The latter rejects named
  capsules outright with CPython's "incorrect name" error.
* `SessionContext.with_logical_extension_codec` and
  `with_physical_extension_codec` now wrap the returned internal
  context in `SessionContext` so the result has the full Python
  surface. The pre-existing logical setter was returning a raw
  internal object that lacked `sql()` and friends.
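The optional-`ctx` routing can be sketched as follows (all names here are toy stand-ins; only the dispatch shape mirrors the change):

```python
class _DefaultCodec:
    name = "default"


class _SessionCodec:
    name = "session"


class Ctx:
    """Toy session context carrying an installed codec."""

    def __init__(self):
        self.codec = _SessionCodec()


def to_bytes(plan, ctx=None):
    # ctx=None preserves the previous default-codec behavior used by the
    # deprecated to_proto shims; a supplied ctx routes encoding through
    # the session's installed codec.
    codec = ctx.codec if ctx is not None else _DefaultCodec()
    return f"{codec.name}:{plan}".encode()
```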

`examples/datafusion-ffi-example/Cargo.toml` gains `datafusion` and
`datafusion-proto` workspace dependencies for the new Rust impls.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor: tighten PR1 scope to codec plumbing only

Review feedback pass. PR1 is now strictly the composable codec layer +
session routing + class-method serialization API. Anything that
touches actual Python UDF inline encoding or Python expression
wrapping moves to PR2 alongside the pickle work.

Dropped:

* `encode_python_scalar_udf` / `decode_python_scalar_udf` helpers
  from `crates/core/src/codec.rs`, along with cloudpickle and pyarrow
  imports. The wrapper codecs now delegate every method to `inner`.
  `DFPYUDF1` magic constant is kept (marked `dead_code` for now) as a
  reservation so PR2 has a single definition site.
* `udf.rs` reverted to pre-PR1 shape. The codec no longer needs
  `func()` / `input_fields()` / `volatility()` / `from_parts()`
  accessors. Re-added by PR2 when scalar-UDF inlining lands.
* `PyPhysicalExpr` class + Python wrapper + `__init__` export +
  `MyPhysicalExpr` FFI fixture + `_test_physical_expr.py`. No
  consumer in PR1 or PR2 plan documents; symmetry with
  `PyExecutionPlan` is not enough to justify the surface area.
* Rust-side `PyLogicalPlan::to_proto` / `from_proto` and
  `PyExecutionPlan::to_proto` / `from_proto` deprecated wrappers.
  The deprecation lives entirely in the Python wrapper layer, which
  emits `DeprecationWarning` and forwards to `to_bytes` /
  `from_bytes`. Less Rust duplication.
* `PythonLogicalCodec::with_default_inner` /
  `PythonPhysicalCodec::with_default_inner` — redundant with
  `impl Default`. Logic moved into `Default::default`.
* `PySessionContext::default_logical_codec` /
  `default_physical_codec` helpers. Inlined as
  `Arc::new(PythonLogicalCodec::default())` at the three call sites.

Tests (root: 1076, FFI example: 36) all green after the cuts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* remove unhelpful code comments

* docs: rewrite codec module comments around purpose, not PR sequence

The previous doc-block framed PythonLogicalCodec / PythonPhysicalCodec
in terms of "PR1 delegates, PR2 will add encoding" — useful for
review, useless for someone reading the code later.

Reframed in terms of what the codecs exist to do: encode Python-side
plan references (pure-Python UDFs, etc.) into the proto wire format
so plans can cross process boundaries without the receiver having to
pre-register every callable. The wrappers sit at the top of the
session's codec stack and delegate non-Python encoding to a
composable inner codec.

Magic-prefix registry table loses the "reserved" column. Doc still
notes that the in-module impls currently delegate and that
encoder/decoder hooks land alongside the corresponding Python-side
serialization work.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(codec): forward every LogicalExtensionCodec /
PhysicalExtensionCodec method to inner

PythonLogicalCodec previously only overrode the four required methods
on the trait plus the scalar UDF pair, so the default trait impls
(returning "LogicalExtensionCodec is not provided") shadowed any
downstream FFI codec for file formats, aggregate UDFs, and window
UDFs. A user installing their own codec via
`SessionContext.with_logical_extension_codec(...)` would silently
lose access to its `try_*_file_format`, `try_*_udaf`,
`try_*_udwf` implementations.

Forward every trait method to `inner` so the user-installed codec is
fully reachable. Same change on the physical side, including
`try_*_expr`, `try_*_udaf`, `try_*_udwf` — the corresponding
Python-aware paths can layer on later by intercepting before
delegation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: tighten codec dispatch test docstrings

The previous docstrings claimed the tests verify "PythonLogicalCodec
delegates non-Python UDFs to the inner codec." That's
forward-looking — the codecs currently delegate every UDF
unconditionally, so the test would behave identically for Python and
non-Python UDFs.

Rewrite to describe what the test actually proves: the dispatch chain
`PyLogicalPlan.to_bytes -> session.logical_codec -> PythonLogicalCodec
-> FFI -> user impl` (and the physical mirror) forwards correctly,
observable via the user codec's atomic counter incrementing after one
encode pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(ffi-example): MyExecutionPlan emits real data via
MemorySourceConfig

`MyExecutionPlan` was a one-column `EmptyExec` stub useful only as a
capsule-handoff target. Promoted to a minimal reference impl that a
downstream Rust crate can copy when exposing a custom `ExecutionPlan` to
datafusion-python: configurable `num_rows`, produces a single batch
of sequential `Int32` values under column `value`, wrapped in
`DataSourceExec` via `MemorySourceConfig::try_new_exec`. Header
comment explains the typical use case (remote backend, streaming
source, synthetic data generator) and the
`__datafusion_execution_plan__` capsule shape downstream crates
should follow.

Test asserts the schema-bearing plan survives the FFI hop: a
`DataSourceExec` arrives with the expected partitioning and no
children. Schema details are not surfaced through the FFI display
path (only the wrapping `ForeignExecutionPlan` name + inner plan
name appear), so the test does not assert the column name.

`to_bytes` round-trip of an FFI-imported plan is not exercised:
encoding requires a physical codec that knows how to serialize
`ForeignExecutionPlan`, which the default codec does not. A
downstream user round-tripping such a plan must install their own
codec via `with_physical_extension_codec`. Documented in the test
file rather than asserted on.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor: drop dormant ExecutionPlan PyCapsule round-trip

`PyExecutionPlan::from_pycapsule` and the matching
`__datafusion_execution_plan__` exporter have no consumer in this
repo, on the POC `poc_ffi_query_planner` branch, or on any sibling
branch (`testing/datafusion-distributed`, `testing/ffi-library-marker`,
`tmp/ffi-with-codecs`). The pair was wired up speculatively for FFI
plan handoff that no Python code path actually performs today.

Drop the whole capsule round-trip for `ExecutionPlan`:

* Rust `PyExecutionPlan::from_pycapsule` and
  `__datafusion_execution_plan__`.
* Python `ExecutionPlan.from_pycapsule` and
  `__datafusion_execution_plan__` wrappers.
* `MyExecutionPlan` FFI fixture + `_test_execution_plan.py` + lib.rs
  registration. Was solely a test fixture for the dropped path.
* `test_execution_plan_pycapsule_protocol` in `python/tests/test_plans.py`.

`PyExecutionPlan.to_bytes` / `from_bytes` survive — they encode
through the session's physical codec and have real coverage.
Capsule round-trip can be re-added when a concrete consumer
(distributed worker, bridge library) lands.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat: PyExpr.to_bytes / from_bytes via session logical codec

Mirrors PyLogicalPlan / PyExecutionPlan: encode through the session's
installed `LogicalExtensionCodec` (or a default-inner
`PythonLogicalCodec` when no `ctx` is supplied), decode against the
session's function registry + codec via `parse_expr`.

Rust side calls `datafusion_proto::logical_plan::to_proto::serialize_expr`
and `from_proto::parse_expr`. Python wrapper threads an optional
`SessionContext` through.

Tests cover the session-routed roundtrip and the no-ctx default-codec
encode path. Adds a third consumer of `session.logical_codec()`
alongside `PyLogicalPlan` and the codec dispatch tests in the FFI
example, broadening coverage of the codec stack.

This is the last piece of the PR1 codec surface — follow-up pickle
work (`Expr.__reduce__`, worker-scoped context, multiprocessing) can
build on this without bundling the byte-level serialization API.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(ffi-example): assert codec roundtrip restores plan output

PR review feedback: weak `is not None` checks let regressions slip
past. Mirror python/tests/test_plans.py — logical compares
`df.collect() == round_trip.collect()`; physical compares
`str(original) == str(restored)`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Builds on the codec consistency work in feat/proto-codecs. Python
scalar UDFs are cloudpickled inline into the proto `fun_definition`
field by PythonLogicalCodec / PythonPhysicalCodec, so a pickled Expr
that references a Python `udf()` reconstructs on the receiver with
no pre-registration. UDAFs, UDWFs, and FFI-imported UDFs still
resolve through the receiver's session.

Rust:
* `PythonFunctionScalarUDF` regains the `func()` / `input_fields()` /
  `return_field()` / `volatility()` / `from_parts()` accessors the
  codec needs.
* `crates/core/src/codec.rs` adds shared
  `try_encode_python_scalar_udf` / `try_decode_python_scalar_udf`
  helpers built on cloudpickle + pyarrow IPC for the input schema.
  Both `PythonLogicalCodec.try_encode_udf` and
  `PythonPhysicalCodec.try_encode_udf` consult the helper first and
  fall back to `inner` for non-Python UDFs (and the receiver's
  function registry on decode if the prefix does not match).

Python:
* `datafusion.ipc` module: thread-local `set_worker_ctx` /
  `clear_worker_ctx` / `get_worker_ctx` for installing a receiver
  `SessionContext` on a worker process. `_resolve_ctx` returns
  explicit > worker > fresh.
* `Expr.__reduce__` returns `(Expr._reconstruct, (self.to_bytes(),))`.
  `_reconstruct` calls `Expr.from_bytes(buf, ctx=None)` which
  consults the worker context.
* `Expr.from_bytes` signature switches to `(buf, ctx=None)` (was
  `(ctx, buf)`); no callers in main, only PR1 tests which are
  updated.
* `datafusion.ipc` exported from the top-level package.

Dependencies:
* `cloudpickle>=2.0` added as a runtime dep. Lazy-imported on the
  encode / decode hot paths — users who never pickle a plan or
  expression pay only the install footprint, not import-time cost.
* ruff `S301` added to the test-suite + examples ignore lists
  (legitimate `pickle.loads` use).

Tests:
* `test_pickle_expr.py` — 11 cases covering built-in expr pickle,
  scalar UDF self-contained blobs, closure-capturing UDFs, worker
  ctx lifecycle, thread-local isolation.
* `test_pickle_multiprocessing.py` + `_pickle_multiprocessing_helpers.py`
  — parametrized over `fork`/`forkserver`/`spawn` start methods. 9
  cases. Auto-skip when the sandbox blocks semaphore creation; CI
  runs the full matrix.
* `test_expr.py` — existing `from_bytes` tests updated to new
  signature.

1088 root tests pass (up from 1077), 13 skipped (up from 4, the new
mp cases skip locally under sandboxed semaphores).
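The `__reduce__` contract described above is the standard pickle reconstruction pattern. A toy stand-in (this `Expr` is illustrative, with byte-level serialization faked as plain text):

```python
import pickle


class Expr:
    """Toy stand-in: real serialization goes through the session codec."""

    def __init__(self, text):
        self.text = text

    def to_bytes(self) -> bytes:
        return self.text.encode()

    @classmethod
    def from_bytes(cls, buf, ctx=None):
        # In the real API, ctx=None consults the worker context.
        return cls(buf.decode())

    @staticmethod
    def _reconstruct(buf):
        return Expr.from_bytes(buf, ctx=None)

    def __reduce__(self):
        # Pickle stores the reconstruction callable plus the serialized bytes.
        return (Expr._reconstruct, (self.to_bytes(),))
```

Any framework that pickles its arguments (multiprocessing, Ray, etc.) then transparently round-trips the object through the byte-level API.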

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Companion to the pickle work in the previous commit. Ships the
discoverable surface a user would actually reach for when they hit
"how do I distribute these expressions":

* `docs/source/user-guide/io/distributing_expressions.rst` — end-to-end
  user guide covering the recommended `Pool(initializer=...)` pattern,
  the worker context shape, what does and does not survive the
  round-trip (scalar UDFs yes, UDAF/UDWF/FFI by name), Python 3.14
  start-method change, and the cloudpickle security note.
* `examples/ray_pickle_expr.py` — runnable Ray actor demo using
  `set_worker_ctx` from an actor `__init__`.
* `examples/README.md` — links to the Ray example.
* `docs/source/user-guide/io/index.rst` — adds the new page to the
  IO TOC.
* `.github/workflows/test.yml` — 30-minute `timeout-minutes` backstop
  on the test matrix so a hung multiprocessing worker (e.g. during a
  pickle regression) does not block CI indefinitely.
* `python/datafusion/user_defined.py` — `ScalarUDF` / `AggregateUDF` /
  `WindowUDF` get a `.name` property surfacing the registered name.
  Useful for tests asserting an expression carries a specific UDF
  reference, and for users debugging worker registrations.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`input_fields: Vec<Field>` and `volatility: Volatility` were added to
the struct so the codec could read them on encode. Both were
redundant:

* `Signature` already carries the `Vec<DataType>` (via
  `TypeSignature::Exact`) and `Volatility` — the constructor collapses
  the incoming `Vec<Field>` to `DataType`s on its way into the
  signature, so `Field`-level metadata (nullability, attached
  metadata) is never propagated anywhere on the local side.
* On decode, `from_parts` runs that same collapse again. Sender's
  `Signature` and receiver's `Signature` end up with the same
  `DataType`s and the same `Volatility`. The reconstructed
  `PythonFunctionScalarUDF` is functionally equivalent to the
  original without preserving the input-side `Field`s.

Revert the struct to its original 4-field shape (`name`, `func`,
`signature`, `return_field`). The codec now derives the input
`DataType`s from `signature.type_signature` and reads volatility from
`signature.volatility`. Input fields are still serialized into the
cloudpickle payload (with synthesized `arg_i` names) so the wire
format is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previous round-trip went Rust Schema -> pyarrow Schema -> IPC bytes
-> cloudpickle tuple -> pyarrow Schema -> Rust Schema, for both the
input schema and the return Field. Two unnecessary pyarrow trips on
each side.

Replace with `StreamWriter::try_new(&mut buf, &schema)?.finish()?`
on the encoder and `StreamReader::try_new(cursor, None)?.schema()`
on the decoder. Both ends produce / consume the same Arrow IPC
stream bytes — arrow-rs writes a schema-only stream, arrow-rs reads
it back, no PyArrow involvement.

Tuple shape changes slightly: the fourth field is now a one-field
`return_schema_bytes` IPC blob instead of a pickled pyarrow `Field`.
Keeps everything in `Vec<u8>` form before cloudpickle picks it up.

`pyarrow.ipc.read_schema` and the `ToPyArrow` / `FromPyArrow` traits
on `Schema` / `Field` are no longer needed on the codec hot path,
shaving a noticeable chunk of pyarrow function dispatch from each
encode / decode call.

Pickle tests still green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previous wording for `Expr.to_bytes`, `Expr.__reduce__`, and the
`datafusion.ipc` module header referenced ``PythonLogicalCodec`` and
``cloudpickle`` to explain what survives the wire. Neither name is
importable from Python and the mechanism is irrelevant to the end
user — only the resulting contract matters.

Reword each docstring to describe the user-facing guarantee directly:

* Python scalar UDFs travel inside the pickle / serialized blob, no
  pre-registration needed on the receiver.
* Aggregate UDFs, window UDFs, and FFI-capsule UDFs travel by name
  only and require the receiver to have them registered (typically
  via `set_worker_ctx`).

The implementation can change underneath without invalidating these
docs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
User-facing docs throughout this PR led with "pickle support":
filename-shaped headings, function docstrings describing how things
get cloudpickled into a Rust-side codec, etc. That's the
implementation pathway, not the user's goal.

The user's goal is to build an expression in a driver process and
ship it to worker processes for distributed evaluation. Pickle is the
mechanism Python provides to make that work; we hook into it. End
users typically don't care how the bytes are produced — they care
which references survive the trip and what they have to register on
each worker.

Reframe across user-facing surfaces:

* `docs/source/user-guide/io/distributing_expressions.rst` — leads
  with the worker-pool use case, drops `PythonUDFCodec` /
  cloudpickle vocabulary, presents "what travels with the
  expression" as the user contract.
* `datafusion.ipc` module docstring + `set_worker_ctx` /
  `clear_worker_ctx` / `get_worker_ctx` — describes what the user
  installs and why, not internal lookup details.
* `Expr.to_bytes` / `from_bytes` / `__reduce__` — describes what's
  shipped vs what travels by name; cross-references the user guide
  instead of repeating the codec story.
* `examples/ray_pickle_expr.py` header + comment + README entry —
  goal-first wording.
* Pickle test module docstrings — drop the dangling reference to
  `PythonUDFCodec` (also a stale name post-PR1).

Code behavior unchanged. 1088 tests still green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Window UDFs no longer need worker-side pre-registration. The codec
serializes the Python evaluator factory into the wire format and the
receiver reconstructs the UDF from bytes alone, same as scalar UDFs.

Refactor `MultiColumnWindowUDF` to store the Python evaluator
callable directly (`evaluator: Py<PyAny>`) instead of a
`PartitionEvaluatorFactory` closure. The factory closure was a
boxed `Fn` that captured the Python state opaquely, with nothing for
the codec to downcast back to. Now the named struct holds the
`Py<PyAny>` and builds a partition evaluator inside
`partition_evaluator()` on demand.

`PyWindowUDF::new` constructs `MultiColumnWindowUDF` directly with
the evaluator. `to_rust_partition_evaluator` is replaced by
`instantiate_partition_evaluator`, called from the trait method.

Codec wiring:
* `crates/core/src/codec.rs` adds `try_encode_python_window_udf` /
  `try_decode_python_window_udf` plus the `DFPYUDW1` magic prefix.
* `PythonLogicalCodec.try_encode_udwf` / `try_decode_udwf` and the
  matching `PythonPhysicalCodec` methods consult the helpers first
  and fall back to `inner` for non-Python window UDFs.

Test coverage in `test_pickle_expr.py::TestWindowUDFCodec` mirrors
the scalar UDF cases: self-contained blob, decode into fresh
context, decode via pickle with no worker context.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Aggregate UDFs no longer need worker-side pre-registration. The codec
serializes the Python accumulator factory + state schema into the
wire format and the receiver reconstructs the UDF from bytes alone.

New `PythonFunctionAggregateUDF` named struct (in `crates/core/src/udaf.rs`)
holds `accumulator: Py<PyAny>` plus signature, return type, and state
fields directly. Full `AggregateUDFImpl` impl mirroring upstream
`SimpleAggregateUDF`: `as_any`, `name`, `signature`, `return_type`,
`accumulator`, `state_fields`. `accumulator()` lazily instantiates a
fresh accumulator per partition via the new
`instantiate_accumulator()` helper.

`PyAggregateUDF::new` now constructs `PythonFunctionAggregateUDF`
directly via `AggregateUDF::new_from_impl(...)` instead of routing
through `create_udaf(...)` + `to_rust_accumulator(...)`. The closure-
based factory path is gone; the Python state stays addressable.

Codec wiring:
* `crates/core/src/codec.rs` adds `try_encode_python_agg_udf` /
  `try_decode_python_agg_udf` plus the `DFPYUDA1` magic prefix.
  Tuple shape: `(name, accumulator, input_schema_bytes,
  return_schema_bytes, state_schema_bytes, volatility_str)`.
* `PythonLogicalCodec.try_encode_udaf` / `try_decode_udaf` and the
  matching `PythonPhysicalCodec` methods consult the helpers first
  and fall back to `inner` for non-Python aggregate UDFs.

Test coverage in `test_pickle_expr.py::TestAggregateUDFCodec` mirrors
the scalar / window UDF cases.

1094 root tests pass (up from 1088, plus 3 new UDAF cases and 3 new
UDWF cases from the prior commit).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
With aggregate UDFs and window UDFs now reconstructable from bytes
alone, the user-facing contract simplifies to:

* Built-in functions and **all** Python UDFs (scalar, aggregate,
  window) travel inside the shipped expression. No worker-side
  pre-registration.
* Only UDFs imported via the FFI capsule protocol travel by name and
  require pre-registration via `set_worker_ctx`.

Update each user-facing surface:

* `docs/source/user-guide/io/distributing_expressions.rst` — drop the
  "aggregate/window UDFs travel by name only" caveat; rename the
  practical-considerations entry that called out the limitation.
* `python/datafusion/ipc.py` module + `clear_worker_ctx` — explicitly
  list scalar, aggregate, and window as inline-portable.
* `python/datafusion/expr.py` — `to_bytes` and `__reduce__`
  docstrings updated.
* Test module docstrings updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…inds

Two fixes in the intro paragraph:

* Link to the standard library pickle docs rather than relying on the
  reader's familiarity with `pickle.dumps` / `pickle.loads`.
* "Python scalar UDFs ride along" only covered scalar UDFs. With
  aggregate and window UDFs now also traveling inline, the line is
  reworded to call out all three kinds.

Also updates the inline code comment in the worker-pool example.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Pool / Ray-actor examples called `pickle.dumps` on the sender
and `pickle.loads` on the worker explicitly. That's not what real
user code looks like — `multiprocessing.Pool.starmap`, Ray's
`@ray.remote`, and similar frameworks serialize their function
arguments automatically. Showing the manual wrapping makes the API
look more involved than it is and obscures the point: users hand a
DataFusion `Expr` to their distribution framework like any other
Python object, and it Just Works.

Rewrites:

* User guide worker-pool example switches from
  `pool.map(evaluate, [(blob, batch), ...])` (where `blob =
  pickle.dumps(expr)`) to `pool.starmap(evaluate, [(expr, batch),
  ...])`. `evaluate(expr, batch)` receives the reconstructed
  expression directly.
* Ray example drops the `pickle.dumps(expr)` / `pickle.loads(blob)`
  pair; `evaluate(expr, batch)` takes a typed `Expr`. Drops the
  unused `pickle` import.
* Worker-context narrative updated: "expressions reconstructed by
  pickle.loads" -> "expressions arriving from the driver".
* Security warning reworded to mention pickle as the underlying
  mechanism while still framing the contract in user terms (only
  accept expressions from trusted sources).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The page previously framed itself entirely around expression-level
distribution. With datafusion-distributed and datafusion-ballista
integrations in progress upstream, an overview of *all* distribution
approaches is more durable: it establishes the page as the
distribution landing spot, sets reader expectations about what is
ready today versus what is on the way, and lets the future
integrations slot in without renaming or restructuring.

Rename `distributing_expressions.rst` → `distributing_work.rst` and
rewrite as:

* Overview lead — three approaches (expression-level, query-level via
  datafusion-distributed, query-level via Ballista) with status
  markers.
* "Expression-level distribution" — the existing content, slotted in
  as a sub-section.
* "Query-level distribution via datafusion-distributed" — placeholder
  noting the upstream WIP and that the integration will be documented
  here once usable.
* "Query-level distribution via Apache Ballista" — same.

Cross-references in `datafusion.ipc` and `Expr.to_bytes` /
`__reduce__` docstrings updated to the new doc name. TOC entry in
io/index.rst updated. Filename and URL stable from here on.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`pickle-redesign-plan.md` is an off-tree working doc for PR1/PR2
sequencing decisions, not user-facing documentation. It was added by
a stray `git add -A` in the previous commit. Untrack it; the file
stays in the working tree as untracked for local reference.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The "multi-column" name was a relic of an earlier upstream limitation
where `SimpleWindowUDF` only accepted a single input column. With the
struct now also storing the Python evaluator factory directly for
pickle support, the relevant distinction is no longer column count
but "Python-defined". Rename to match `PythonFunctionScalarUDF` and
`PythonFunctionAggregateUDF` for a consistent naming convention
across all three Python UDF kinds.

Also tighten visibility from `pub` to `pub(crate)`. No external
consumer; the struct only needs to be reachable from `PyWindowUDF`
and the codec.

No functional change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…te style

Other code blocks in the user guide present snippets inline at module
level; the worker-pool example was the only one using
``if __name__ == "__main__":``. Restructure as two blocks (worker
function + driver code), both inline, with a prose note explaining
when the guard is actually needed (saving to a .py file and running
under ``spawn`` / ``forkserver``). Matches the look of the surrounding
docs and keeps the snippet copy-pasteable for the interactive case.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds `SessionContext.with_python_udf_inlining(enabled)` for two
related use cases:

* **Cross-language portability.** With inlining disabled, the codec
  no longer emits `DFPYUDF1` / `DFPYUDA1` / `DFPYUDW1` cloudpickle
  blobs. Python UDFs travel by name only, the same way FFI-capsule
  UDFs do. Bytes round-trip through a non-Python decoder.
* **Untrusted-source decode.** `Expr.from_bytes` on bytes from a
  misbehaving sender no longer invokes `cloudpickle.loads`. Inline
  payloads received by a strict decoder raise a clear error.

`PythonLogicalCodec` and `PythonPhysicalCodec` gain a
`python_udf_inlining: bool` field (default `true`) and a builder
method `with_python_udf_inlining(enabled)`. The six UDF
encode/decode dispatchers consult the flag before calling the inline
helpers. Strict decoders that see a magic-prefix payload return a
clear `Plan` error rather than silently failing through to the inner
codec (which would otherwise produce "LogicalExtensionCodec is not
provided" — accurate but unhelpful).

`PySessionContext::with_python_udf_inlining(enabled)` rebuilds both
codecs with the new setting; Python wrapper at
`SessionContext.with_python_udf_inlining` mirrors. Test coverage:
encoder size delta, strict roundtrip via registry,
clear-error-on-inline-payload-when-strict.

`pickle.loads` on untrusted bytes remains unsafe regardless of this
flag; the toggle only governs the `to_bytes` / `from_bytes` codec
path. User guide documents both use cases plus the limitation.

1097 root tests pass (up from 1094 with 3 new strict-mode cases).
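The flag-gated dispatch can be sketched in plain Python (function names, the payload shape, and `pickle` standing in for cloudpickle are all illustrative; the real codecs live in Rust):

```python
import pickle

MAGIC = b"DFPYUDF1"  # magic prefix from the codec registry table


def encode_udf(name, fn, inline=True):
    if inline:
        # Inline mode: self-contained payload behind the magic prefix.
        return MAGIC + pickle.dumps((name, fn))
    # Name-only mode: decodable by a non-Python receiver.
    return name.encode()


def decode_udf(buf, registry, inline=True):
    if buf.startswith(MAGIC):
        if not inline:
            # Strict mode: clear error instead of a misleading
            # inner-codec "not provided" failure.
            raise ValueError(
                "inline Python UDF payload received with inlining disabled"
            )
        _, fn = pickle.loads(buf[len(MAGIC):])
        return fn
    # No inline payload: resolve through the receiver's function registry.
    return registry[buf.decode()]
```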

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
to Python wrapper

Two changes:

* Reference link to the pickle module's official security warning
  in `https://docs.python.org/3/library/pickle.html#module-pickle`.
  Added in the user guide ("Disabling Python UDF inlining" note and
  the Security warning block) and in the Python
  `SessionContext.with_python_udf_inlining` docstring. The
  unqualified phrase "pickle is unsafe on untrusted input" assumed
  reader background that not every datafusion-python user has.

* Strip the user-facing prose docstring from the Rust
  `PySessionContext::with_python_udf_inlining` method. Python
  wrappers are what users see via `help()` and Sphinx; the Rust
  doc-comment duplicated the same text and risked drifting from the
  Python version. Matches the surrounding methods
  (`with_logical_extension_codec`, `with_physical_extension_codec`)
  which carry no Rust doc-comment for the same reason.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The actor was calling `ctx.register_udf(...)` and `set_worker_ctx(ctx)`
to make the inbound expression's UDF resolvable on the worker. With
Python scalar/aggregate/window UDFs now traveling inside the
serialized expression, neither call is necessary — the actor just
needs a `SessionContext` to evaluate against.

Also drops the parallel `sender.register_udf(...)` in the driver; an
expression built with a `udf(...)` callable carries its own reference
and does not require the UDF to be registered on the driver session.

Result: each actor is a few lines (one `SessionContext`, one
`evaluate` method) — what the inline-UDF story is actually trying
to demonstrate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>