Skip to content

Commit 94abfa1

Browse files
committed
[python] Skip many of the integration tests in multihost.
These can be removed as more tests pass on multihost. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 6ecf7a4 commit 94abfa1

File tree

10 files changed

+67
-3
lines changed

10 files changed

+67
-3
lines changed

python/feldera/testutils.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,14 @@ def enterprise_only(fn):
169169
)(fn)
170170

171171

172+
def single_host_only(fn):
173+
fn._single_host_only = True
174+
return unittest.skipUnless(
175+
FELDERA_TEST_NUM_HOSTS == 1,
176+
f"multihost not yet supported for {fn.__name__}, skipping",
177+
)(fn)
178+
179+
172180
def datafusionize(query: str) -> str:
173181
sort_array_pattern = re.compile(re.escape("SORT_ARRAY"), re.IGNORECASE)
174182
truncate_pattern = re.compile(re.escape("TRUNCATE"), re.IGNORECASE)

python/tests/platform/negative_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
unique_pipeline_name,
55
FELDERA_TEST_NUM_WORKERS,
66
FELDERA_TEST_NUM_HOSTS,
7+
single_host_only,
78
)
89
from tests import TEST_CLIENT
910
from feldera.runtime_config import RuntimeConfig
@@ -117,6 +118,7 @@ def test_errors0(self):
117118
pipeline = Pipeline.get(name, TEST_CLIENT)
118119
assert pipeline.errors()[0]["sql_compilation"]["exit_code"] != 0
119120

121+
@single_host_only
120122
def test_initialization_error(self):
121123
sql = """
122124
CREATE TABLE t0 (

python/tests/platform/test_bootstrapping.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,14 @@
66
gen_pipeline_name,
77
wait_for_deployment_status,
88
)
9-
from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS
9+
from feldera.testutils import (
10+
FELDERA_TEST_NUM_WORKERS,
11+
FELDERA_TEST_NUM_HOSTS,
12+
single_host_only,
13+
)
1014

1115

16+
@single_host_only
1217
@enterprise_only
1318
@gen_pipeline_name
1419
def test_bootstrap_enterprise(pipeline_name):
@@ -273,6 +278,7 @@ def test_bootstrap_enterprise(pipeline_name):
273278

274279

275280
@enterprise_only
281+
@single_host_only
276282
@gen_pipeline_name
277283
def test_bootstrap_non_materialized_table_enterprise(pipeline_name):
278284
"""
@@ -318,6 +324,7 @@ def test_bootstrap_non_materialized_table_enterprise(pipeline_name):
318324

319325

320326
@enterprise_only
327+
@single_host_only
321328
@gen_pipeline_name
322329
def test_bootstrap_table_lateness_enterprise(pipeline_name):
323330
"""
@@ -362,6 +369,7 @@ def test_bootstrap_table_lateness_enterprise(pipeline_name):
362369

363370

364371
@enterprise_only
372+
@single_host_only
365373
@gen_pipeline_name
366374
def test_bootstrap_view_lateness_enterprise(pipeline_name):
367375
"""
@@ -408,6 +416,7 @@ def test_bootstrap_view_lateness_enterprise(pipeline_name):
408416

