Skip to content

Commit 7296630

Browse files
ryzhykblp
authored andcommitted
[py] Run platform tests with logging=debug.
Enable debug logging in platform tests. This involved refactoring tests that invoke REST API directly to use a function to create a pipeline. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent eaeaa41 commit 7296630

10 files changed

+47
-85
lines changed

python/feldera/runtime_config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ def __init__(
8181
fault_tolerance_model: Optional[FaultToleranceModel] = None,
8282
checkpoint_interval_secs: Optional[int] = None,
8383
dev_tweaks: Optional[dict] = None,
84+
logging: Optional[str] = None,
8485
):
8586
self.workers = workers
8687
self.tracing = tracing
@@ -105,6 +106,7 @@ def __init__(
105106
else:
106107
raise ValueError(f"Unknown value '{storage}' for storage")
107108
self.dev_tweaks = dev_tweaks
109+
self.logging = logging
108110

109111
@staticmethod
110112
def default() -> "RuntimeConfig":

python/tests/platform/helper.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ def wait_for_deployment_status(name: str, desired: str, timeout_s: float = 60.0)
132132
f"Timed out waiting for pipeline '{name}' deployment_status={desired} (last={last})"
133133
)
134134

135+
def create_pipeline(name: str, sql: str):
136+
r = post_json(api_url("/pipelines"), {"name": name, "program_code": sql, "runtime_config": {"logging": "debug"}})
137+
assert r.status_code == HTTPStatus.CREATED, r.text
138+
wait_for_program_success(name, 1)
135139

136140
def start_pipeline(name: str, wait: bool = True):
137141
r = post_no_body(api_url(f"/pipelines/{name}/start"))

python/tests/platform/test_checkpoint_suspend.py

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
import pytest
77

