Skip to content

Commit 1d9c024

Browse files
committed
[adapters] Add ability to support connector orchestration for multihost.
Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 408f003 commit 1d9c024

File tree

3 files changed

+53
-3
lines changed

3 files changed

+53
-3
lines changed

crates/adapters/src/controller.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ use std::{
125125
thread::JoinHandle,
126126
time::{Duration, Instant},
127127
};
128+
use tokio::sync::Notify;
128129
use tokio::{
129130
fs::File,
130131
io::{AsyncReadExt, BufReader},
@@ -1722,6 +1723,20 @@ impl Controller {
17221723
.map(|(_step, snapshot)| snapshot.clone())
17231724
}
17241725

1726+
pub fn incomplete_labels(&self) -> HashSet<String> {
1727+
let mut incomplete_labels = HashSet::new();
1728+
for status in self.inner.status.inputs.read().values() {
1729+
if !status.metrics.end_of_input.load(Ordering::Relaxed) {
1730+
incomplete_labels.extend(status.config.connector_config.labels.iter().cloned());
1731+
}
1732+
}
1733+
incomplete_labels
1734+
}
1735+
1736+
pub fn input_completion_notify(&self) -> Arc<Notify> {
1737+
self.inner.input_completion_notify.clone()
1738+
}
1739+
17251740
pub fn adhoc_catalog(&self) -> AdHocCatalog {
17261741
let mut tables = Vec::new();
17271742
for (name, clh) in self.inner.catalog.output_iter() {
@@ -4724,6 +4739,7 @@ pub struct ControllerInner {
47244739
transaction_sender: tokio::sync::watch::Sender<TransactionCoordination>,
47254740
coordination_request: Mutex<Option<StepRequest>>,
47264741
coordination_prepare_checkpoint: AtomicBool,
4742+
input_completion_notify: Arc<Notify>,
47274743
// The mutex is acquired from async context by actix and
47284744
// from the sync context by the circuit thread.
47294745
transaction_info: Mutex<TransactionInfo>,
@@ -4807,6 +4823,7 @@ impl ControllerInner {
48074823
transaction_receiver,
48084824
coordination_request: Mutex::new(is_multihost.then_some(StepRequest::new_idle(0))),
48094825
coordination_prepare_checkpoint: AtomicBool::new(false),
4826+
input_completion_notify: Arc::new(Notify::new()),
48104827
}
48114828
});
48124829

@@ -5773,7 +5790,8 @@ impl ControllerInner {
57735790
endpoint_id,
57745791
&self.circuit_thread_unparker,
57755792
&self.backpressure_thread_unparker,
5776-
)
5793+
);
5794+
self.input_completion_notify.notify_waiters();
57775795
}
57785796

57795797
fn output_buffers_full(&self) -> bool {

crates/adapters/src/server.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ use feldera_types::completion_token::{
6060
CompletionStatusArgs, CompletionStatusResponse, CompletionTokenResponse,
6161
};
6262
use feldera_types::constants::STATUS_FILE;
63-
use feldera_types::coordination::{AdHocScan, CoordinationActivate, Step, StepRequest};
63+
use feldera_types::coordination::{AdHocScan, CoordinationActivate, Labels, Step, StepRequest};
6464
use feldera_types::pipeline_diff::PipelineDiff;
6565
use feldera_types::query_params::{
6666
ActivateParams, MetricsFormat, MetricsParameters, SamplyProfileParams,
@@ -1203,6 +1203,7 @@ where
12031203
.service(coordination_adhoc_catalog)
12041204
.service(coordination_adhoc_lease)
12051205
.service(coordination_adhoc_scan)
1206+
.service(coordination_labels_incomplete)
12061207
}
12071208

12081209
/// Implements `/start`, `/pause`, `/activate`:
@@ -2442,6 +2443,30 @@ async fn coordination_adhoc_scan(
24422443
))
24432444
}
24442445

2446+
/// Stream the set of incomplete labels.
2447+
#[get("/coordination/labels/incomplete")]
2448+
async fn coordination_labels_incomplete(
2449+
state: WebData<ServerState>,
2450+
) -> Result<HttpResponse, PipelineError> {
2451+
let controller = state.controller()?;
2452+
let notify = controller.input_completion_notify();
2453+
2454+
let response_stream = async_stream::stream! {
2455+
loop {
2456+
let notify = notify.notified();
2457+
let labels = format!("{}\n", serde_json::to_string(&Labels {
2458+
incomplete: controller.incomplete_labels()
2459+
}).unwrap());
2460+
yield Ok::<_, actix_web::Error>(web::Bytes::from(labels));
2461+
notify.await;
2462+
}
2463+
};
2464+
2465+
Ok(HttpResponse::Ok()
2466+
.content_type("application/x-ndjson")
2467+
.streaming(response_stream))
2468+
}
2469+
24452470
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
24462471
struct StoredStatus {
24472472
/// Desired status.

crates/feldera-types/src/coordination.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
5454
use std::{
5555
borrow::Cow,
56-
collections::{BTreeMap, HashMap},
56+
collections::{BTreeMap, HashMap, HashSet},
5757
net::SocketAddr,
5858
};
5959

@@ -268,3 +268,10 @@ pub struct AdHocScan {
268268
/// Columnar projection.
269269
pub projection: Option<Vec<usize>>,
270270
}
271+
272+
/// `/coordination/labels/incomplete` reply, streamed from pipeline to coordinator.
273+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
274+
pub struct Labels {
275+
/// All the labels on incomplete connectors.
276+
pub incomplete: HashSet<String>,
277+
}

0 commit comments

Comments
 (0)