[dbsp, adapters] Transactional bootstrapping #6223
Changes from all commits
e0cbc1c
4f1071e
941e099
15dace0
89f8276
9d6322a
555c33c
dffc5b1
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -1497,10 +1497,20 @@ pub(crate) fn register_replay_stream<C, B>( | |
| // We currently only support using operators in the top-level circuit | ||
| // as replay sources. | ||
| if TypeId::of::<()>() == TypeId::of::<C::Time>() { | ||
| circuit.cache_insert( | ||
| ReplaySource::new(stream.stream_id()), | ||
| Box::new(replay_stream.clone()), | ||
| ); | ||
| // If a replay source already exists, don't overwrite it. This normally shouldn't | ||
| // happen as we should not have more than one integral for each stream. One situation | ||
The comment correctly calls this a workaround. Could you file a follow-up issue to actually de-duplicate the integrals? Relying on registration order is fragile; a future refactor that swaps the order of …
| // where this does happen today is for input streams that have an integral without | ||
| // an accumulator as part of input_upsert, and another integral with an accumulator | ||
| // created by a downstream join or aggregate. In this case, we want to use the former | ||
| // for replay, as the latter may have been added in the new version of the program | ||
| // and may be empty, while the former can have state (conversely, if the input integral | ||
| // is empty, the downstream integral is guaranteed to be empty too). | ||
| if !circuit.cache_contains(&ReplaySource::new(stream.stream_id())) { | ||
| circuit.cache_insert( | ||
| ReplaySource::new(stream.stream_id()), | ||
| Box::new(replay_stream.clone()), | ||
| ); | ||
| } | ||
| } | ||
| } | ||
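The guard added in this hunk is a first-writer-wins registration: the first integral registered for a stream stays the replay source, and later registrations are ignored. Below is a minimal sketch of that pattern, assuming a plain `HashMap` in place of the circuit cache. `ReplayRegistry`, `StreamId`, `register`, and `source` are hypothetical names used for illustration only and are not part of the dbsp API.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a stream identifier.
type StreamId = usize;

/// Hypothetical, simplified registry mapping a stream to the node used to replay it.
/// The real code stores this in the circuit cache keyed by `ReplaySource`.
#[derive(Default)]
struct ReplayRegistry {
    sources: HashMap<StreamId, String>,
}

impl ReplayRegistry {
    /// Registers `replay_node` for `stream` unless a source is already present.
    /// Returns `true` if this call installed the source, `false` if it was ignored.
    fn register(&mut self, stream: StreamId, replay_node: &str) -> bool {
        if self.sources.contains_key(&stream) {
            // An earlier registration wins; do not overwrite it.
            return false;
        }
        self.sources.insert(stream, replay_node.to_string());
        true
    }

    /// Returns the replay source registered for `stream`, if any.
    fn source(&self, stream: StreamId) -> Option<&str> {
        self.sources.get(&stream).map(String::as_str)
    }
}
```

In this sketch the outcome depends entirely on call order, which is the fragility the review comment points at: if the downstream integral were registered first, it would become the replay source.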
@@ -7597,10 +7607,15 @@ impl CircuitHandle { | |
| return true; | ||
| }; | ||
| replay_info.replay_sources.keys().all(|node_id| { | ||
| // Bootstrapping is finished when all replay sources have completed their replay and the | ||
| // transaction has been committed. | ||
| let all_complete = replay_info.replay_sources.keys().all(|node_id| { | ||
| self.circuit | ||
| .map_local_node_mut(*node_id, &mut |node| node.is_replay_complete()) | ||
| }) | ||
| }); | ||
| all_complete && self.is_commit_complete() | ||
| } | ||
| /// Finalize the replay phase of the circuit. | ||
Any reason not to reuse `self.bootstrapping`?
If there is one, it may need an explanation.
Added a comment explaining this.
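The completion condition changed in the second hunk can be read as a conjunction: every replay source has finished replaying, and the transaction has been committed. Here is a minimal sketch of that check, assuming a simplified state struct. `BootstrapState`, `NodeId`, the `commit_complete` field, and `is_bootstrap_complete` are hypothetical names for illustration; the real code queries each node through `map_local_node_mut` and calls `is_commit_complete()`.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a circuit node identifier.
type NodeId = usize;

/// Hypothetical, simplified view of the state consulted by the check.
struct BootstrapState {
    /// Maps each replay source to whether its replay has completed.
    replay_sources: BTreeMap<NodeId, bool>,
    /// Whether the enclosing transaction has been committed.
    commit_complete: bool,
}

impl BootstrapState {
    /// Bootstrapping is finished only when all replay sources are done
    /// and the transaction has been committed.
    fn is_bootstrap_complete(&self) -> bool {
        let all_complete = self.replay_sources.values().all(|done| *done);
        all_complete && self.commit_complete
    }
}
```

With this shape, finishing the replay alone is not enough: the check keeps returning `false` until the commit flag is also set.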