88
from tests import TEST_CLIENT, enterprise_only
9+
from tests.platform.test_ingress_formats import create_pipeline
910
from .helper import (
10-
post_json,
1111
get,
1212
wait_for_program_success,
1313
api_url,
@@ -55,9 +55,7 @@ def test_checkpoint_oss(pipeline_name):
5555
pytest.skip("Enterprise edition: use enterprise checkpoint test instead")
5656

5757
sql = "CREATE TABLE t1(x int) WITH ('materialized'='true');"
58-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": sql})
59-
assert r.status_code == HTTPStatus.CREATED, r.text
60-
wait_for_program_success(pipeline_name, 1)
58+
create_pipeline(pipeline_name, sql)
6159

6260
resp = post_no_body(api_url(f"/pipelines/{pipeline_name}/checkpoint"))
6361
assert resp.status_code == HTTPStatus.NOT_IMPLEMENTED, resp.text
@@ -72,9 +70,7 @@ def test_checkpoint_enterprise(pipeline_name):
7270
Enterprise: invoke /checkpoint multiple times, poll /checkpoint_status for completion.
7371
"""
7472
sql = "CREATE TABLE t1(x int) WITH ('materialized'='true');"
75-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": sql})
76-
assert r.status_code == HTTPStatus.CREATED, r.text
77-
wait_for_program_success(pipeline_name, 1)
73+
create_pipeline(pipeline_name, sql)
7874
start_pipeline_as_paused(pipeline_name)
7975

8076
for _ in range(5):
@@ -112,9 +108,7 @@ def test_suspend_oss(pipeline_name):
112108
pytest.skip("Enterprise edition: use enterprise suspend test instead")
113109

114110
sql = "CREATE TABLE t1(x int) WITH ('materialized'='true');"
115-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": sql})
116-
assert r.status_code == HTTPStatus.CREATED
117-
wait_for_program_success(pipeline_name, 1)
111+
create_pipeline(pipeline_name, sql)
118112

119113
resp = post_no_body(api_url(f"/pipelines/{pipeline_name}/stop?force=false"))
120114
assert resp.status_code == HTTPStatus.NOT_IMPLEMENTED, (
@@ -187,9 +181,7 @@ def test_suspend_enterprise(pipeline_name):
187181
);
188182
""".strip()
189183

190-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": sql})
191-
assert r.status_code == HTTPStatus.CREATED, r.text
192-
wait_for_program_success(pipeline_name, 1)
184+
create_pipeline(pipeline_name, sql)
193185

194186
# Start pipeline (all connectors remain paused)
195187
start_pipeline(pipeline_name)

python/tests/platform/test_completion_tokens.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from http import HTTPStatus
88

99
from .helper import (
10+
create_pipeline,
1011
get,
1112
post_json,
1213
http_request,
@@ -84,9 +85,7 @@ def test_completion_tokens(pipeline_name):
8485
"WITH ('materialized' = 'true'); "
8586
"CREATE MATERIALIZED VIEW v1 AS SELECT * FROM t1;"
8687
)
87-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": sql})
88-
assert r.status_code == HTTPStatus.CREATED, r.text
89-
wait_for_program_success(pipeline_name, 1)
88+
create_pipeline(pipeline_name, sql)
9089
start_pipeline(pipeline_name)
9190

9291
for i in range(0, 200):

python/tests/platform/test_ingress_formats.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,15 @@
66
from urllib.parse import quote
77

88
from .helper import (
9-
post_json,
109
http_request,
11-
wait_for_program_success,
1210
api_url,
1311
start_pipeline,
1412
gen_pipeline_name,
1513
adhoc_query_json,
14+
create_pipeline
1615
)
1716

1817

19-
def create_pipeline(name: str, sql: str):
20-
r = post_json(api_url("/pipelines"), {"name": name, "program_code": sql})
21-
assert r.status_code == HTTPStatus.CREATED, r.text
22-
wait_for_program_success(name, 1)
23-
24-
2518
def _ingress(
2619
pipeline: str,
2720
table: str,

python/tests/platform/test_metrics_logs.py

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@
44
from urllib.parse import quote_plus
55

66
from .helper import (
7+
create_pipeline,
78
get,
8-
post_json,
99
post_no_body,
1010
http_request,
11-
wait_for_program_success,
1211
api_url,
1312
start_pipeline,
1413
start_pipeline_as_paused,
@@ -52,9 +51,7 @@ def test_pipeline_metrics(pipeline_name):
5251
"""
5352
Tests that circuit metrics can be retrieved from the pipeline.
5453
"""
55-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": ""})
56-
assert r.status_code == HTTPStatus.CREATED
57-
wait_for_program_success(pipeline_name, 1)
54+
create_pipeline(pipeline_name, "")
5855
start_pipeline_as_paused(pipeline_name)
5956

6057
# Default
@@ -102,9 +99,8 @@ def test_pipeline_stats(pipeline_name):
10299
);
103100
CREATE MATERIALIZED VIEW v1 AS SELECT * FROM t1;
104101
""".strip()
105-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": sql})
106-
assert r.status_code == HTTPStatus.CREATED, r.text
107-
wait_for_program_success(pipeline_name, 1)
102+
103+
create_pipeline(pipeline_name, sql)
108104
start_pipeline(pipeline_name)
109105

110106
# Create output connector on v1 (egress)
@@ -176,15 +172,7 @@ def test_pipeline_logs(pipeline_name):
176172
assert r.status_code == HTTPStatus.NOT_FOUND
177173

178174
# Create pipeline
179-
r = post_json(
180-
api_url("/pipelines"),
181-
{
182-
"name": pipeline_name,
183-
"program_code": "CREATE TABLE t1(c1 INTEGER) WITH ('materialized'='true');",
184-
},
185-
)
186-
assert r.status_code == HTTPStatus.CREATED, r.text
187-
wait_for_program_success(pipeline_name, 1)
175+
create_pipeline(pipeline_name, "CREATE TABLE t1(c1 INTEGER) WITH ('materialized'='true');")
188176

189177
# Poll for logs availability
190178
deadline = time.time() + 30

python/tests/platform/test_orchestration.py

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
from http import HTTPStatus
33

44
from .helper import (
5-
post_json,
6-
wait_for_program_success,
5+
create_pipeline,
76
post_no_body,
87
api_url,
98
start_pipeline,
@@ -63,11 +62,7 @@ def test_pipeline_orchestration_basic(pipeline_name):
6362
);
6463
""".strip()
6564

66-
r = post_json(
67-
api_url("/pipelines"), {"name": cur_pipeline_name, "program_code": sql}
68-
)
69-
assert r.status_code == HTTPStatus.CREATED, r.text
70-
wait_for_program_success(cur_pipeline_name, 1)
65+
create_pipeline(cur_pipeline_name, sql)
7166
start_pipeline_as_paused(cur_pipeline_name)
7267

7368
# Initial: pipeline paused, connector running, processed=0
@@ -130,9 +125,8 @@ def test_pipeline_orchestration_errors(pipeline_name):
130125
}]'
131126
);
132127
""".strip()
133-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": sql})
134-
assert r.status_code == HTTPStatus.CREATED, r.text
135-
wait_for_program_success(pipeline_name, 1)
128+
129+
create_pipeline(pipeline_name, sql)
136130
start_pipeline_as_paused(pipeline_name)
137131

138132
# ACCEPTED endpoints
@@ -206,9 +200,7 @@ def test_pipeline_orchestration_scenarios(pipeline_name):
206200
]'
207201
);
208202
""".strip()
209-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": sql})
210-
assert r.status_code == HTTPStatus.CREATED, r.text
211-
wait_for_program_success(pipeline_name, 1)
203+
create_pipeline(pipeline_name, sql)
212204
stop_pipeline(pipeline_name, force=True)
213205

