
[connectors] configure storage for datafusion instances used by connectors #6170

Open
swanandx wants to merge 1 commit into main from issue6153

Conversation

@swanandx (Member) commented May 1, 2026

see commit

Fix #6153

Describe Manual Test Plan

Verified that there are DataFusion session files on disk. If we configure the pipeline with low memory, we get:

Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes caused by Resources exhausted: Failed to allocate additional 64.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated for this reservation - 5.7 MB remain available for the total pool

(though this was the same behavior previously, I think)

pipeline sql:

CREATE TABLE seeds (
    id BIGINT NOT NULL
) WITH (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "datagen",
            "config": {
                "plan": [
                    {
                        "limit": 5000000,
                        "rate": 10000,
                        "fields": {
                            "id": {
                                "strategy": "increment"
                            }
                        }
                    }
                ]
            }
        }
    }]'
);

run select * from seeds order by id;

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

not a breaking change


@mythical-fred left a comment


Two hard blocks (manual test plan + missing tests for the actual behavior change) and a couple of design points worth a look. Findings inline.

On the PR description: ### Describe Manual Test Plan literally says TODO: verify if we're actually spilling to disk. The whole point of this PR is to make Delta/Iceberg connectors spill so they stop OOMing on #6153 — please actually verify spilling happens (e.g. run a Delta CDC scan with a tight memory_mb_max against data large enough to force a spill, point at a storage path, observe files appearing under <storage>/delta-tmp-*/) and document those steps. Without that, neither you nor a reviewer can claim this fixes the bug.


@mythical-fred left a comment


Architectural fix is in good shape — single shared RuntimeEnv, no more unwrap, sanitize_path_component retired, scratch dir centralized. One soft follow-up inline.

@swanandx swanandx requested a review from ryzhyk May 2, 2026 07:54
@ryzhyk ryzhyk requested a review from gz May 2, 2026 19:05
@ryzhyk (Contributor) left a comment


This is nice. I didn't know about shared runtime environments.
I requested @gz 's review, since he has the most experience with datafusion config.

Comment thread crates/adapterlib/src/utils/datafusion.rs Outdated
F: FnOnce(SessionConfig) -> SessionConfig,
{
let session_config = SessionConfig::new()
.with_target_partitions(pipeline_config.global.workers as usize)
Contributor:

This setting is only needed for ad hoc queries. If we don't pass pipeline_config to this function, then we also won't need to pass it to delta and iceberg connectors.

Contributor:

I think this is a good setting to configure. Note that I did some shenanigans with delta-rs, so it's tied to max_concurrent_readers by default (6), and the partitioning is now custom to allow reading data in a more ordered fashion.

392df47
and
gz/delta-rs@c37cf6e

When I think about it more, the max_concurrent_readers hack should not be necessary in delta-rs as long as we set a reasonable target_partitions size...

Contributor:

you might want to set this to io_workers though?

@mythical-fred left a comment

LGTM — only change since my prior approval (a93909b) is the regenerated openapi.json. Trivial.

[connectors] configure storage for datafusion instances used by connectors

Delta Lake and Iceberg connectors built bare SessionContexts with no
memory pool or spill path, so large scans (e.g. ORDER BY in Delta CDC)
could OOM the pipeline.

Build one Arc<RuntimeEnv> per pipeline with a FairSpillPool and a
spill directory at {storage.path}/datafusion-tmp/, shared by the ad-hoc
engine and every DataFusion-using connector. Add
RuntimeConfig.datafusion_memory_mb (default: 5% of the effective
memory budget, capped at 2 GB) and subtract it from the
DBSP circuit's RSS limit so the two no longer double-book RAM.
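The budget split above is plain arithmetic; a minimal sketch of it (function names are illustrative, not the actual RuntimeConfig API):

```rust
// Sketch of the memory split described above. The 5% / 2 GB figures come
// from the commit message; the function names are illustrative only.
fn datafusion_memory_mb(effective_budget_mb: u64) -> u64 {
    (effective_budget_mb / 20).min(2048) // 5% of the budget, capped at 2 GB
}

fn circuit_rss_limit_mb(effective_budget_mb: u64) -> u64 {
    // DataFusion's share is subtracted so the two pools never double-book RAM.
    effective_budget_mb - datafusion_memory_mb(effective_budget_mb)
}

fn main() {
    // 8 GB pipeline budget: DataFusion gets 409 MB, the circuit keeps 7783 MB.
    assert_eq!(datafusion_memory_mb(8192), 409);
    assert_eq!(circuit_rss_limit_mb(8192), 7783);
    // On a 100 GB budget the 2 GB cap applies.
    assert_eq!(datafusion_memory_mb(102_400), 2048);
}
```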

Rename ADHOC_TEMP_DIR -> DATAFUSION_TEMP_DIR ("datafusion-tmp") so
checkpointer::gc_startup keeps the new directory. Stale adhoc-tmp/
from prior releases is removed by the existing GC sweep.

Expose create_runtime_env / create_session_context[_with] in
feldera_adapterlib::utils::datafusion; the _with variant preserves
Delta's schema_force_view_types override.

Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>

Successfully merging this pull request may close these issues.

Configure storage for datafusion instances used by connectors.

4 participants