1616 connector_action ,
1717 pipeline_stats ,
1818 connector_paused ,
19+ wait_for_condition ,
20+ get ,
1921)
20- from feldera .testutils import single_host_only
22+ from feldera .testutils import single_host_only , FELDERA_TEST_NUM_HOSTS
2123
2224
2325def _basic_orchestration_info (pipeline : str , table : str , connector : str ):
@@ -28,7 +30,6 @@ def _basic_orchestration_info(pipeline: str, table: str, connector: str):
2830
2931
3032@gen_pipeline_name
31- @single_host_only
3233def test_pipeline_orchestration_basic (pipeline_name ):
3334 """
3435 Tests the orchestration of the pipeline, which means the starting and pausing of the
@@ -67,6 +68,22 @@ def test_pipeline_orchestration_basic(pipeline_name):
6768 create_pipeline (cur_pipeline_name , sql )
6869 start_pipeline_as_paused (cur_pipeline_name )
6970
71+ if FELDERA_TEST_NUM_HOSTS > 1 :
72+ # The multihost coordinator can report that it is ready
73+ # before some of the hosts are individually ready, but the
74+ # coordinator only reports statistics when all of them are
75+ # ready. This might be a bug in the coordinator; it is
76+ # hard to tell. For now, waiting for statistics to be
77+ # available is a compromise that allows this otherwise
78+ # valuable test to pass.
79+ wait_for_condition (
80+ f"pipeline stats for { cur_pipeline_name } are available" ,
81+ lambda : get (
82+ api_url (f"/pipelines/{ cur_pipeline_name } /stats" )
83+ ).status_code
84+ == HTTPStatus .OK ,
85+ )
86+
7087 # Initial: pipeline paused, connector running, processed=0
7188 p_paused , c_paused , processed = _basic_orchestration_info (
7289 cur_pipeline_name , table_name , connector_name
0 commit comments