214206
class Step:

python/tests/platform/test_pipeline_lifecycle.py

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from http import HTTPStatus
33

44
from .helper import (
5+
create_pipeline,
56
post_json,
67
http_request,
78
wait_for_program_success,
@@ -57,12 +58,7 @@ def test_deploy_pipeline(pipeline_name):
5758
"CREATE TABLE t1(c1 INTEGER) WITH ('materialized' = 'true'); "
5859
"CREATE VIEW v1 AS SELECT * FROM t1;"
5960
)
60-
r = post_json(
61-
api_url("/pipelines"),
62-
{"name": pipeline_name, "program_code": sql},
63-
)
64-
assert r.status_code == HTTPStatus.CREATED, r.text
65-
wait_for_program_success(pipeline_name, 1)
61+
create_pipeline(pipeline_name, sql)
6662

6763
start_pipeline(pipeline_name)
6864
assert _ingress(pipeline_name, "t1", "1\n2\n3\n").status_code == HTTPStatus.OK
@@ -89,9 +85,7 @@ def test_pipeline_panic(pipeline_name):
8985
"CREATE TABLE t1(c1 INTEGER); "
9086
"CREATE VIEW v1 AS SELECT ELEMENT(ARRAY [2, 3]) FROM t1;"
9187
)
92-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": sql})
93-
assert r.status_code == HTTPStatus.CREATED, r.text
94-
wait_for_program_success(pipeline_name, 1)
88+
create_pipeline(pipeline_name, sql)
9589

