Conversation
mythical-fred
left a comment
Two hard blocks (manual test plan + missing tests for the actual behavior change) and a couple of design points worth a look. Findings inline.
On the PR description: the "Describe Manual Test Plan" section literally says "TODO: verify if we're actually spilling to disk". The whole point of this PR is to make the Delta/Iceberg connectors spill so they stop OOMing on #6153. Please actually verify that spilling happens (e.g. run a Delta CDC scan with a tight memory_mb_max against data large enough to force a spill, point it at a storage path, and observe files appearing under <storage>/delta-tmp-*/), and document those steps. Without that, neither you nor a reviewer can claim this fixes the bug.
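One way to make the spill check above mechanical is a small file-system probe. This is a hedged sketch in plain Rust: `spill_happened` and the `delta-tmp-*` naming follow this comment, but the real scratch-directory layout may differ, and the `main` here only simulates a spill appearing rather than driving a real pipeline.

```rust
use std::fs;
use std::io;
use std::path::Path;

// Hypothetical check for the suggested test plan: returns true once any
// `delta-tmp-*` scratch directory under the storage path contains a file.
fn spill_happened(storage: &Path) -> io::Result<bool> {
    for entry in fs::read_dir(storage)? {
        let entry = entry?;
        if entry.file_type()?.is_dir()
            && entry.file_name().to_string_lossy().starts_with("delta-tmp-")
            && fs::read_dir(entry.path())?.next().is_some()
        {
            return Ok(true);
        }
    }
    Ok(false)
}

fn main() -> io::Result<()> {
    // Simulate a spill file appearing; in a real run you would instead start
    // the pipeline with a tight memory_mb_max and issue the ORDER BY query.
    let storage = std::env::temp_dir().join("spill-check-demo");
    fs::create_dir_all(storage.join("delta-tmp-0"))?;
    fs::write(storage.join("delta-tmp-0").join("part-0.arrow"), b"")?;
    println!("spilled: {}", spill_happened(&storage)?);
    Ok(())
}
```

In a real verification you would poll this predicate while the query runs, then record the observed paths in the test plan.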
mythical-fred
left a comment
Architectural fix is in good shape — single shared RuntimeEnv, no more unwrap, sanitize_path_component retired, scratch dir centralized. One soft follow-up inline.
F: FnOnce(SessionConfig) -> SessionConfig,
{
    let session_config = SessionConfig::new()
        .with_target_partitions(pipeline_config.global.workers as usize)
This setting is only needed for ad hoc queries. If we don't pass pipeline_config to this function, then we also won't need to pass it to delta and iceberg connectors.
I think this is a good setting to configure. Note that I did some shenanigans with delta-rs so it's tied to max_concurrent_readers by default (6), and my partitioning is now custom to allow reading data in a more ordered way.
392df47
and
gz/delta-rs@c37cf6e
Thinking about it more, the max_concurrent_reader hack should not be necessary in delta-rs as long as we set a reasonable target_partitions size...
you might want to set this to io_workers though?
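The two suggestions above (drop the `pipeline_config` plumbing for ad hoc queries vs. key partitioning off `io_workers`) can be reconciled as below. This is only a sketch: `GlobalConfig` and its `io_workers` field are stand-ins for the real pipeline config types, not the actual feldera structs.

```rust
// Illustrative stand-in for the pipeline's global config section.
struct GlobalConfig {
    workers: u16,
    io_workers: Option<u16>,
}

// The diff uses `workers`; the reviewer suggests `io_workers` may fit
// IO-bound connector scans better. One option: prefer io_workers when
// configured and fall back to compute workers otherwise.
fn target_partitions(global: &GlobalConfig) -> usize {
    global.io_workers.unwrap_or(global.workers) as usize
}

fn main() {
    let global = GlobalConfig { workers: 8, io_workers: Some(16) };
    println!("target_partitions = {}", target_partitions(&global));
}
```

If the value ends up derived entirely from one config section like this, the connectors only need that section rather than the whole `pipeline_config`, which also addresses the first comment.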
mythical-fred
left a comment
LGTM — only change since my prior approval (a93909b) is the regenerated openapi.json. Trivial.
…ctors
Delta Lake and Iceberg connectors built bare SessionContexts with no
memory pool or spill path, so large scans (e.g. ORDER BY in Delta CDC)
could OOM the pipeline.
Build one Arc<RuntimeEnv> per pipeline with a FairSpillPool and a
spill directory at {storage.path}/datafusion-tmp/, shared by the ad-hoc
engine and every DataFusion-using connector. Add
RuntimeConfig.datafusion_memory_mb (default: 5% of the effective
memory budget, capped at 2 GB) and subtract it from the
DBSP circuit's RSS limit so the two no longer double-book RAM.
Rename ADHOC_TEMP_DIR -> DATAFUSION_TEMP_DIR ("datafusion-tmp") so
checkpointer::gc_startup keeps the new directory. Stale adhoc-tmp/
from prior releases is removed by the existing GC sweep.
Expose create_runtime_env / create_session_context[_with] in
feldera_adapterlib::utils::datafusion; the _with variant preserves
Delta's schema_force_view_types override.
Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
see commit
Fix #6153
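The budget split described in the commit message (DataFusion gets 5% of the effective memory budget, capped at 2 GB, subtracted from the DBSP circuit's RSS limit) can be sketched in plain Rust. Function names here are illustrative, not the actual feldera API.

```rust
// DataFusion's slice: 5% of the effective budget, capped at 2 GB (2048 MB).
fn datafusion_memory_mb(effective_budget_mb: u64) -> u64 {
    (effective_budget_mb / 20).min(2048)
}

// The circuit's RSS limit shrinks by the same amount, so the two budgets
// never double-book RAM.
fn circuit_rss_limit_mb(effective_budget_mb: u64) -> u64 {
    effective_budget_mb - datafusion_memory_mb(effective_budget_mb)
}

fn main() {
    let budget_mb = 8192; // e.g. an 8 GiB pipeline
    println!(
        "datafusion = {} MB, circuit = {} MB",
        datafusion_memory_mb(budget_mb),
        circuit_rss_limit_mb(budget_mb)
    );
}
```

For an 8 GiB pipeline this yields 409 MB for DataFusion and 7783 MB for the circuit; the 2 GB cap only kicks in above a 40 GiB budget.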
Describe Manual Test Plan
Verified that there are DataFusion session files on disk. If we configure the pipeline with low memory we get:
though this was the same behavior previously, I guess.
pipeline sql:
run
select * from seeds order by id;
Checklist
Breaking Changes?
not a breaking change