Skip to content

Commit 0343fd6

Browse files
committed
[adapters] Fix now connector in multihost.
The clock connector requests steps instead of queueing data, but we were ignoring that in multihost. Also, connect the clock only on the first host in a multihost cluster. This fixes a Python integration test for now in multihost, so enable that test. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 3bce09e commit 0343fd6

File tree

2 files changed

+11
-4
lines changed

2 files changed

+11
-4
lines changed

crates/adapters/src/controller.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3737,7 +3737,14 @@ impl StepTrigger {
37373737
let result = if let Some(request) = &coordination_request {
37383738
match request.action {
37393739
StepAction::Step if request.step >= step => Some(Action::Step),
3740-
StepAction::Trigger if request.step >= step => None,
3740+
StepAction::Trigger if request.step >= step => {
3741+
if self.controller.status.unset_step_requested() {
3742+
// The `clock` connector requests steps explicitly.
3743+
Some(Action::Step)
3744+
} else {
3745+
None
3746+
}
3747+
}
37413748
_ => Some(Action::Park(None)),
37423749
}
37433750
} else if replaying
@@ -5143,7 +5150,9 @@ impl ControllerInner {
51435150
controller.status.set_state(PipelineState::Terminated);
51445151
})?;
51455152

5146-
let _ = controller.connect_input("now", &now_endpoint_config(&config), None);
5153+
if controller.workers.start == 0 {
5154+
let _ = controller.connect_input("now", &now_endpoint_config(&config), None);
5155+
}
51475156

51485157
let backpressure_thread =
51495158
BackpressureThread::new(controller.clone(), backpressure_thread_parker);

python/tests/platform/test_nowstream.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
unique_pipeline_name,
88
FELDERA_TEST_NUM_WORKERS,
99
FELDERA_TEST_NUM_HOSTS,
10-
single_host_only,
1110
)
1211
from tests import TEST_CLIENT
1312
from feldera.enums import PipelineStatus
@@ -20,7 +19,6 @@ def get_result(pipeline) -> str:
2019

2120

2221
class TestNowStream(unittest.TestCase):
23-
@single_host_only
2422
def test_nowstream(self):
2523
"""
2624
Test the now() function:

0 commit comments

Comments
 (0)