Skip to content

Commit e971b47

Browse files
committed
[dbsp] Eliminate duplicate code in Consensus implementation.
`Consensus` is just `Broadcast<bool>` with a logical-and built in. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent c3d05d1 commit e971b47

File tree

1 file changed

+5
-73
lines changed

1 file changed

+5
-73
lines changed

crates/dbsp/src/circuit/runtime.rs

Lines changed: 5 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use crate::operator::communication::Exchange;
1111
use crate::storage::backend::StorageBackend;
1212
use crate::storage::file::format::Compression;
1313
use crate::storage::file::writer::Parameters;
14-
use crate::trace::aligned_deserialize;
1514
use crate::utils::process_rss_bytes;
1615
use crate::{
1716
DetailedError,
@@ -35,6 +34,7 @@ use feldera_types::memory_pressure::{
3534
use indexmap::IndexSet;
3635
use once_cell::sync::Lazy;
3736
use serde::Serialize;
37+
use std::convert::identity;
3838
use std::iter::repeat;
3939
use std::ops::Range;
4040
use std::path::Path;
@@ -1283,48 +1283,11 @@ impl Runtime {
12831283

12841284
/// A synchronization primitive that allows multiple threads within a runtime to agree
12851285
/// when a condition is satisfied.
1286-
pub(crate) enum Consensus {
1287-
SingleThreaded,
1288-
MultiThreaded {
1289-
notify_sender: Arc<Notify>,
1290-
notify_receiver: Arc<Notify>,
1291-
exchange: Arc<Exchange<bool>>,
1292-
},
1293-
}
1286+
pub(crate) struct Consensus(Broadcast<bool>);
12941287

12951288
impl Consensus {
12961289
pub fn new() -> Self {
1297-
match Runtime::runtime() {
1298-
Some(runtime) if Runtime::num_workers() > 1 => {
1299-
let worker_index = Runtime::worker_index();
1300-
let exchange_id = runtime.sequence_next().try_into().unwrap();
1301-
let exchange = Exchange::with_runtime(
1302-
&runtime,
1303-
exchange_id,
1304-
Box::new(|data| aligned_deserialize(&data[..])),
1305-
);
1306-
1307-
let notify_sender = Arc::new(Notify::new());
1308-
let notify_sender_clone = notify_sender.clone();
1309-
let notify_receiver = Arc::new(Notify::new());
1310-
let notify_receiver_clone = notify_receiver.clone();
1311-
1312-
exchange.register_sender_callback(worker_index, move || {
1313-
notify_sender_clone.notify_one()
1314-
});
1315-
1316-
exchange.register_receiver_callback(worker_index, move || {
1317-
notify_receiver_clone.notify_one()
1318-
});
1319-
1320-
Self::MultiThreaded {
1321-
notify_sender,
1322-
notify_receiver,
1323-
exchange,
1324-
}
1325-
}
1326-
_ => Self::SingleThreaded,
1327-
}
1290+
Self(Broadcast::new())
13281291
}
13291292

13301293
/// Returns `true` if all workers vote `true`.
@@ -1333,37 +1296,7 @@ impl Consensus {
13331296
///
13341297
/// * `local` - Local vote by the current worker.
13351298
pub async fn check(&self, local: bool) -> Result<bool, SchedulerError> {
1336-
match self {
1337-
Self::SingleThreaded => Ok(local),
1338-
Self::MultiThreaded {
1339-
notify_sender,
1340-
notify_receiver,
1341-
exchange,
1342-
} => {
1343-
while !exchange.try_send_all_with_serializer(
1344-
Runtime::worker_index(),
1345-
repeat(local),
1346-
|local| FBuf::from_slice(&[local as u8]),
1347-
) {
1348-
if Runtime::kill_in_progress() {
1349-
return Err(SchedulerError::Killed);
1350-
}
1351-
notify_sender.notified().await;
1352-
}
1353-
// Receive the status of each peer, compute global result
1354-
// as a logical and of all peer statuses.
1355-
let mut global = true;
1356-
while !exchange.try_receive_all(Runtime::worker_index(), |status| global &= status)
1357-
{
1358-
if Runtime::kill_in_progress() {
1359-
return Err(SchedulerError::Killed);
1360-
}
1361-
// Sleep if other threads are still working.
1362-
notify_receiver.notified().await;
1363-
}
1364-
Ok(global)
1365-
}
1366-
}
1299+
Ok(self.0.collect(local).await?.into_iter().all(identity))
13671300
}
13681301
}
13691302

@@ -1444,8 +1377,7 @@ where
14441377
}
14451378
notify_sender.notified().await;
14461379
}
1447-
// Receive the status of each peer, compute global result
1448-
// as a logical and of all peer statuses.
1380+
// Receive and collect the status of each peer.
14491381
let mut result = Vec::with_capacity(Runtime::num_workers());
14501382
while !exchange
14511383
.try_receive_all(Runtime::worker_index(), |status| result.push(status))

0 commit comments

Comments
 (0)