22from http import HTTPStatus
33
44from .helper import (
5+ create_pipeline ,
56 post_json ,
67 http_request ,
78 wait_for_program_success ,
@@ -57,12 +58,7 @@ def test_deploy_pipeline(pipeline_name):
5758 "CREATE TABLE t1(c1 INTEGER) WITH ('materialized' = 'true'); "
5859 "CREATE VIEW v1 AS SELECT * FROM t1;"
5960 )
60- r = post_json (
61- api_url ("/pipelines" ),
62- {"name" : pipeline_name , "program_code" : sql },
63- )
64- assert r .status_code == HTTPStatus .CREATED , r .text
65- wait_for_program_success (pipeline_name , 1 )
61+ create_pipeline (pipeline_name , sql )
6662
6763 start_pipeline (pipeline_name )
6864 assert _ingress (pipeline_name , "t1" , "1\n 2\n 3\n " ).status_code == HTTPStatus .OK
@@ -89,9 +85,7 @@ def test_pipeline_panic(pipeline_name):
8985 "CREATE TABLE t1(c1 INTEGER); "
9086 "CREATE VIEW v1 AS SELECT ELEMENT(ARRAY [2, 3]) FROM t1;"
9187 )
92- r = post_json (api_url ("/pipelines" ), {"name" : pipeline_name , "program_code" : sql })
93- assert r .status_code == HTTPStatus .CREATED , r .text
94- wait_for_program_success (pipeline_name , 1 )
88+ create_pipeline (pipeline_name , sql )
9589
9690 start_pipeline (pipeline_name )
9791 _ingress (pipeline_name , "t1" , "1\n 2\n 3\n " )
@@ -109,9 +103,7 @@ def test_pipeline_restart(pipeline_name):
109103 Start -> stop (force) -> start -> stop (force & clear).
110104 """
111105 sql = "CREATE TABLE t1(c1 INTEGER); CREATE VIEW v1 AS SELECT * FROM t1;"
112- r = post_json (api_url ("/pipelines" ), {"name" : pipeline_name , "program_code" : sql })
113- assert r .status_code == HTTPStatus .CREATED , r .text
114- wait_for_program_success (pipeline_name , 1 )
106+ create_pipeline (pipeline_name , sql )
115107
116108 start_pipeline (pipeline_name )
117109 stop_pipeline (pipeline_name , force = True )
@@ -192,12 +184,7 @@ def test_pipeline_stop_force_after_start(pipeline_name):
192184 """
193185 Start and then force stop after varying short delays.
194186 """
195- r = post_json (
196- api_url ("/pipelines" ),
197- {"name" : pipeline_name , "program_code" : "CREATE TABLE t1(c1 INTEGER);" },
198- )
199- assert r .status_code == HTTPStatus .CREATED
200- wait_for_program_success (pipeline_name , 1 )
187+ create_pipeline (pipeline_name , "CREATE TABLE t1(c1 INTEGER);" )
201188
202189 for delay_sec in [0 , 0.1 , 0.5 , 1 , 3 , 10 ]:
203190 start_pipeline (pipeline_name )
@@ -213,9 +200,7 @@ def test_pipeline_stop_with_force(pipeline_name):
213200 """
214201 Sequences of starting/stopping with force.
215202 """
216- r = post_json (api_url ("/pipelines" ), {"name" : pipeline_name , "program_code" : "" })
217- assert r .status_code == HTTPStatus .CREATED
218- wait_for_program_success (pipeline_name , 1 )
203+ create_pipeline (pipeline_name , "" )
219204
220205 # Already stopped force
221206 stop_pipeline (pipeline_name , force = True )
@@ -241,9 +226,7 @@ def test_pipeline_stop_without_force(pipeline_name):
241226 """
242227 Same sequences but without force (Enterprise only).
243228 """
244- r = post_json (api_url ("/pipelines" ), {"name" : pipeline_name , "program_code" : "" })
245- assert r .status_code == HTTPStatus .CREATED
246- wait_for_program_success (pipeline_name , 1 )
229+ create_pipeline (pipeline_name , "" )
247230
248231 # Already stopped
249232 stop_pipeline (pipeline_name , force = False )
@@ -271,12 +254,7 @@ def test_pipeline_clear(pipeline_name):
271254 """
272255 Validate storage_status transitions and clear behavior.
273256 """
274- r = post_json (
275- api_url ("/pipelines" ),
276- {"name" : pipeline_name , "program_code" : "" },
277- )
278- assert r .status_code == HTTPStatus .CREATED
279- wait_for_program_success (pipeline_name , 1 )
257+ create_pipeline (pipeline_name , "" )
280258
281259 obj = get_pipeline (pipeline_name , "status" ).json ()
282260 assert obj .get ("storage_status" ) == "Cleared"
0 commit comments