
Streaming exchange #6014

Draft

blp wants to merge 21 commits into main from exchange

Conversation

@blp
Member

@blp blp commented Apr 9, 2026

Please read the individual commits.

I'm currently working to figure out why one test fails (see final commit). That's why this is being submitted as a draft PR. After I fix it, I'll mark it ready for review.

blp added 15 commits April 9, 2026 15:23
Until now, DBSP has measured operator runtimes by the amount of wall-clock
time from when they are started to when they complete.  This is an
overestimate, because operators are asynchronous and can yield mid-run.
This commit changes the scheduler to instead instrument each individual
invocation of an operator and then sum the times to get an accurate
runtime.
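The per-invocation approach described above can be sketched as follows. This is an illustrative profile type, not the real DBSP scheduler code: it times each individual invocation and adds it to a running sum, so time spent yielded between invocations is never counted.

```rust
use std::time::{Duration, Instant};

/// Hypothetical per-operator profile: accumulates the time spent in each
/// individual invocation, rather than measuring the wall-clock span from
/// when the operator starts to when it completes.
#[derive(Default)]
struct OperatorProfile {
    total: Duration,
    invocations: u64,
}

impl OperatorProfile {
    /// Times a single invocation of the operator and adds it to the
    /// running total.
    fn record<R>(&mut self, invocation: impl FnOnce() -> R) -> R {
        let start = Instant::now();
        let result = invocation();
        self.total += start.elapsed();
        self.invocations += 1;
        result
    }
}
```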

Previously it was possible to avoid profiling overhead, but I think in
practice we always enabled it.

Signed-off-by: Ben Pfaff <blp@feldera.com>
No particular reason; I was just reading the code, was confused by the bounds, and it turned out they weren't needed.

Signed-off-by: Ben Pfaff <blp@feldera.com>
The trait documentation says so:

```rust
    /// Returns `true` if `self` is an input operator.
    ///
    /// An input operator feeds new data into the circuit. Examples are
    /// the `Input` and `Generator` operators.
    fn is_input(&self) -> bool {
        false
    }
```

I don't know why they did not return `true` for these input operators
before.  I don't know what the consequences are for returning the wrong
value (this is not in response to some bug I discovered).
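A minimal sketch of the fix, using an illustrative trait rather than the real DBSP `Operator` trait: input operators override the default `false`, while ordinary operators keep it.

```rust
/// Simplified stand-in for the operator trait quoted above.
trait Operator {
    /// Returns `true` if `self` is an input operator.
    fn is_input(&self) -> bool {
        false
    }
}

/// An input operator feeds new data into the circuit, so it overrides
/// the default.
struct Generator;
impl Operator for Generator {
    fn is_input(&self) -> bool {
        true
    }
}

/// A non-input operator keeps the default implementation.
struct Map;
impl Operator for Map {}
```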

Signed-off-by: Ben Pfaff <blp@feldera.com>
I thought maybe it was possible to have an operator indicate that it was
already flushed immediately and then never get evaluated.  This is not the
case.

Plus a few other minor comment improvements.

Signed-off-by: Ben Pfaff <blp@feldera.com>
As far as I can tell, `eval` and `eval_owned` were the same except for
a clone() call, so this simplifies things.
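The shape of that simplification can be sketched like this (illustrative names, not the real DBSP trait): when the owned variant only differs by a `clone()`, a default method that forwards to the by-reference version removes the duplication.

```rust
/// Sketch of the redundancy: `eval_owned` previously duplicated `eval`
/// except for a `clone()`, so a default forwarding implementation suffices.
trait UnaryOperator<I: Clone, O> {
    fn eval(&mut self, input: &I) -> O;

    /// Default: forward to the by-reference version.
    fn eval_owned(&mut self, input: I) -> O {
        self.eval(&input)
    }
}

struct Doubler;
impl UnaryOperator<i64, i64> for Doubler {
    fn eval(&mut self, input: &i64) -> i64 {
        input * 2
    }
}
```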

Signed-off-by: Ben Pfaff <blp@feldera.com>
This will enable adding a field that can't be cloned, in an upcoming
commit.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Before commit e60a8a3 ("[dbsp] Use new protocol for exchanging data
among hosts."), each remote exchange RPC waited for the data to be picked
up out of its mailboxes before returning.  That commit removed the wait,
since sending data no longer gets a reply from the remote host (there is
in fact no reply), but it added nothing else to prevent a sender from
overwriting mailbox data that the receiver had not yet picked up.

This fixes the problem.  Instead of introducing a wait on the RPC sender
side, it introduces one on the receiver side: the receiver waits for the
mailboxes it writes to to become empty before writing to them.

I noticed that we have a 2-d array of notifiers for this purpose but we
were going to wait on all of the notifiers in the column for a given
sender in sequence.  It is more efficient in time and space to simply
have one per sender instead, so I made that change.  At the same time,
I noticed that Broadcast could simply use that same Notify instead of
adding its own via registering a callback, so I made that simplification
too.

sender_notifies is now redundant with sender_callbacks, but it would
involve changing a lot of code to switch from one to the other, so this
commit does not make that change.
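The receiver-side wait can be sketched with a blocking `Condvar` standing in for the async `Notify` used in the real code: a writer may only deposit into a mailbox once the previous item has been taken out.

```rust
use std::sync::{Condvar, Mutex};

/// Sketch of a single-slot mailbox: `put` blocks until the slot is empty,
/// which is the receiver-side wait described above.
struct Mailbox<T> {
    slot: Mutex<Option<T>>,
    emptied: Condvar,
}

impl<T> Mailbox<T> {
    fn new() -> Self {
        Mailbox { slot: Mutex::new(None), emptied: Condvar::new() }
    }

    /// Waits for the mailbox to be empty, then writes `value`.
    fn put(&self, value: T) {
        let mut slot = self.slot.lock().unwrap();
        while slot.is_some() {
            slot = self.emptied.wait(slot).unwrap();
        }
        *slot = Some(value);
    }

    /// Takes the value out (if any) and wakes a waiting writer.
    fn take(&self) -> Option<T> {
        let value = self.slot.lock().unwrap().take();
        self.emptied.notify_one();
        value
    }
}
```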

Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
In practice these were always supplied as Runtime::worker_index() and it
didn't make sense to supply any other value.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, the exchange code has always bound its own socket to listen
for incoming connections.  This works fine in our common case, where the
listening socket is a well-known port in a container that doesn't have
anything else running.  But for tests, or for situations where the host
running the pipeline has other software running and doesn't reserve
specific ports for the pipeline, it's more reliable to bind all the
listening sockets first (probably letting the kernel choose the port
numbers) and then start the pipelines with the port numbers known in
advance.  This commit allows for that.  The initial use case will be
unit tests, in upcoming commits.
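The "bind first, announce later" pattern this enables can be sketched as follows: bind to port 0 so the kernel picks a free port, then pass the resulting address (or the listener itself) to the pipeline before it starts.

```rust
use std::net::TcpListener;

/// Binds a listener to an ephemeral port chosen by the kernel and
/// returns both the listener and the port number, so the port can be
/// announced to peers before the pipeline starts.
fn bind_ephemeral() -> std::io::Result<(TcpListener, u16)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let port = listener.local_addr()?.port();
    Ok((listener, port))
}
```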

Signed-off-by: Ben Pfaff <blp@feldera.com>
By moving this out of `Exchange` into an argument for try_receive_all(),
we move it closer to where it's actually needed, and it's no longer
necessary to box it.
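The shape of this refactor, with illustrative names: rather than storing a `Box<dyn FnMut(..)>` in the exchange struct, the caller passes a generic closure directly to `try_receive_all`, so no boxing is needed.

```rust
/// Sketch of an exchange endpoint whose receive path takes the consumer
/// callback as a generic argument instead of a boxed field.
struct Exchange {
    pending: Vec<i64>,
}

impl Exchange {
    /// Drains every pending item, invoking `consume` on each one.
    fn try_receive_all(&mut self, mut consume: impl FnMut(i64)) {
        for item in self.pending.drain(..) {
            consume(item);
        }
    }
}
```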

Signed-off-by: Ben Pfaff <blp@feldera.com>
This separation was painful and I'm glad to get rid of it.

Signed-off-by: Ben Pfaff <blp@feldera.com>
The word `Service` wasn't doing anything here.

Signed-off-by: Ben Pfaff <blp@feldera.com>
@blp blp requested a review from ryzhyk April 9, 2026 22:27
@blp blp self-assigned this Apr 9, 2026
@blp blp added labels: DBSP core (Related to the core DBSP library), ft (Fault tolerant, distributed, and scale-out implementation), performance, rust (Pull requests that update Rust code), multihost (Related to multihost or distributed pipelines) Apr 9, 2026
@blp blp marked this pull request as draft April 9, 2026 22:28

@mythical-fred mythical-fred left a comment


Draft-level comment: this is getting hard to reason about as one unit because the core semantic change (streaming exchange / shard-accumulate) is mixed with a long tail of preparatory cleanups, instrumentation tweaks, and exchange refactors. Before this is ready, I would strongly consider splitting it into a stack: prep refactors/tests first, then the streaming-exchange semantic change, then shard-accumulate on top. That keeps the design review focused on the actual behavioral change instead of making the reviewer re-prove every cleanup commit at the same time.

@blp
Member Author

blp commented Apr 9, 2026

Draft-level comment: this is getting hard to reason about as one unit because the core semantic change (streaming exchange / shard-accumulate) is mixed with a long tail of preparatory cleanups, instrumentation tweaks, and exchange refactors. Before this is ready, I would strongly consider splitting it into a stack: prep refactors/tests first, then the streaming-exchange semantic change, then shard-accumulate on top. That keeps the design review focused on the actual behavioral change instead of making the reviewer re-prove every cleanup commit at the same time.

That's how it's already ordered.

blp and others added 5 commits April 9, 2026 16:37
Until now, multihost exchange has accumulated the data sent from one host
to another until all of it is available, then sent it all together.  This
meant that serialization and deserialization were delayed until all the
data was available.  This commit changes the code to instead send data as
it becomes available.

This commit also makes more use of async functions in the exchange code.

This also adds unit tests for multihost exchange.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, exchange has proceeded in lockstep: every sender sends one
item per receiver, then every receiver receives one item per sender.  This
commit adds an alternative model, called "streaming" exchange.  In
streaming exchange, senders still send a single item to each receiver per
step, but receivers might get multiple items or no items in a given step.
This should allow for better performance, especially in multihost
environments.
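The receiver side of the streaming model described above can be sketched with a plain channel: in a given step the receiver drains whatever has arrived, which may be zero items or several, rather than exactly one item per sender.

```rust
use std::sync::mpsc;

/// Drains everything currently available from the channel without
/// blocking; a "step" may therefore observe zero, one, or many items.
fn drain_step<T>(rx: &mpsc::Receiver<T>) -> Vec<T> {
    let mut items = Vec::new();
    // try_recv() never blocks, so an empty channel yields an empty step.
    while let Ok(item) = rx.try_recv() {
        items.push(item);
    }
    items
}
```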

Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
This causes a failure in the `proptest_max_retain_values_test` test that
I am trying to track down.  (The failure doesn't happen if I change
`ExchangeKind::Stream` to `ExchangeKind::Sync` in
dyn_shard_accumulate().)

Signed-off-by: Ben Pfaff <blp@feldera.com>
The previous commit makes proptest_max_retain_values_test fail.  The
most obvious reason for that would be if streaming exchange produces the
wrong results.  However, this commit makes the streaming accumulator do
both streaming and synchronous exchange at the same time and then
asserts that they produce the same results.  The assertion does not fail
in this test.

This change uses streaming exchange as the result of the sharded
accumulator and synchronous exchange as the expected results.  If the
two are swapped, by swapping `ExchangeKind::Stream` and
`ExchangeKind::Sync` in their locations in dyn_shard_accumulate(), so
that synchronous exchange becomes the results and streaming exchange is
used as expected, then the test passes.  This is even more bizarre.

I think that the cause must be related to the details of this particular
test.  This test sets trace bounds using
`accumulate_integrate_trace_retain_values_top_n`, which internally finds
the unsharded version of the stream that it's applied to (with
try_unsharded_version()).  This low-level mapping has bitten me in the
past.  To me, it suggests that it's important that the batches that the
trace bounds see are exactly the ones that pass through the exchange,
without the delays that streaming exchange can cause.  But that's all
the explanation I have.
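The debugging commit's dual-exchange check is an instance of differential testing. A sketch of the idea (illustrative function, not the real `dyn_shard_accumulate()` code): run the computation under both exchange kinds and assert that they agree.

```rust
/// Runs the same computation under two implementations (here modeled as
/// closures standing in for streaming and synchronous exchange) and
/// asserts that they produce identical results.
fn check_agreement<T: PartialEq + std::fmt::Debug>(
    streaming: impl FnOnce() -> T,
    synchronous: impl FnOnce() -> T,
) -> T {
    let result = streaming();
    let expected = synchronous();
    assert_eq!(result, expected, "streaming and sync exchange diverged");
    result
}
```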
@blp
Member Author

blp commented Apr 9, 2026

@ryzhyk I think I'll need some advice, please see the final commit "showing why the proptest_max_retain_values_test failure is surprising" (which obviously isn't meant to merge). But only when you have a chance; I'll be away from keyboard for a few days.

@ryzhyk
Contributor

ryzhyk commented Apr 12, 2026

@ryzhyk I think I'll need some advice, please see the final commit "showing why the proptest_max_retain_values_test failure is surprising" (which obviously isn't meant to merge). But only when you have a chance; I'll be away from keyboard for a few days.

That test fails for me even if I comment out accumulate_integrate_trace_retain_values_top_n. The failure doesn't seem related to lateness and bounds. I haven't yet figured out what's going on here.

