Skip to content

Commit 7188c2d

Browse files
committed
py: refactor python tests to run in parallel
* Python tests should no longer run for hours in CI; the cause appears to have been a bug in the stop implementation, where we did `for x in collection.pop()`, which, instead of draining all the items, pops only the first one and iterates over it. * Adds support for waiting until all data represented by a completion token has been processed. * Refactors the tests to run in parallel. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent 6243c26 commit 7188c2d

File tree

10 files changed

+603
-484
lines changed

10 files changed

+603
-484
lines changed

.github/workflows/test-integration.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ jobs:
184184

185185
- name: Run python tests
186186
if: ${{ vars.CI_DRY_RUN != 'true' }}
187-
run: uv run --locked pytest . --timeout=600
187+
run: uv run --locked pytest -n auto . --timeout=600
188188
working-directory: python
189189
env:
190190
FELDERA_HOST: http://localhost:8080

python/feldera/pipeline.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ def input_json(
142142
data: Dict | list,
143143
update_format: str = "raw",
144144
force: bool = False,
145+
wait: bool = True,
145146
):
146147
"""
147148
Push this JSON data to the specified table of the pipeline.
@@ -155,6 +156,7 @@ def input_json(
155156
:param update_format: The update format of the JSON data to be pushed to the pipeline. Must be one of:
156157
"raw", "insert_delete". https://docs.feldera.com/formats/json#the-insertdelete-format
157158
:param force: `True` to push data even if the pipeline is paused. `False` by default.
159+
:param wait: If True, blocks until this input has been processed by the pipeline
158160
159161
:raises ValueError: If the update format is invalid.
160162
:raises FelderaAPIError: If the pipeline is not in a valid state to push data.
@@ -177,6 +179,7 @@ def input_json(
177179
update_format=update_format,
178180
array=array,
179181
force=force,
182+
wait=wait,
180183
)
181184

182185
def pause_connector(self, table_name: str, connector_name: str):
@@ -372,7 +375,7 @@ def start(self, wait: bool = True, timeout_s: Optional[float] = None):
372375
return
373376

374377
self.client.pause_pipeline(
375-
self.name, "Unable to START the pipeline.\n", timeout_s
378+
self.name, "Unable to START the pipeline.\n", wait=wait, timeout_s=timeout_s
376379
)
377380
self.__setup_output_listeners()
378381
self.resume(timeout_s=timeout_s)
@@ -506,9 +509,11 @@ def stop(self, force: bool, wait: bool = True, timeout_s: Optional[float] = None
506509
queue.put(_CallbackRunnerInstruction.RanToCompletion)
507510

508511
if len(self.views_tx) > 0:
509-
for view_name, queue in self.views_tx.pop().items():
510-
# block until the callback runner has been stopped
511-
queue.join()
512+
while self.views_tx:
513+
view = self.views_tx.pop()
514+
for view_name, queue in view.items():
515+
# block until the callback runner has been stopped
516+
queue.join()
512517

513518
time.sleep(3)
514519
self.client.stop_pipeline(

python/feldera/rest/feldera_client.py

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from feldera.rest.config import Config
1010
from feldera.rest.feldera_config import FelderaConfig
11-
from feldera.rest.errors import FelderaTimeoutError
11+
from feldera.rest.errors import FelderaTimeoutError, FelderaAPIError
1212
from feldera.rest.pipeline import Pipeline
1313
from feldera.rest._httprequests import HttpRequests
1414
from feldera.rest._helpers import client_version
@@ -67,7 +67,7 @@ def __init__(
6767
config = self.get_config()
6868
version = client_version()
6969
if config.version != version:
70-
logging.warn(
70+
logging.warning(
7171
f"Client is on version {version} while server is at "
7272
f"{config.version}. There could be incompatibilities."
7373
)
@@ -593,7 +593,8 @@ def push_to_pipeline(
593593
update_format: str = "raw",
594594
json_flavor: Optional[str] = None,
595595
serialize: bool = True,
596-
):
596+
wait: bool = True,
597+
) -> str:
597598
"""
598599
Insert data into a pipeline
599600
@@ -610,6 +611,9 @@ def push_to_pipeline(
610611
"debezium_mysql", "snowflake", "kafka_connect_json_converter", "pandas"
611612
:param data: The data to insert
612613
:param serialize: If True, the data will be serialized to JSON. True by default
614+
:param wait: If True, blocks until this input has been processed by the pipeline
615+
616+
:returns: The completion token to this input.
613617
"""
614618

615619
if format not in ["json", "csv"]:
@@ -671,14 +675,62 @@ def push_to_pipeline(
671675
content_type = "text/csv"
672676
data = bytes(str(data), "utf-8")
673677

674-
self.http.post(
678+
resp = self.http.post(
675679
path=f"/pipelines/{pipeline_name}/ingress/{table_name}",
676680
params=params,
677681
content_type=content_type,
678682
body=data,
679683
serialize=serialize,
680684
)
681685

686+
token = resp.get("token")
687+
if token is None:
688+
raise FelderaAPIError("response did not contain a completion token", resp)
689+
690+
if not wait:
691+
return token
692+
693+
self.wait_for_token(pipeline_name, token)
694+
695+
return token
696+
697+
def wait_for_token(self, pipeline_name: str, token: str):
698+
"""
699+
Blocks until all records represented by this completion token have
700+
been processed.
701+
702+
:param pipeline_name: The name of the pipeline
703+
:param token: The token to check for completion
704+
"""
705+
706+
params = {
707+
"token": token,
708+
}
709+
710+
start = time.time()
711+
712+
while True:
713+
resp = self.http.get(
714+
path=f"/pipelines/{pipeline_name}/completion_status", params=params
715+
)
716+
717+
status: Optional[str] = resp.get("status")
718+
if status is None:
719+
raise FelderaAPIError(
720+
f"got empty status when checking for completion status for token: {token}",
721+
resp,
722+
)
723+
724+
if status.lower() == "complete":
725+
break
726+
727+
elapsed = time.time() - start
728+
logging.debug(
729+
f"still waiting for inputs represented by {token} to be processed; elapsed: {elapsed}s"
730+
)
731+
732+
time.sleep(0.1)
733+
682734
def listen_to_pipeline(
683735
self,
684736
pipeline_name: str,

python/feldera/stats.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def __init__(self):
1616

1717
self.global_metrics: GlobalPipelineMetrics = GlobalPipelineMetrics()
1818
self.suspend_error: Optional[Any] = None
19-
self.inputs: Mapping[List[InputEndpointStatus()]] = {}
19+
self.inputs: Mapping[List[InputEndpointStatus]] = {}
2020
self.outputs: Mapping[List[OutputEndpointStatus]] = {}
2121

2222
@classmethod

python/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ Issues = "https://github.com/feldera/feldera/issues"
3939
dev-dependencies = [
4040
"kafka-python-ng==2.2.2",
4141
"pytest-timeout>=2.3.1",
42+
"pytest-xdist>=3.8.0",
4243
"pytest>=8.3.5",
4344
"sphinx-rtd-theme==2.0.0",
4445
"sphinx==7.3.7",

python/tests/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"FELDERA_PIPELINE_TO_KAFKA_SERVER", "redpanda:9092"
1414
)
1515

16-
TEST_CLIENT = FelderaClient(BASE_URL, api_key=API_KEY)
16+
TEST_CLIENT = FelderaClient(BASE_URL, api_key=API_KEY, requests_verify=False)
1717

1818

1919
def enterprise_only(fn):

python/tests/shared_test_pipeline.py

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import unittest
22
from tests import TEST_CLIENT
33
from feldera import PipelineBuilder, Pipeline
4-
from feldera.runtime_config import RuntimeConfig
5-
from feldera.enums import PipelineStatus
64

75

86
class SharedTestPipeline(unittest.TestCase):
@@ -34,25 +32,16 @@ def setUpClass(cls):
3432
cls.ddl,
3533
).create_or_replace()
3634

37-
def set_runtime_config(self, runtime_config: RuntimeConfig):
38-
self._pipeline = PipelineBuilder(
39-
self.client, self.pipeline_name, self.ddl, runtime_config=runtime_config
35+
def setUp(self):
36+
p = PipelineBuilder(
37+
self.client, self._testMethodName, sql=self.ddl
4038
).create_or_replace()
39+
self.p = p
4140

42-
def reset_runtime_config(self):
43-
self.set_runtime_config(RuntimeConfig.default())
44-
45-
@classmethod
46-
def tearDownClass(cls):
47-
if cls._pipeline is not None:
48-
if cls._pipeline.status() != PipelineStatus.STOPPED:
49-
cls._pipeline.stop(force=True)
50-
cls._pipeline.clear_storage()
51-
cls._pipeline = None
52-
cls._ddls = []
41+
def tearDown(self):
42+
self.p.stop(force=True)
43+
self.p.clear_storage()
5344

5445
@property
5546
def pipeline(self) -> Pipeline:
56-
p = type(self)._pipeline
57-
assert p is not None, "Shared pipeline was not created successfully."
58-
return p
47+
return self.p
Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from feldera.runtime_config import RuntimeConfig, Storage
44
from typing import Optional
55
import os
6+
import sys
67
import time
78
from uuid import uuid4
89
import random
@@ -17,6 +18,7 @@
1718

1819

1920
def storage_cfg(
21+
pipeline_name: str,
2022
endpoint: Optional[str] = None,
2123
start_from_checkpoint: Optional[str] = None,
2224
strict: bool = False,
@@ -27,7 +29,7 @@ def storage_cfg(
2729
"name": "file",
2830
"config": {
2931
"sync": {
30-
"bucket": DEFAULT_BUCKET,
32+
"bucket": f"{DEFAULT_BUCKET}/{pipeline_name}",
3133
"access_key": ACCESS_KEY,
3234
"secret_key": SECRET_KEY if not auth_err else SECRET_KEY + "extra",
3335
"provider": "Minio",
@@ -53,18 +55,44 @@ def test_checkpoint_sync(
5355
):
5456
"""
5557
CREATE TABLE t0 (c0 INT, c1 VARCHAR);
56-
CREATE MATERIALIZED VIEW v0 AS SELECT c0 FROM t0;
58+
CREATE MATERIALIZED VIEW v0 AS SELECT * FROM t0;
5759
"""
58-
storage_config = storage_cfg()
5960

60-
self.set_runtime_config(RuntimeConfig(storage=Storage(config=storage_config)))
61+
storage_config = storage_cfg(self.pipeline.name)
62+
63+
self.pipeline.set_runtime_config(
64+
RuntimeConfig(storage=Storage(config=storage_config))
65+
)
6166
self.pipeline.start()
6267

6368
random.seed(time.time())
64-
data = [{"c0": i, "c1": str(i)} for i in range(1, random.randint(10, 20))]
69+
total = random.randint(10, 20)
70+
data = [{"c0": i, "c1": str(i)} for i in range(1, total)]
6571
self.pipeline.input_json("t0", data)
66-
self.pipeline.execute("INSERT INTO t0 VALUES (4, 'exists')")
72+
self.pipeline.execute("INSERT INTO t0 VALUES (21, 'exists')")
73+
74+
start = time.time()
75+
timeout = 5
76+
77+
while True:
78+
processed = self.pipeline.stats().global_metrics.total_processed_records
79+
if processed == total:
80+
break
81+
82+
if time.time() - start > timeout:
83+
raise TimeoutError(
84+
f"timed out while waiting for pipeline to process {total} records"
85+
)
86+
87+
time.sleep(0.1)
88+
6789
got_before = list(self.pipeline.query("SELECT * FROM v0"))
90+
print(f"{self.pipeline.name}: records: {total}, {got_before}", file=sys.stderr)
91+
92+
if len(got_before) != processed:
93+
raise RuntimeError(
94+
f"adhoc query returned {len(got_before)} but {processed} records were processed: {got_before}"
95+
)
6896

6997
self.pipeline.checkpoint(wait=True)
7098
uuid = self.pipeline.sync_checkpoint(wait=True)
@@ -79,14 +107,22 @@ def test_checkpoint_sync(
79107

80108
# Restart pipeline from checkpoint
81109
storage_config = storage_cfg(
110+
pipeline_name=self.pipeline.name,
82111
start_from_checkpoint=uuid if from_uuid else "latest",
83112
auth_err=auth_err,
84113
strict=strict,
85114
)
86-
self.set_runtime_config(RuntimeConfig(storage=Storage(config=storage_config)))
115+
self.pipeline.set_runtime_config(
116+
RuntimeConfig(storage=Storage(config=storage_config))
117+
)
87118
self.pipeline.start()
88119
got_after = list(self.pipeline.query("SELECT * FROM v0"))
89120

121+
print(
122+
f"{self.pipeline.name}: after: {len(got_after)}, {got_after}",
123+
file=sys.stderr,
124+
)
125+
90126
if expect_empty:
91127
got_before = []
92128

0 commit comments

Comments
 (0)