409417
# Add/remove connectors.
410418
@enterprise_only
419+
@single_host_only
411420
@gen_pipeline_name
412421
def test_bootstrap_connectors(pipeline_name):
413422
"""

python/tests/platform/test_checkpoint_sync.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77

88
from feldera.enums import FaultToleranceModel, PipelineStatus
99
from feldera.runtime_config import RuntimeConfig, Storage
10-
from feldera.testutils import FELDERA_TEST_NUM_HOSTS, FELDERA_TEST_NUM_WORKERS
10+
from feldera.testutils import (
11+
FELDERA_TEST_NUM_HOSTS,
12+
FELDERA_TEST_NUM_WORKERS,
13+
single_host_only,
14+
)
1115
from tests import enterprise_only
1216
from tests.shared_test_pipeline import SharedTestPipeline
1317

@@ -56,6 +60,7 @@ def storage_cfg(
5660

5761
class TestCheckpointSync(SharedTestPipeline):
5862
@enterprise_only
63+
@single_host_only
5964
def test_checkpoint_sync(
6065
self,
6166
from_uuid: bool = False,
@@ -241,54 +246,66 @@ def test_checkpoint_sync(
241246
self.pipeline.clear_storage()
242247

243248
@enterprise_only
249+
@single_host_only
244250
def test_from_uuid(self):
245251
self.test_checkpoint_sync(from_uuid=True)
246252

247253
@enterprise_only
254+
@single_host_only
248255
def test_without_clearing_storage(self):
249256
self.test_checkpoint_sync(clear_storage=False)
250257

251258
@enterprise_only
259+
@single_host_only
252260
def test_automated_checkpoint(self):
253261
self.test_checkpoint_sync(ft_interval=5, automated_checkpoint=True)
254262

255263
@enterprise_only
264+
@single_host_only
256265
def test_automated_checkpoint_sync(self):
257266
self.test_checkpoint_sync(
258267
ft_interval=5, automated_checkpoint=True, automated_sync_interval=10
259268
)
260269

261270
@enterprise_only
271+
@single_host_only
262272
def test_automated_checkpoint_sync1(self):
263273
self.test_checkpoint_sync(ft_interval=5, automated_sync_interval=10)
264274

265275
@enterprise_only
276+
@single_host_only
266277
def test_autherr_fail(self):
267278
with self.assertRaisesRegex(RuntimeError, "SignatureDoesNotMatch|Forbidden"):
268279
self.test_checkpoint_sync(auth_err=True, strict=True)
269280

270281
@enterprise_only
282+
@single_host_only
271283
def test_autherr(self):
272284
self.test_checkpoint_sync(auth_err=True, strict=False, expect_empty=True)
273285

274286
@enterprise_only
287+
@single_host_only
275288
def test_nonexistent_checkpoint_fail(self):
276289
with self.assertRaisesRegex(RuntimeError, "were not found in source"):
277290
self.test_checkpoint_sync(random_uuid=True, from_uuid=True, strict=True)
278291

279292
@enterprise_only
293+
@single_host_only
280294
def test_nonexistent_checkpoint(self):
281295
self.test_checkpoint_sync(random_uuid=True, from_uuid=True, expect_empty=True)
282296

283297
@enterprise_only
298+
@single_host_only
284299
def test_standby_activation(self):
285300
self.test_checkpoint_sync(standby=True)
286301

287302
@enterprise_only
303+
@single_host_only
288304
def test_standby_activation_from_uuid(self):
289305
self.test_checkpoint_sync(standby=True, from_uuid=True)
290306

291307
@enterprise_only
308+
@single_host_only
292309
def test_standby_fallback(self, from_uuid: bool = False):
293310
# Step 1: Start main pipeline
294311
storage_config = storage_cfg(self.pipeline.name)
@@ -402,5 +419,6 @@ def test_standby_fallback(self, from_uuid: bool = False):
402419
self.pipeline.clear_storage()
403420

404421
@enterprise_only
422+
@single_host_only
405423
def test_standby_fallback_from_uuid(self):
406424
self.test_standby_fallback(from_uuid=True)

python/tests/platform/test_ingress_formats.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
get,
1616
)
1717

18+
from feldera.testutils import single_host_only
19+
1820

1921
def _ingress_and_wait_token(
2022
pipeline: str,
@@ -406,6 +408,7 @@ def test_primary_keys(pipeline_name):
406408

407409

408410
@gen_pipeline_name
411+
@single_host_only
409412
def test_case_sensitive_tables(pipeline_name):
410413
"""
411414
- Distinguish between quoted and unquoted identifiers.
@@ -453,6 +456,7 @@ def test_case_sensitive_tables(pipeline_name):
453456

454457

455458
@gen_pipeline_name
459+
@single_host_only
456460
def test_duplicate_outputs(pipeline_name):
457461
"""
458462
multiple inserts producing duplicate output values.
@@ -502,6 +506,7 @@ def test_duplicate_outputs(pipeline_name):
502506

503507

504508
@gen_pipeline_name
509+
@single_host_only
505510
def test_upsert(pipeline_name):
506511
"""
507512
- Insert several rows with composite PK.

python/tests/platform/test_metrics_logs.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
gen_pipeline_name,
1818
)
1919

20+
from feldera.testutils import single_host_only
21+
2022

2123
def _ingest_lines(name: str, table: str, body: str):
2224
r = http_request(
@@ -47,6 +49,7 @@ def _adhoc_count(name: str, table: str) -> int:
4749

4850

4951
@gen_pipeline_name
52+
@single_host_only
5053
def test_pipeline_metrics(pipeline_name):
5154
"""
5255
Tests that circuit metrics can be retrieved from the pipeline.
@@ -83,6 +86,7 @@ def test_pipeline_metrics(pipeline_name):
8386

8487

8588
@gen_pipeline_name
89+
@single_host_only
8690
def test_pipeline_stats(pipeline_name):
8791
"""
8892
Tests retrieving pipeline statistics via `/stats`.

python/tests/platform/test_nowstream.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
unique_pipeline_name,
88
FELDERA_TEST_NUM_WORKERS,
99
FELDERA_TEST_NUM_HOSTS,
10+
single_host_only,
1011
)
1112
from tests import TEST_CLIENT
1213
from feldera.enums import PipelineStatus
@@ -19,6 +20,7 @@ def get_result(pipeline) -> str:
1920