9690
start_pipeline(pipeline_name)
9791
_ingress(pipeline_name, "t1", "1\n2\n3\n")
@@ -109,9 +103,7 @@ def test_pipeline_restart(pipeline_name):
109103
Start -> stop (force) -> start -> stop (force & clear).
110104
"""
111105
sql = "CREATE TABLE t1(c1 INTEGER); CREATE VIEW v1 AS SELECT * FROM t1;"
112-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": sql})
113-
assert r.status_code == HTTPStatus.CREATED, r.text
114-
wait_for_program_success(pipeline_name, 1)
106+
create_pipeline(pipeline_name, sql)
115107

116108
start_pipeline(pipeline_name)
117109
stop_pipeline(pipeline_name, force=True)
@@ -192,12 +184,7 @@ def test_pipeline_stop_force_after_start(pipeline_name):
192184
"""
193185
Start and then force stop after varying short delays.
194186
"""
195-
r = post_json(
196-
api_url("/pipelines"),
197-
{"name": pipeline_name, "program_code": "CREATE TABLE t1(c1 INTEGER);"},
198-
)
199-
assert r.status_code == HTTPStatus.CREATED
200-
wait_for_program_success(pipeline_name, 1)
187+
create_pipeline(pipeline_name, "CREATE TABLE t1(c1 INTEGER);")
201188

202189
for delay_sec in [0, 0.1, 0.5, 1, 3, 10]:
203190
start_pipeline(pipeline_name)
@@ -213,9 +200,7 @@ def test_pipeline_stop_with_force(pipeline_name):
213200
"""
214201
Sequences of starting/stopping with force.
215202
"""
216-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": ""})
217-
assert r.status_code == HTTPStatus.CREATED
218-
wait_for_program_success(pipeline_name, 1)
203+
create_pipeline(pipeline_name, "")
219204

220205
# Already stopped force
221206
stop_pipeline(pipeline_name, force=True)
@@ -241,9 +226,7 @@ def test_pipeline_stop_without_force(pipeline_name):
241226
"""
242227
Same sequences but without force (Enterprise only).
243228
"""
244-
r = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": ""})
245-
assert r.status_code == HTTPStatus.CREATED
246-
wait_for_program_success(pipeline_name, 1)
229+
create_pipeline(pipeline_name, "")
247230

248231
# Already stopped
249232
stop_pipeline(pipeline_name, force=False)
@@ -271,12 +254,7 @@ def test_pipeline_clear(pipeline_name):
271254
"""
272255
Validate storage_status transitions and clear behavior.
273256
"""
274-
r = post_json(
275-
api_url("/pipelines"),
276-
{"name": pipeline_name, "program_code": ""},
277-
)
278-
assert r.status_code == HTTPStatus.CREATED
279-
wait_for_program_success(pipeline_name, 1)
257+
create_pipeline(pipeline_name, "")
280258

281259
obj = get_pipeline(pipeline_name, "status").json()
282260
assert obj.get("storage_status") == "Cleared"

python/tests/platform/test_update_runtime.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def test_update_runtime(self):
2929
sql=sql,
3030
runtime_config=RuntimeConfig(
3131
dev_tweaks={"backfill_avoidance": True},
32+
logging="debug",
3233
),
3334
).create_or_replace()
3435

python/tests/shared_test_pipeline.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import unittest
2+
from feldera.runtime_config import RuntimeConfig
23
from feldera.testutils import unique_pipeline_name
34
from tests import TEST_CLIENT
45
from feldera import PipelineBuilder, Pipeline
@@ -45,11 +46,19 @@ def setUpClass(cls):
4546
cls.client,
4647
cls.pipeline_name,
4748
cls.ddl,
49+
runtime_config=RuntimeConfig(
50+
logging="debug",
51+
),
4852
).create_or_replace()
4953

5054
def setUp(self):
5155
p = PipelineBuilder(
52-
self.client, unique_pipeline_name(self._testMethodName), sql=self.ddl
56+
self.client,
57+
unique_pipeline_name(self._testMethodName),
58+
sql=self.ddl,
59+
runtime_config=RuntimeConfig(
60+
logging="debug",
61+
),
5362
).create_or_replace()
5463
self.p = p
5564

@@ -66,4 +75,8 @@ def new_pipeline_with_suffix(self, suffix: str) -> Pipeline:
6675
self.client,
6776
unique_pipeline_name(f"{self._testMethodName}_{suffix}"),
6877
sql=self.ddl,
78+
runtime_config=RuntimeConfig(
79+
logging="debug",
80+
),
81+
6982
).create_or_replace()

0 commit comments

Comments
 (0)