Conversation
Until now, DBSP has measured an operator's runtime as the wall-clock time from when it starts to when it completes. This is an overestimate, because operators are asynchronous and can yield mid-run. This commit changes the scheduler to instead instrument each individual invocation of an operator and sum the times, yielding an accurate runtime. Previously it was possible to avoid profiling overhead, but I think in practice we always enabled it. Signed-off-by: Ben Pfaff <blp@feldera.com>
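As an illustration of the approach described above (this is a minimal sketch, not the DBSP scheduler; `OperatorProfile` and `record` are hypothetical names), timing each invocation separately and summing means time spent suspended between invocations is not counted:

```rust
use std::time::{Duration, Instant};

/// Hypothetical per-operator profile: accumulates the runtime of each
/// individual invocation instead of measuring start-to-finish
/// wall-clock time across yields.
struct OperatorProfile {
    total: Duration,
    invocations: usize,
}

impl OperatorProfile {
    fn new() -> Self {
        Self {
            total: Duration::ZERO,
            invocations: 0,
        }
    }

    /// Run one invocation of `work` and add its runtime to the total.
    fn record<R>(&mut self, work: impl FnOnce() -> R) -> R {
        let start = Instant::now();
        let result = work();
        self.total += start.elapsed();
        self.invocations += 1;
        result
    }
}

fn main() {
    let mut profile = OperatorProfile::new();
    // Two separate invocations of the "operator"; any time the
    // operator spends suspended between them is not counted.
    let a = profile.record(|| (0..1000).sum::<u64>());
    let b = profile.record(|| (1..10).product::<u64>());
    assert_eq!(profile.invocations, 2);
    println!("sum={a} product={b} invocations={}", profile.invocations);
}
```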
No particular reason, I was just reading code and confused about the bounds and it turned out they weren't needed. Signed-off-by: Ben Pfaff <blp@feldera.com>
The trait documentation says so:
```rust
/// Returns `true` if `self` is an input operator.
///
/// An input operator feeds new data into the circuit. Examples are
/// the `Input` and `Generator` operators.
fn is_input(&self) -> bool {
false
}
```
I don't know why they did not return `true` for these input operators
before. I don't know what the consequences are for returning the wrong
value (this is not in response to some bug I discovered).
Signed-off-by: Ben Pfaff <blp@feldera.com>
I thought it might be possible for an operator to report itself as already flushed immediately and then never get evaluated. That turns out not to be the case. Plus a few other minor comment improvements. Signed-off-by: Ben Pfaff <blp@feldera.com>
As far as I can tell, `eval` and `eval_owned` were the same except for a clone() call, so this simplifies things. Signed-off-by: Ben Pfaff <blp@feldera.com>
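A minimal sketch of the pattern described above (the trait and names here are illustrative, not the actual DBSP operator trait): when `eval` and `eval_owned` differ only by a `clone()`, one can be a default method delegating to the other, so keeping both as separate required methods adds little:

```rust
/// Hypothetical unary operator trait: `eval` is expressed in terms of
/// `eval_owned`, the only difference being a `clone()`.
trait UnaryOp<T: Clone, R> {
    /// Evaluate on an owned input.
    fn eval_owned(&mut self, input: T) -> R;

    /// Evaluate on a borrowed input; the default clones and delegates.
    fn eval(&mut self, input: &T) -> R {
        self.eval_owned(input.clone())
    }
}

struct Doubler;

impl UnaryOp<u64, u64> for Doubler {
    fn eval_owned(&mut self, input: u64) -> u64 {
        input * 2
    }
}

fn main() {
    let mut op = Doubler;
    assert_eq!(op.eval(&21), 42);
    assert_eq!(op.eval_owned(21), 42);
    println!("ok");
}
```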
This will enable adding a field that can't be cloned, in an upcoming commit. Signed-off-by: Ben Pfaff <blp@feldera.com>
Before commit e60a8a3 ("[dbsp] Use new protocol for exchanging data among hosts."), each remote exchange RPC waited for its data to be picked up out of the mailboxes before returning. That commit changed that: sending data no longer waits for a reply from the remote host (there is in fact no reply), but nothing took the old wait's place to prevent races in which new data overwrites mailbox contents that have not yet been consumed.

This commit fixes the problem. Instead of introducing a wait on the RPC sender side, it introduces one on the receiver side: the receiver waits for the mailboxes it is writing to be empty before writing to them.

I noticed that we had a 2-D array of notifiers for this purpose, but we would wait on all of the notifiers in a given sender's column in sequence. It is more efficient in time and space to simply have one notifier per sender instead, so I made that change. At the same time, I noticed that Broadcast could simply use that same Notify instead of registering its own callback, so I made that simplification too.

sender_notifies is now redundant with sender_callbacks, but switching from one to the other would involve changing a lot of code, so this commit does not make that change.

Signed-off-by: Ben Pfaff <blp@feldera.com>
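A simplified sketch of the receiver-side wait described above, using a `Condvar` in place of the async `Notify` that the real code uses (the `Mailbox` type and its methods are hypothetical): a writer blocks until the slot has been emptied, so new data can never overwrite unconsumed data:

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical single-slot mailbox: writers wait for the slot to be
/// emptied before writing, mirroring the receiver-side wait described
/// in the commit message.
struct Mailbox<T> {
    slot: Mutex<Option<T>>,
    emptied: Condvar,
}

impl<T> Mailbox<T> {
    fn new() -> Self {
        Self {
            slot: Mutex::new(None),
            emptied: Condvar::new(),
        }
    }

    /// Write `value`, first waiting for the slot to be empty.
    fn put(&self, value: T) {
        let mut slot = self.slot.lock().unwrap();
        while slot.is_some() {
            slot = self.emptied.wait(slot).unwrap();
        }
        *slot = Some(value);
    }

    /// Take the contents, if any, and wake one blocked writer.
    fn take(&self) -> Option<T> {
        let mut slot = self.slot.lock().unwrap();
        let value = slot.take();
        self.emptied.notify_one();
        value
    }
}

fn main() {
    let mb = Mailbox::new();
    mb.put("hello");
    assert_eq!(mb.take(), Some("hello"));
    assert_eq!(mb.take(), None);
    println!("ok");
}
```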
Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
In practice these were always supplied as Runtime::worker_index() and it didn't make sense to supply any other value. Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, the exchange code has always bound its own socket to listen for incoming connections. This works fine in our common case, where the listening socket uses a well-known port in a container that doesn't have anything else running. But for tests, or for situations where the host running the pipeline has other software running and doesn't reserve specific ports for the pipeline, it's more reliable to bind all the listening sockets first (probably letting the kernel choose the port numbers) and then start the pipelines with those port numbers already known. This commit allows for that. The initial use case will be unit tests, in upcoming commits. Signed-off-by: Ben Pfaff <blp@feldera.com>
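The "let the kernel choose" part can be sketched as follows (an illustrative standalone snippet, not the pipeline code; `bind_ephemeral` is a hypothetical name): binding port 0 asks the kernel for any free port, which the caller reads back and can hand to the pipeline configuration before it starts.

```rust
use std::net::TcpListener;

/// Bind a listening socket on a kernel-chosen port and report which
/// port was assigned, so the port number is known before anything
/// that needs it starts.
fn bind_ephemeral() -> std::io::Result<(TcpListener, u16)> {
    // Port 0 means "pick any free port".
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let port = listener.local_addr()?.port();
    Ok((listener, port))
}

fn main() -> std::io::Result<()> {
    let (_listener, port) = bind_ephemeral()?;
    // Once bound, the assigned port is never 0.
    assert_ne!(port, 0);
    println!("listening on port {port}");
    Ok(())
}
```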
By moving this out of `Exchange` into an argument for try_receive_all(), we move it closer to where it's actually needed, and it's no longer necessary to box it. Signed-off-by: Ben Pfaff <blp@feldera.com>
This separation was painful and I'm glad to get rid of it. Signed-off-by: Ben Pfaff <blp@feldera.com>
The word `Service` wasn't doing anything here. Signed-off-by: Ben Pfaff <blp@feldera.com>
mythical-fred
left a comment
Draft-level comment: this is getting hard to reason about as one unit because the core semantic change (streaming exchange / shard-accumulate) is mixed with a long tail of preparatory cleanups, instrumentation tweaks, and exchange refactors. Before this is ready, I would strongly consider splitting it into a stack: prep refactors/tests first, then the streaming-exchange semantic change, then shard-accumulate on top. That keeps the design review focused on the actual behavioral change instead of making the reviewer re-prove every cleanup commit at the same time.
That's how it's already ordered.
Until now, multihost exchange has accumulated the data sent from one host to another until all of it is available, then sent it all together. This meant that serialization and deserialization were delayed until all the data was available. This commit changes the code to instead send data as it becomes available. It also makes more use of async functions in the exchange code and adds unit tests for multihost exchange. Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, exchange has proceeded in lockstep: every sender sends one item per receiver, then every receiver receives one item per sender. This commit adds an alternative model, called "streaming" exchange. In streaming exchange, senders still send a single item to each receiver per step, but receivers might get multiple items or no items in a given step. This should allow for better performance, especially in multihost environments. Signed-off-by: Ben Pfaff <blp@feldera.com>
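The difference between the two receiver behaviors can be modeled with ordinary channels (an illustrative sketch, not DBSP's exchange code; `recv_sync` and `recv_streaming` are hypothetical names): a synchronous receiver takes exactly one item per sender per step, while a streaming receiver drains whatever has arrived, which may be zero or many items.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Synchronous model: block until the one item for this step arrives.
fn recv_sync(rx: &Receiver<u64>) -> u64 {
    rx.recv().unwrap()
}

/// Streaming model: take zero or more items without blocking.
fn recv_streaming(rx: &Receiver<u64>) -> Vec<u64> {
    let mut items = Vec::new();
    while let Ok(item) = rx.try_recv() {
        items.push(item);
    }
    items
}

fn main() {
    let (tx, rx) = channel::<u64>();

    // One send in flight: the streaming receiver sees one item...
    tx.send(1).unwrap();
    assert_eq!(recv_streaming(&rx), vec![1]);
    // ...and with two sends in flight, it sees both in one step.
    tx.send(2).unwrap();
    tx.send(3).unwrap();
    assert_eq!(recv_streaming(&rx), vec![2, 3]);
    // With nothing in flight, it sees nothing (no blocking).
    assert_eq!(recv_streaming(&rx), Vec::<u64>::new());

    // The synchronous receiver always takes exactly one item.
    tx.send(4).unwrap();
    assert_eq!(recv_sync(&rx), 4);
    println!("ok");
}
```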
Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
This causes a failure in the `proptest_max_retain_values_test` test that I am trying to track down. (The failure doesn't happen if I change `ExchangeKind::Stream` to `ExchangeKind::Sync` in dyn_shard_accumulate().) Signed-off-by: Ben Pfaff <blp@feldera.com>
The previous commit makes proptest_max_retain_values_test fail. The most obvious reason would be that streaming exchange produces the wrong results. However, this commit makes the streaming accumulator do both streaming and synchronous exchange at the same time and then asserts that they produce the same results. The assertion does not fail in this test.

This change uses streaming exchange to produce the result of the sharded accumulator and synchronous exchange to produce the expected result. If the two are swapped, by exchanging `ExchangeKind::Stream` and `ExchangeKind::Sync` in their locations in dyn_shard_accumulate(), so that synchronous exchange produces the result and streaming exchange the expected value, then the test passes. This is even more bizarre.

I think the cause must be related to the details of this particular test. The test sets trace bounds using `accumulate_integrate_trace_retain_values_top_n`, which internally finds the unsharded version of the stream that it's applied to (with try_unsharded_version()). This low-level mapping has bitten me in the past. To me, it suggests that it matters that the batches the trace bounds see are exactly the ones that pass through the exchange, without the delays that streaming exchange can cause. But that's all the explanation I have.
@ryzhyk I think I'll need some advice, please see the final commit "showing why the proptest_max_retain_values_test failure is surprising" (which obviously isn't meant to merge). But only when you have a chance; I'll be away from keyboard for a few days.
That test fails for me even if I comment out
Please read the individual commits.
I'm currently working to figure out why one test fails (see final commit). That's why this is being submitted as a draft PR. After I fix it, I'll mark it ready for review.