2021

2122
class TestNowStream(unittest.TestCase):
23+
@single_host_only
2224
def test_nowstream(self):
2325
"""
2426
Test the now() function:

python/tests/platform/test_orchestration.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
pipeline_stats,
1818
connector_paused,
1919
)
20+
from feldera.testutils import single_host_only
2021

2122

2223
def _basic_orchestration_info(pipeline: str, table: str, connector: str):
@@ -27,6 +28,7 @@ def _basic_orchestration_info(pipeline: str, table: str, connector: str):
2728

2829

2930
@gen_pipeline_name
31+
@single_host_only
3032
def test_pipeline_orchestration_basic(pipeline_name):
3133
"""
3234
Tests the orchestration of the pipeline, which means the starting and pausing of the
@@ -107,6 +109,7 @@ def test_pipeline_orchestration_basic(pipeline_name):
107109

108110

109111
@gen_pipeline_name
112+
@single_host_only
110113
def test_pipeline_orchestration_errors(pipeline_name):
111114
"""
112115
Port of Rust pipeline_orchestration_errors:

python/tests/platform/test_pipeline_lifecycle.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
post_no_body,
2323
)
2424
from tests import enterprise_only
25+
from feldera.testutils import single_host_only
2526

2627

2728
def _wait_for_stopped_with_error(name: str, timeout_s: float = 90.0):
@@ -77,6 +78,7 @@ def test_deploy_pipeline(pipeline_name):
7778

7879

7980
@gen_pipeline_name
81+
@single_host_only
8082
def test_pipeline_panic(pipeline_name):
8183
"""
8284
Pipeline that panics at runtime. Verify reported error_code == RuntimeError.WorkerPanic.

python/tests/platform/test_shared_pipeline.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
from feldera.runtime_config import RuntimeConfig
1616
from tests import TEST_CLIENT, enterprise_only
1717
from tests.shared_test_pipeline import SharedTestPipeline
18-
from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS
18+
from feldera.testutils import (
19+
FELDERA_TEST_NUM_WORKERS,
20+
FELDERA_TEST_NUM_HOSTS,
21+
single_host_only,
22+
)
1923

2024

2125
class TestPipeline(SharedTestPipeline):
@@ -86,6 +90,7 @@ def test_get_pipeline_stats(self):
8690
assert stats.get("inputs") is not None
8791
assert stats.get("outputs") is not None
8892

93+
@single_host_only
8994
def test_case_sensitive_views_listen(self):
9095
self.pipeline.start_paused()
9196

@@ -203,6 +208,7 @@ def test_pipeline_get(self):
203208
assert df.shape[0] == 100
204209
self.pipeline.stop(force=True)
205210

211+
@single_host_only
206212
def test_local_listen_after_start(self):
207213
df_students = pd.read_csv("tests/assets/students.csv")
208214
df_grades = pd.read_csv("tests/assets/grades.csv")
@@ -331,6 +337,7 @@ def test_issue2142(self):
331337
assert out_data == expected
332338
self.pipeline.stop(force=True)
333339

340+
@single_host_only
334341
def test_failed_pipeline_stop(self):
335342
"""
336343
CREATE VIEW id_plus_one AS SELECT id + 1 FROM tbl;
@@ -347,6 +354,7 @@ def test_failed_pipeline_stop(self):
347354
time.sleep(1)
348355
self.pipeline.stop(force=True)
349356

357+
@single_host_only
350358
def test_adhoc_execute(self):
351359
self.pipeline.start()
352360
self.pipeline.execute("INSERT INTO tbl VALUES (1), (2);")
@@ -733,6 +741,7 @@ def test_support_bundle_with_selectors(self):
733741
assert len(file_list) >= 2
734742
assert "manifest.txt" in file_list
735743

744+
@single_host_only
736745
def test_url_encoding_ingress_egress_table_name(self):
737746
"""
738747
CREATE TABLE "t1#a1" (
@@ -779,11 +788,13 @@ def test_url_encoding_ingress_egress_table_name(self):
779788
]
780789
self.assertCountEqual(egress_result, expected_egress)
781790

791+
@single_host_only
782792
def test_listen_non_existent_view_paused(self):
783793
self.pipeline.start_paused()
784794
with self.assertRaises(ValueError):
785795
self.pipeline.listen("FrodoBagginsInMordor")
786796

797+
@single_host_only
787798
def test_listen_non_existent_view_running(self):
788799
self.pipeline.start()
789800
with self.assertRaises(ValueError):

0 commit comments

Comments
 (0)