Skip to content

Commit abe856d

Browse files
committed
[adapters] Fix hang while bootstrapping in multihost.
The coordinator waited for a notification that the state was no longer bootstrapping, but the notification never arrived.

Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent ea1b6d1 commit abe856d

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

crates/adapters/src/server.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ use std::{
106106
use tokio::spawn;
107107
use tokio::sync::Notify;
108108
use tokio::task::spawn_blocking;
109-
use tokio::time::sleep;
109+
use tokio::time::{sleep, timeout};
110110
use tokio_stream::wrappers::BroadcastStream;
111111
use tokio_stream::wrappers::WatchStream;
112112
use tracing::{Instrument, Level, debug, error, info, info_span, warn};
@@ -2346,7 +2346,19 @@ async fn coordination_status(state: WebData<ServerState>) -> Result<HttpResponse
23462346
if status != prev {
23472347
break status;
23482348
}
2349-
notify.await;
2349+
2350+
// We put a 1-second timeout on this because the status can
2351+
// change without desired_status_change being notified in two cases:
2352+
//
2353+
// - Bootstrapping has completed.
2354+
//
2355+
// - Replaying has completed.
2356+
//
2357+
// Either of these can only happen at most once per pipeline run
2358+
// (and they can't both happen), and it's probably OK that the
2359+
// notification is delayed a bit. (If it turns out that prompt
2360+
// notification is important, then we can arrange for that.)
2361+
let _ = timeout(Duration::from_secs(1), notify).await;
23502362
},
23512363
};
23522364
Some((

0 commit comments

Comments
 (0)