Skip to content

Commit 55dd452

Browse files
committed
python: configure workers and hosts for several tests
The number of workers and hosts can be set for several Python tests through the environment variables `FELDERA_TEST_NUM_WORKERS` and `FELDERA_TEST_NUM_HOSTS`. The only difference this PR results in, is that the tests no longer use the default number of workers or hosts that is set by the backend if no workers or hosts are specified. The environment variables are set by default to these current values (8 and 1 respectively), but if these defaults are ever changed, they also need to be updated in the Python tests. Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
1 parent 7ef02de commit 55dd452

17 files changed

+189
-21
lines changed

python/feldera/runtime_config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class RuntimeConfig:
6969
def __init__(
7070
self,
7171
workers: Optional[int] = None,
72+
hosts: Optional[int] = None,
7273
storage: Optional[Storage | bool] = None,
7374
tracing: Optional[bool] = False,
7475
tracing_endpoint_jaeger: Optional[str] = "",
@@ -84,6 +85,7 @@ def __init__(
8485
logging: Optional[str] = None,
8586
):
8687
self.workers = workers
88+
self.hosts = hosts
8789
self.tracing = tracing
8890
self.tracing_endpoint_jaeger = tracing_endpoint_jaeger
8991
self.cpu_profiler = cpu_profiler

python/feldera/testutils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ def _get_effective_api_key():
4040

4141
BASE_URL = os.environ.get("FELDERA_HOST") or "http://localhost:8080"
4242
FELDERA_REQUESTS_VERIFY = requests_verify_from_env()
43+
FELDERA_TEST_NUM_WORKERS = int(os.environ.get("FELDERA_TEST_NUM_WORKERS", "8"))
44+
FELDERA_TEST_NUM_HOSTS = int(os.environ.get("FELDERA_TEST_NUM_HOSTS", "1"))
4345

4446

4547
class _LazyClient:
@@ -249,6 +251,8 @@ def build_pipeline(
249251
runtime_config=RuntimeConfig(
250252
provisioning_timeout_secs=60,
251253
resources=resources,
254+
workers=FELDERA_TEST_NUM_WORKERS,
255+
hosts=FELDERA_TEST_NUM_HOSTS,
252256
),
253257
).create_or_replace()
254258

python/tests/platform/helper.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
from feldera.testutils_oidc import get_oidc_test_helper
2626
from tests import FELDERA_REQUESTS_VERIFY, API_KEY, BASE_URL, unique_pipeline_name
27+
from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS
2728

2829
API_PREFIX = "/v0"
2930

@@ -136,7 +137,15 @@ def wait_for_deployment_status(name: str, desired: str, timeout_s: float = 60.0)
136137
def create_pipeline(name: str, sql: str):
137138
r = post_json(
138139
api_url("/pipelines"),
139-
{"name": name, "program_code": sql, "runtime_config": {"logging": "debug"}},
140+
{
141+
"name": name,
142+
"program_code": sql,
143+
"runtime_config": {
144+
"workers": FELDERA_TEST_NUM_WORKERS,
145+
"hosts": FELDERA_TEST_NUM_HOSTS,
146+
"logging": "debug",
147+
},
148+
},
140149
)
141150
assert r.status_code == HTTPStatus.CREATED, r.text
142151
wait_for_program_success(name, 1)

python/tests/platform/negative_test.py

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import unittest
22
from feldera import PipelineBuilder, Pipeline
3-
from feldera.testutils import unique_pipeline_name
3+
from feldera.testutils import (
4+
unique_pipeline_name,
5+
FELDERA_TEST_NUM_WORKERS,
6+
FELDERA_TEST_NUM_HOSTS,
7+
)
48
from tests import TEST_CLIENT
9+
from feldera.runtime_config import RuntimeConfig
510

611

712
class NegativeCompilationTests(unittest.TestCase):
@@ -23,7 +28,13 @@ def test_sql_error(self):
2328
^^^^""".strip()
2429
with self.assertRaises(Exception) as err:
2530
PipelineBuilder(
26-
TEST_CLIENT, name=pipeline_name, sql=sql
31+
TEST_CLIENT,
32+
name=pipeline_name,
33+
sql=sql,
34+
runtime_config=RuntimeConfig(
35+
workers=FELDERA_TEST_NUM_WORKERS,
36+
hosts=FELDERA_TEST_NUM_HOSTS,
37+
),
2738
).create_or_replace()
2839
got_err: str = err.exception.args[0].strip()
2940
assert expected == got_err
@@ -36,7 +47,14 @@ def test_rust_error(self):
3647

3748
with self.assertRaises(Exception) as err:
3849
PipelineBuilder(
39-
TEST_CLIENT, name=pipeline_name, sql=sql, udf_rust="Davy Jones"
50+
TEST_CLIENT,
51+
name=pipeline_name,
52+
sql=sql,
53+
udf_rust="Davy Jones",
54+
runtime_config=RuntimeConfig(
55+
workers=FELDERA_TEST_NUM_WORKERS,
56+
hosts=FELDERA_TEST_NUM_HOSTS,
57+
),
4058
).create_or_replace()
4159

4260
assert "Davy Jones" in err.exception.args[0].strip()
@@ -45,7 +63,15 @@ def test_program_error0(self):
4563
sql = "create taabl;"
4664
name = unique_pipeline_name("test_program_error0")
4765
try:
48-
_ = PipelineBuilder(TEST_CLIENT, name, sql).create_or_replace()
66+
_ = PipelineBuilder(
67+
TEST_CLIENT,
68+
name,
69+
sql,
70+
runtime_config=RuntimeConfig(
71+
workers=FELDERA_TEST_NUM_WORKERS,
72+
hosts=FELDERA_TEST_NUM_HOSTS,
73+
),
74+
).create_or_replace()
4975
except Exception:
5076
pass
5177
pipeline = Pipeline.get(name, TEST_CLIENT)
@@ -57,7 +83,15 @@ def test_program_error0(self):
5783
def test_program_error1(self):
5884
sql = ""
5985
name = unique_pipeline_name("test_program_error1")
60-
_ = PipelineBuilder(TEST_CLIENT, name, sql).create_or_replace()
86+
_ = PipelineBuilder(
87+
TEST_CLIENT,
88+
name,
89+
sql,
90+
runtime_config=RuntimeConfig(
91+
workers=FELDERA_TEST_NUM_WORKERS,
92+
hosts=FELDERA_TEST_NUM_HOSTS,
93+
),
94+
).create_or_replace()
6195
pipeline = Pipeline.get(name, TEST_CLIENT)
6296
err = pipeline.program_error()
6397
assert err["sql_compilation"]["exit_code"] == 0
@@ -69,7 +103,15 @@ def test_errors0(self):
69103
sql = "SELECT invalid"
70104
name = unique_pipeline_name("test_errors0")
71105
try:
72-
_ = PipelineBuilder(TEST_CLIENT, name, sql).create_or_replace()
106+
_ = PipelineBuilder(
107+
TEST_CLIENT,
108+
name,
109+
sql,
110+
runtime_config=RuntimeConfig(
111+
workers=FELDERA_TEST_NUM_WORKERS,
112+
hosts=FELDERA_TEST_NUM_HOSTS,
113+
),
114+
).create_or_replace()
73115
except Exception:
74116
pass
75117
pipeline = Pipeline.get(name, TEST_CLIENT)
@@ -95,7 +137,13 @@ def test_initialization_error(self):
95137
);
96138
"""
97139
pipeline = PipelineBuilder(
98-
TEST_CLIENT, name=unique_pipeline_name("test_initialization_error"), sql=sql
140+
TEST_CLIENT,
141+
name=unique_pipeline_name("test_initialization_error"),
142+
sql=sql,
143+
runtime_config=RuntimeConfig(
144+
workers=FELDERA_TEST_NUM_WORKERS,
145+
hosts=FELDERA_TEST_NUM_HOSTS,
146+
),
99147
).create_or_replace()
100148
with self.assertRaises(RuntimeError) as err:
101149
pipeline.start()

python/tests/platform/test_bootstrapping.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from .helper import (
66
gen_pipeline_name,
77
)
8+
from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS
89

910

1011
@enterprise_only
@@ -25,6 +26,8 @@ def test_bootstrap_enterprise(pipeline_name):
2526
pipeline_name,
2627
sql=sql,
2728
runtime_config=RuntimeConfig(
29+
workers=FELDERA_TEST_NUM_WORKERS,
30+
hosts=FELDERA_TEST_NUM_HOSTS,
2831
fault_tolerance_model=None, # We will make manual checkpoints in this test.
2932
),
3033
).create_or_replace()
@@ -277,6 +280,8 @@ def test_bootstrap_non_materialized_table_enterprise(pipeline_name):
277280
pipeline_name,
278281
sql=sql,
279282
runtime_config=RuntimeConfig(
283+
workers=FELDERA_TEST_NUM_WORKERS,
284+
hosts=FELDERA_TEST_NUM_HOSTS,
280285
fault_tolerance_model=None, # We will make manual checkpoints in this test.
281286
),
282287
).create_or_replace()
@@ -319,6 +324,8 @@ def test_bootstrap_table_lateness_enterprise(pipeline_name):
319324
pipeline_name,
320325
sql=sql,
321326
runtime_config=RuntimeConfig(
327+
workers=FELDERA_TEST_NUM_WORKERS,
328+
hosts=FELDERA_TEST_NUM_HOSTS,
322329
fault_tolerance_model=None, # We will make manual checkpoints in this test.
323330
),
324331
).create_or_replace()
@@ -362,6 +369,8 @@ def test_bootstrap_view_lateness_enterprise(pipeline_name):
362369
pipeline_name,
363370
sql=sql,
364371
runtime_config=RuntimeConfig(
372+
workers=FELDERA_TEST_NUM_WORKERS,
373+
hosts=FELDERA_TEST_NUM_HOSTS,
365374
fault_tolerance_model=None, # We will make manual checkpoints in this test.
366375
),
367376
).create_or_replace()
@@ -411,6 +420,8 @@ def gen_sql(connectors):
411420
pipeline_name,
412421
sql=gen_sql(""),
413422
runtime_config=RuntimeConfig(
423+
workers=FELDERA_TEST_NUM_WORKERS,
424+
hosts=FELDERA_TEST_NUM_HOSTS,
414425
fault_tolerance_model=None, # We will make manual checkpoints in this test.
415426
),
416427
).create_or_replace()

