Skip to content

Commit a9243d4

Browse files
committed
py: aggtest: delete pipelines that fail to compile
Fixes: #4903 Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent 4a103bd commit a9243d4

File tree

1 file changed

+18
-2
lines changed

1 file changed

+18
-2
lines changed

python/tests/runtime_aggtest/aggtst_base.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import re
66

77
from feldera import PipelineBuilder, Pipeline
8+
from feldera.rest.errors import FelderaAPIError
89
from tests import TEST_CLIENT, unique_pipeline_name
910
from feldera.enums import CompilationProfile
1011
from feldera.runtime_config import RuntimeConfig
@@ -182,11 +183,15 @@ def generate_sql(tables: list[Table], views: list[View]) -> str:
182183
def run_pipeline(self, pipeline_name_prefix: str, sql: str, views: list[View]):
183184
"""Run pipeline with the given SQL, load tables, validate views, and shutdown"""
184185
pipeline = None
186+
sql_id = sql_hash(sql)
187+
pipeline_name = unique_pipeline_name(f"{pipeline_name_prefix}_{sql_id}")
185188
try:
186-
sql_id = sql_hash(sql)
189+
# Pipelines that fail to compile will remain None
190+
# as `PipelineBuilder` will raise an exception and not
191+
# return a `Pipeline` object.
187192
pipeline = PipelineBuilder(
188193
TEST_CLIENT,
189-
unique_pipeline_name(f"{pipeline_name_prefix}_{sql_id}"),
194+
pipeline_name,
190195
sql=sql,
191196
compilation_profile=CompilationProfile.UNOPTIMIZED,
192197
runtime_config=RuntimeConfig(provisioning_timeout_secs=180),
@@ -204,6 +209,17 @@ def run_pipeline(self, pipeline_name_prefix: str, sql: str, views: list[View]):
204209
for view in views:
205210
view.validate(pipeline)
206211
finally:
212+
# Try to get the pipelines that were created by
213+
# `PipelineBuilder` but failed to compile.
214+
if pipeline is None:
215+
try:
216+
pipeline = Pipeline.get(pipeline_name, TEST_CLIENT)
217+
except FelderaAPIError as e:
218+
if "UnknownPipelineName" in str(e):
219+
pass
220+
else:
221+
raise
222+
207223
if pipeline is not None:
208224
pipeline.stop(force=True)
209225
pipeline.delete(True)

0 commit comments

Comments
 (0)