docs: user guide + runnable examples for distributing expressions (4/4)#1547

Draft
timsaucer wants to merge 4 commits into apache:main from timsaucer:pr4-docs-examples

Conversation

@timsaucer (Member)

Which issue does this PR close?

No associated issue. PR 4 of 4 stacked on #1546. The diff against `main` is cumulative until the prior PRs merge — review the commits on `pr4-docs-examples` directly for the PR4 delta.

Rationale for this change

PRs 1-3 close the round-trip for Python UDFs and add the toggle that controls it. None of that is discoverable without user-facing documentation. This PR ships the user guide page that explains the multiprocessing / Ray / datafusion-distributed patterns, the runnable examples, and the centralized Security section that nails down what the toggle does and does not protect against.

What changes are included in this PR?

User guide. `docs/source/user-guide/io/distributing_work.rst` is the new canonical page. It walks through:

  • The shipped-expression model (what travels inline vs by name).
  • Worker setup (`datafusion.ipc.set_worker_ctx`).
  • Sender-side configuration (`datafusion.ipc.set_sender_ctx` and `SessionContext.with_python_udf_inlining`).
  • A Security section that is the single source of truth for the cloudpickle / `pickle.loads` threat model.
  • Pointers to the runnable examples and `datafusion-distributed`.

`docs/source/user-guide/io/index.rst` gets the toctree entry.
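The worker-setup and sender-side sections above hinge on one rule: `Expr.from_bytes(buf, ctx=None)` resolves its context as explicit ctx > worker ctx > global `SessionContext`. A minimal stdlib sketch of that resolution order (the real helper is the private `datafusion.ipc._resolve_ctx`; `resolve_ctx` and the string contexts here are illustrative stand-ins):

```python
# Illustrative sketch of the context-resolution priority the guide documents:
# explicit ctx > worker ctx > global SessionContext.
# set_worker_ctx / get_worker_ctx mirror datafusion.ipc; the rest is a stand-in.
_worker_ctx = None


def set_worker_ctx(ctx):
    global _worker_ctx
    _worker_ctx = ctx


def get_worker_ctx():
    return _worker_ctx


def resolve_ctx(explicit_ctx=None, global_ctx=None):
    # Mirrors the documented order used by Expr.from_bytes(buf, ctx=None).
    if explicit_ctx is not None:
        return explicit_ctx
    if get_worker_ctx() is not None:
        return get_worker_ctx()
    return global_ctx


set_worker_ctx("worker-session")
# An explicit ctx always wins; otherwise the worker ctx shadows the global one.
assert resolve_ctx(explicit_ctx="explicit", global_ctx="global") == "explicit"
assert resolve_ctx(global_ctx="global") == "worker-session"
```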

Runnable examples.

  • `examples/multiprocessing_pickle_expr.py` — `Pool.map` of a closure-capturing UDF across processes, with the worker initializer wiring the worker context. The closure carries non-trivial state to demonstrate that captured state survives the round-trip.
  • `examples/ray_pickle_expr.py` — Ray actor analogue.
  • `examples/datafusion-ffi-example/python/tests/_test_pickle_strict_ffi.py` — strict-mode refusal exercised end-to-end against an FFI capsule scalar UDF. Kept under the FFI example crate because it needs that crate's compiled artifacts. The leading `_` keeps pytest from auto-collecting it as a unit test; the FFI example's own test harness runs it explicitly.
  • `examples/README.md` picks up index entries for the new files.
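The shape of the multiprocessing example is a `Pool` whose initializer installs the worker context before any task runs. A stdlib-only sketch of that initializer pattern (a thread-backed `multiprocessing.dummy.Pool` stands in for a process pool so the sketch runs anywhere, and a dict stands in for the `SessionContext` the real example installs via `datafusion.ipc.set_worker_ctx`):

```python
# Sketch of the initializer pattern used by examples/multiprocessing_pickle_expr.py.
# multiprocessing.dummy.Pool has the same API as a process Pool but is
# thread-backed, so this runs without pickling; the shipped example uses a
# real process Pool and installs a SessionContext via set_worker_ctx.
from multiprocessing.dummy import Pool

_worker_ctx = None  # stands in for the per-worker SessionContext


def init_worker(scale):
    # Runs in each worker before any task. The real initializer calls
    # datafusion.ipc.set_worker_ctx(SessionContext(...)) here.
    global _worker_ctx
    _worker_ctx = {"scale": scale}


def apply_expr(payload):
    # The real worker calls Expr.from_bytes(payload), which resolves the
    # context installed by init_worker.
    assert _worker_ctx is not None, "worker context not installed"
    return payload * _worker_ctx["scale"]


with Pool(4, initializer=init_worker, initargs=(10,)) as pool:
    results = pool.map(apply_expr, [1, 2, 3])

print(results)  # -> [10, 20, 30]
```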

Docstring centralization. Three docstrings previously carried near-duplicate copies of the pickle / cloudpickle security warning. Reduced each to a one-line summary plus a pointer to the Security section so there's a single canonical home for the threat model:

  • `PythonLogicalCodec::with_python_udf_inlining` rustdoc.
  • `SessionContext.with_python_udf_inlining` docstring.
  • `datafusion.ipc` module docstring.

The crate-level `codec.rs` module rustdoc also updates "pure-Python scalar UDFs" to "scalar / aggregate / window UDFs" now that PR 2 has shipped agg + window inline.

Are there any user-facing changes?

Docs and examples only. No code behavior changes, no new public APIs.

The `api change` label was not added.

timsaucer and others added 4 commits May 15, 2026 14:17
Adds Python-aware encoding to PythonLogicalCodec/PythonPhysicalCodec
so a ScalarUDF defined in Python travels inside the serialized
expression (cloudpickled into fun_definition) instead of needing a
matching registration on the receiver. With that in place, Expr gains
__reduce__ + classmethod from_bytes(buf, ctx=None) so pickle.dumps /
pickle.loads work end-to-end on expressions built from col, lit,
built-in functions, and Python scalar UDFs.

Wire format is framed as <DFPYUDF magic, version byte, cloudpickle
tuple>; the version byte lets a too-new/too-old payload surface a
clean Execution error instead of an opaque cloudpickle unpack
failure. Schema serde is via arrow-rs's native IPC (no pyarrow
round-trip). Cloudpickle module handle is cached per-interpreter
through PyOnceLock.

Worker-side context resolution lives in a new datafusion.ipc module:
set_worker_ctx / get_worker_ctx / clear_worker_ctx plus a private
_resolve_ctx helper consulted by Expr.from_bytes. Priority is
explicit ctx > worker ctx > global SessionContext. FFI UDFs still
travel by name and require the matching registration on the
receiver's context.

Aggregate and window UDF inline encoding, the per-session
with_python_udf_inlining toggle, sender-side context, and the
user-guide docs land in follow-on PRs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extends the PythonLogicalCodec / PythonPhysicalCodec inline encoding
introduced for scalar UDFs to also cover Python-defined aggregate and
window UDFs. The cloudpickle tuple shape per family is:

  DFPYUDA  (agg)     (name, accumulator_factory, input_schema_bytes,
                      return_schema_bytes, state_schema_bytes,
                      volatility_str)
  DFPYUDW  (window)  (name, evaluator_factory, input_schema_bytes,
                      return_schema_bytes, volatility_str)

Same wire-framing as scalar (family magic + version byte + cloudpickle
blob), same schema serde (arrow-rs native IPC), same cached cloudpickle
handle. The agg state schema is encoded as a full IPC schema so the
post-decode UDF reports the same names + nullability + metadata as the
sender — relevant for accumulators whose StateFieldsArgs consumers key
off names rather than positional DataType.

Required restructuring two existing UDF impls so the codec can grab
the Python callable directly:

* udaf.rs: replaces create_udaf + AccumulatorFactoryFunction closure
  with a named PythonFunctionAggregateUDF that stores the Py<PyAny>
  accumulator factory. Synthesizes state_{i} field names when the
  Python constructor passes only Vec<DataType>; from_parts preserves
  the full state schema on the decode side.
* udwf.rs: renames MultiColumnWindowUDF -> PythonFunctionWindowUDF,
  drops the PartitionEvaluatorFactory PtrEq wrapper, stores the
  Py<PyAny> evaluator directly. PartialEq and Hash get the same
  pointer-identity fast path + debug-log exception handling already
  on PythonFunctionScalarUDF.

User-facing surface:

* AggregateUDF.name and WindowUDF.name properties (parallel to the
  ScalarUDF.name shipped in PR1).
* Existing UDAF/UDWF construction paths are unchanged.

The per-session with_python_udf_inlining toggle, sender-side context,
strict refusal, and user-guide docs land in PRs 3-4 of this series.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…fusal

Adds a per-session toggle that turns inline Python UDF encoding on or
off, plus the supporting plumbing to make it usable through
pickle.dumps.

Codec layer:
  * PythonLogicalCodec / PythonPhysicalCodec gain a python_udf_inlining
    bool (default true) and a with_python_udf_inlining(enabled) builder.
    Each try_encode_udf{,af,wf} short-circuits to inner when the toggle
    is off; each try_decode_udf{,af,wf} that recognizes a DFPY* magic
    on a strict codec returns a clean Execution error instead of
    invoking cloudpickle.loads. The refusal message names the UDF and
    the wire family so an operator can see at a glance whether to
    re-encode the bytes or register the UDF on the receiver.

Session layer:
  * PySessionContext::with_python_udf_inlining(enabled) returns a new
    session whose stacked logical + physical codecs both carry the
    toggle. The Arc<SessionState> is cloned (cheap), only the codec
    pair is rebuilt, so registrations and config stay attached.
  * SessionContext.with_python_udf_inlining(*, enabled) is the Python
    wrapper. enabled is keyword-only because positional booleans at
    the call site read as opaque.

Sender-side context:
  * datafusion.ipc gains set_sender_ctx / get_sender_ctx /
    clear_sender_ctx thread-locals. Expr.__reduce__ now consults
    get_sender_ctx() to pick the codec for outbound pickles, which is
    the only path through which a strict session affects pickle.dumps
    (the protocol calls __reduce__ with no arguments). Without a
    sender context the default codec is used.

Tests:
  * test_pickle_expr.py picks up TestPythonUdfInliningToggle (covers
    both directions of the toggle plus the explicit-ctx fast path),
    TestWorkerCtxLifecycle (set/clear/threading), and
    TestSenderCtxLifecycle.
  * New test_pickle_multiprocessing.py + helpers exercise the full
    driver -> worker round-trip on a multiprocessing.Pool with set_*_ctx
    installed in the worker initializer.
  * CI workflow gets a 30-minute timeout-minutes backstop so a hung
    pickle worker can't block the matrix indefinitely.

User-guide docs and the runnable examples land in PR4 of this series.
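The sender-context choice above follows from how pickle works: `pickle.dumps` calls `__reduce__` with no arguments, so the only channel through which a strict session can influence the outbound codec is ambient per-thread state. A stdlib sketch of that thread-local lifecycle (the real API is `datafusion.ipc.set_sender_ctx` / `get_sender_ctx` / `clear_sender_ctx`; the string context is a stand-in):

```python
# Sketch of the thread-local sender context consulted by Expr.__reduce__.
# Thread-locals are the right scope: pickle.dumps calls __reduce__ with no
# arguments, so the codec choice must come from ambient per-thread state.
import threading

_sender = threading.local()


def set_sender_ctx(ctx):
    _sender.ctx = ctx


def get_sender_ctx():
    # None means "no sender context installed": fall back to the default codec.
    return getattr(_sender, "ctx", None)


def clear_sender_ctx():
    _sender.ctx = None


set_sender_ctx("strict-session")
assert get_sender_ctx() == "strict-session"

# Other threads see no sender context: the setting does not leak across threads.
seen = []
t = threading.Thread(target=lambda: seen.append(get_sender_ctx()))
t.start()
t.join()
assert seen == [None]

clear_sender_ctx()
assert get_sender_ctx() is None
```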

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wraps up the Expr-pickle work with the user-facing material:

* docs/source/user-guide/io/distributing_work.rst — new user guide
  page covering the multiprocessing, Ray, and datafusion-distributed
  patterns. Includes the Security section that is the canonical home
  for the cloudpickle / pickle.loads threat model.
* docs/source/user-guide/io/index.rst — toctree entry.
* examples/multiprocessing_pickle_expr.py — runnable example: a
  Pool.map of a closure-capturing UDF across processes, with worker
  context registration in the initializer.
* examples/ray_pickle_expr.py — Ray actor analogue.
* examples/datafusion-ffi-example/python/tests/_test_pickle_strict_ffi.py
  — exercises the strict-mode refusal end to end against an FFI
  capsule scalar UDF (kept under the FFI example crate because the
  test needs that crate's compiled artifacts).
* examples/README.md — index entries for the new files.

Also tightens three docstrings that previously duplicated the
security warning so they point at the canonical Security section
instead:

* PythonLogicalCodec::with_python_udf_inlining (rustdoc): one-line
  summary plus a relative pointer to distributing_work.rst and the
  upstream Python pickle module security warning.
* SessionContext.with_python_udf_inlining: one-sentence summary plus
  :doc: link to the user guide.
* datafusion.ipc module docstring: cross-reference to the user guide
  for the full pattern.

The crate-level codec.rs module rustdoc also updates "pure-Python
scalar UDFs" to "scalar / aggregate / window UDFs" now that all three
are covered.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>