python/tests/platform/test_checkpoint_sync.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from feldera.runtime_config import RuntimeConfig, Storage
1010
from tests import enterprise_only
1111
from tests.shared_test_pipeline import SharedTestPipeline
12+
from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS
1213

1314
DEFAULT_ENDPOINT = os.environ.get(
1415
"DEFAULT_MINIO_ENDPOINT", "http://minio.extra.svc.cluster.local:9000"
@@ -83,6 +84,8 @@ def test_checkpoint_sync(
8384

8485
self.pipeline.set_runtime_config(
8586
RuntimeConfig(
87+
workers=FELDERA_TEST_NUM_WORKERS,
88+
hosts=FELDERA_TEST_NUM_HOSTS,
8689
fault_tolerance_model=ft,
8790
storage=Storage(config=storage_config),
8891
checkpoint_interval_secs=ft_interval,
@@ -159,6 +162,8 @@ def test_checkpoint_sync(
159162
)
160163
self.pipeline.set_runtime_config(
161164
RuntimeConfig(
165+
workers=FELDERA_TEST_NUM_WORKERS,
166+
hosts=FELDERA_TEST_NUM_HOSTS,
162167
fault_tolerance_model=ft,
163168
storage=Storage(config=storage_config),
164169
checkpoint_interval_secs=ft_interval,
@@ -261,7 +266,10 @@ def test_standby_fallback(self, from_uuid: bool = False):
261266
ft = FaultToleranceModel.AtLeastOnce
262267
self.pipeline.set_runtime_config(
263268
RuntimeConfig(
264-
fault_tolerance_model=ft, storage=Storage(config=storage_config)
269+
workers=FELDERA_TEST_NUM_WORKERS,
270+
hosts=FELDERA_TEST_NUM_HOSTS,
271+
fault_tolerance_model=ft,
272+
storage=Storage(config=storage_config),
265273
)
266274
)
267275
self.pipeline.start()
@@ -285,6 +293,8 @@ def test_standby_fallback(self, from_uuid: bool = False):
285293
pull_interval = 1
286294
standby.set_runtime_config(
287295
RuntimeConfig(
296+
workers=FELDERA_TEST_NUM_WORKERS,
297+
hosts=FELDERA_TEST_NUM_HOSTS,
288298
fault_tolerance_model=ft,
289299
storage=Storage(
290300
config=storage_cfg(

python/tests/platform/test_nowstream.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33

44
from feldera.pipeline_builder import PipelineBuilder
55
from feldera.runtime_config import RuntimeConfig
6-
from feldera.testutils import unique_pipeline_name
6+
from feldera.testutils import (
7+
unique_pipeline_name,
8+
FELDERA_TEST_NUM_WORKERS,
9+
FELDERA_TEST_NUM_HOSTS,
10+
)
711
from tests import TEST_CLIENT
812
from feldera.enums import PipelineStatus
913

@@ -32,8 +36,10 @@ def test_nowstream(self):
3236
pipeline_name,
3337
sql=sql,
3438
runtime_config=RuntimeConfig(
39+
workers=FELDERA_TEST_NUM_WORKERS,
40+
hosts=FELDERA_TEST_NUM_HOSTS,
3541
# 10 times per second
36-
clock_resolution_usecs=100000
42+
clock_resolution_usecs=100000,
3743
),
3844
).create_or_replace()
3945

python/tests/platform/test_pipeline_builder.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import unittest
2-
from feldera.testutils import unique_pipeline_name
2+
from feldera.testutils import (
3+
unique_pipeline_name,
4+
FELDERA_TEST_NUM_WORKERS,
5+
FELDERA_TEST_NUM_HOSTS,
6+
)
37
from tests import TEST_CLIENT
48
from feldera import PipelineBuilder
9+
from feldera.runtime_config import RuntimeConfig
510

611

712
class TestPipelineBuilder(unittest.TestCase):
@@ -26,7 +31,13 @@ def test_connector_orchestration(self):
2631
pipeline_name = unique_pipeline_name("test_connector_orchestration")
2732

2833
pipeline = PipelineBuilder(
29-
TEST_CLIENT, pipeline_name, sql=sql
34+
TEST_CLIENT,
35+
pipeline_name,
36+
sql=sql,
37+
runtime_config=RuntimeConfig(
38+
workers=FELDERA_TEST_NUM_WORKERS,
39+
hosts=FELDERA_TEST_NUM_HOSTS,
40+
),
3041
).create_or_replace()
3142
pipeline.start()
3243

python/tests/platform/test_shared_pipeline.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from feldera.runtime_config import RuntimeConfig
1616
from tests import TEST_CLIENT, enterprise_only
1717
from tests.shared_test_pipeline import SharedTestPipeline
18+
from feldera.testutils import FELDERA_TEST_NUM_WORKERS, FELDERA_TEST_NUM_HOSTS
1819

1920

2021
class TestPipeline(SharedTestPipeline):
@@ -637,7 +638,13 @@ def test_pipeline_resource_config(self):
637638
}
638639

639640
resources = Resources(config)
640-
self.pipeline.set_runtime_config(RuntimeConfig(resources=resources))
641+
self.pipeline.set_runtime_config(
642+
RuntimeConfig(
643+
workers=FELDERA_TEST_NUM_WORKERS,
644+
hosts=FELDERA_TEST_NUM_HOSTS,
645+
resources=resources,
646+
)
647+
)
641648
self.pipeline.start()
642649
got = TEST_CLIENT.get_pipeline(
643650
self.pipeline.name, PipelineFieldSelector.ALL

python/tests/platform/test_update_runtime.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
from feldera.pipeline_builder import PipelineBuilder
44
from feldera.runtime_config import RuntimeConfig
5-
from feldera.testutils import unique_pipeline_name
5+
from feldera.testutils import (
6+
unique_pipeline_name,
7+
FELDERA_TEST_NUM_WORKERS,
8+
FELDERA_TEST_NUM_HOSTS,
9+
)
610
from tests import TEST_CLIENT
711
from feldera.enums import PipelineStatus
812

@@ -28,6 +32,8 @@ def test_update_runtime(self):
2832
pipeline_name,
2933
sql=sql,
3034
runtime_config=RuntimeConfig(
35+
workers=FELDERA_TEST_NUM_WORKERS,
36+
hosts=FELDERA_TEST_NUM_HOSTS,
3137
logging="debug",
3238
),
3339
).create_or_replace()

0 commit comments

Comments
 (0)