@@ -11,7 +11,6 @@ use crate::operator::communication::Exchange;
1111use crate :: storage:: backend:: StorageBackend ;
1212use crate :: storage:: file:: format:: Compression ;
1313use crate :: storage:: file:: writer:: Parameters ;
14- use crate :: trace:: aligned_deserialize;
1514use crate :: utils:: process_rss_bytes;
1615use crate :: {
1716 DetailedError ,
@@ -35,6 +34,7 @@ use feldera_types::memory_pressure::{
3534use indexmap:: IndexSet ;
3635use once_cell:: sync:: Lazy ;
3736use serde:: Serialize ;
37+ use std:: convert:: identity;
3838use std:: iter:: repeat;
3939use std:: ops:: Range ;
4040use std:: path:: Path ;
@@ -1283,48 +1283,11 @@ impl Runtime {
12831283
12841284/// A synchronization primitive that allows multiple threads within a runtime to agree
12851285/// when a condition is satisfied.
1286- pub ( crate ) enum Consensus {
1287- SingleThreaded ,
1288- MultiThreaded {
1289- notify_sender : Arc < Notify > ,
1290- notify_receiver : Arc < Notify > ,
1291- exchange : Arc < Exchange < bool > > ,
1292- } ,
1293- }
1286+ pub ( crate ) struct Consensus ( Broadcast < bool > ) ;
12941287
12951288impl Consensus {
12961289 pub fn new ( ) -> Self {
1297- match Runtime :: runtime ( ) {
1298- Some ( runtime) if Runtime :: num_workers ( ) > 1 => {
1299- let worker_index = Runtime :: worker_index ( ) ;
1300- let exchange_id = runtime. sequence_next ( ) . try_into ( ) . unwrap ( ) ;
1301- let exchange = Exchange :: with_runtime (
1302- & runtime,
1303- exchange_id,
1304- Box :: new ( |data| aligned_deserialize ( & data[ ..] ) ) ,
1305- ) ;
1306-
1307- let notify_sender = Arc :: new ( Notify :: new ( ) ) ;
1308- let notify_sender_clone = notify_sender. clone ( ) ;
1309- let notify_receiver = Arc :: new ( Notify :: new ( ) ) ;
1310- let notify_receiver_clone = notify_receiver. clone ( ) ;
1311-
1312- exchange. register_sender_callback ( worker_index, move || {
1313- notify_sender_clone. notify_one ( )
1314- } ) ;
1315-
1316- exchange. register_receiver_callback ( worker_index, move || {
1317- notify_receiver_clone. notify_one ( )
1318- } ) ;
1319-
1320- Self :: MultiThreaded {
1321- notify_sender,
1322- notify_receiver,
1323- exchange,
1324- }
1325- }
1326- _ => Self :: SingleThreaded ,
1327- }
1290+ Self ( Broadcast :: new ( ) )
13281291 }
13291292
13301293 /// Returns `true` if all workers vote `true`.
@@ -1333,37 +1296,7 @@ impl Consensus {
13331296 ///
13341297 /// * `local` - Local vote by the current worker.
13351298 pub async fn check ( & self , local : bool ) -> Result < bool , SchedulerError > {
1336- match self {
1337- Self :: SingleThreaded => Ok ( local) ,
1338- Self :: MultiThreaded {
1339- notify_sender,
1340- notify_receiver,
1341- exchange,
1342- } => {
1343- while !exchange. try_send_all_with_serializer (
1344- Runtime :: worker_index ( ) ,
1345- repeat ( local) ,
1346- |local| FBuf :: from_slice ( & [ local as u8 ] ) ,
1347- ) {
1348- if Runtime :: kill_in_progress ( ) {
1349- return Err ( SchedulerError :: Killed ) ;
1350- }
1351- notify_sender. notified ( ) . await ;
1352- }
1353- // Receive the status of each peer, compute global result
1354- // as a logical and of all peer statuses.
1355- let mut global = true ;
1356- while !exchange. try_receive_all ( Runtime :: worker_index ( ) , |status| global &= status)
1357- {
1358- if Runtime :: kill_in_progress ( ) {
1359- return Err ( SchedulerError :: Killed ) ;
1360- }
1361- // Sleep if other threads are still working.
1362- notify_receiver. notified ( ) . await ;
1363- }
1364- Ok ( global)
1365- }
1366- }
1299+ Ok ( self . 0 . collect ( local) . await ?. into_iter ( ) . all ( identity) )
13671300 }
13681301}
13691302
@@ -1444,8 +1377,7 @@ where
14441377 }
14451378 notify_sender. notified ( ) . await ;
14461379 }
1447- // Receive the status of each peer, compute global result
1448- // as a logical and of all peer statuses.
1380+ // Receive and collect the status of each peer.
14491381 let mut result = Vec :: with_capacity ( Runtime :: num_workers ( ) ) ;
14501382 while !exchange
14511383 . try_receive_all ( Runtime :: worker_index ( ) , |status| result. push ( status) )
0 commit comments