55import re
66
77from feldera import PipelineBuilder , Pipeline
8+ from feldera .rest .errors import FelderaAPIError
89from tests import TEST_CLIENT , unique_pipeline_name
910from feldera .enums import CompilationProfile
1011from feldera .runtime_config import RuntimeConfig
@@ -182,11 +183,15 @@ def generate_sql(tables: list[Table], views: list[View]) -> str:
182183 def run_pipeline (self , pipeline_name_prefix : str , sql : str , views : list [View ]):
183184 """Run pipeline with the given SQL, load tables, validate views, and shutdown"""
184185 pipeline = None
186+ sql_id = sql_hash (sql )
187+ pipeline_name = unique_pipeline_name (f"{ pipeline_name_prefix } _{ sql_id } " )
185188 try :
186- sql_id = sql_hash (sql )
189+ # Pipelines that fail to compile will remain None
190+ # as `PipelineBuilder` will raise an exception and not
191+ # return a `Pipeline` object.
187192 pipeline = PipelineBuilder (
188193 TEST_CLIENT ,
189- unique_pipeline_name ( f" { pipeline_name_prefix } _ { sql_id } " ) ,
194+ pipeline_name ,
190195 sql = sql ,
191196 compilation_profile = CompilationProfile .UNOPTIMIZED ,
192197 runtime_config = RuntimeConfig (provisioning_timeout_secs = 180 ),
@@ -204,6 +209,17 @@ def run_pipeline(self, pipeline_name_prefix: str, sql: str, views: list[View]):
204209 for view in views :
205210 view .validate (pipeline )
206211 finally :
212+ # Try to get the pipelines that were created by
213+ # `PipelineBuilder` but failed to compile.
214+ if pipeline is None :
215+ try :
216+ pipeline = Pipeline .get (pipeline_name , TEST_CLIENT )
217+ except FelderaAPIError as e :
218+ if "UnknownPipelineName" in str (e ):
219+ pass
220+ else :
221+ raise
222+
207223 if pipeline is not None :
208224 pipeline .stop (force = True )
209225 pipeline .delete (True )
0 commit comments