33from feldera .runtime_config import RuntimeConfig , Storage
44from typing import Optional
55import os
6+ import sys
67import time
78from uuid import uuid4
89import random
1718
1819
1920def storage_cfg (
21+ pipeline_name : str ,
2022 endpoint : Optional [str ] = None ,
2123 start_from_checkpoint : Optional [str ] = None ,
2224 strict : bool = False ,
@@ -27,7 +29,7 @@ def storage_cfg(
2729 "name" : "file" ,
2830 "config" : {
2931 "sync" : {
30- "bucket" : DEFAULT_BUCKET ,
32+ "bucket" : f" { DEFAULT_BUCKET } / { pipeline_name } " ,
3133 "access_key" : ACCESS_KEY ,
3234 "secret_key" : SECRET_KEY if not auth_err else SECRET_KEY + "extra" ,
3335 "provider" : "Minio" ,
@@ -53,18 +55,44 @@ def test_checkpoint_sync(
5355 ):
5456 """
5557 CREATE TABLE t0 (c0 INT, c1 VARCHAR);
56- CREATE MATERIALIZED VIEW v0 AS SELECT c0 FROM t0;
58+ CREATE MATERIALIZED VIEW v0 AS SELECT * FROM t0;
5759 """
58- storage_config = storage_cfg ()
5960
60- self .set_runtime_config (RuntimeConfig (storage = Storage (config = storage_config )))
61+ storage_config = storage_cfg (self .pipeline .name )
62+
63+ self .pipeline .set_runtime_config (
64+ RuntimeConfig (storage = Storage (config = storage_config ))
65+ )
6166 self .pipeline .start ()
6267
6368 random .seed (time .time ())
64- data = [{"c0" : i , "c1" : str (i )} for i in range (1 , random .randint (10 , 20 ))]
69+ total = random .randint (10 , 20 )
70+ data = [{"c0" : i , "c1" : str (i )} for i in range (1 , total )]
6571 self .pipeline .input_json ("t0" , data )
66- self .pipeline .execute ("INSERT INTO t0 VALUES (4, 'exists')" )
72+ self .pipeline .execute ("INSERT INTO t0 VALUES (21, 'exists')" )
73+
74+ start = time .time ()
75+ timeout = 5
76+
77+ while True :
78+ processed = self .pipeline .stats ().global_metrics .total_processed_records
79+ if processed == total :
80+ break
81+
82+ if time .time () - start > timeout :
83+ raise TimeoutError (
84+ f"timed out while waiting for pipeline to process { total } records"
85+ )
86+
87+ time .sleep (0.1 )
88+
6789 got_before = list (self .pipeline .query ("SELECT * FROM v0" ))
90+ print (f"{ self .pipeline .name } : records: { total } , { got_before } " , file = sys .stderr )
91+
92+ if len (got_before ) != processed :
93+ raise RuntimeError (
94+ f"adhoc query returned { len (got_before )} but { processed } records were processed: { got_before } "
95+ )
6896
6997 self .pipeline .checkpoint (wait = True )
7098 uuid = self .pipeline .sync_checkpoint (wait = True )
@@ -79,14 +107,22 @@ def test_checkpoint_sync(
79107
80108 # Restart pipeline from checkpoint
81109 storage_config = storage_cfg (
110+ pipeline_name = self .pipeline .name ,
82111 start_from_checkpoint = uuid if from_uuid else "latest" ,
83112 auth_err = auth_err ,
84113 strict = strict ,
85114 )
86- self .set_runtime_config (RuntimeConfig (storage = Storage (config = storage_config )))
115+ self .pipeline .set_runtime_config (
116+ RuntimeConfig (storage = Storage (config = storage_config ))
117+ )
87118 self .pipeline .start ()
88119 got_after = list (self .pipeline .query ("SELECT * FROM v0" ))
89120
121+ print (
122+ f"{ self .pipeline .name } : after: { len (got_after )} , { got_after } " ,
123+ file = sys .stderr ,
124+ )
125+
90126 if expect_empty :
91127 got_before = []
92128
0 commit comments