-
Notifications
You must be signed in to change notification settings - Fork 131
Expand file tree
/
Copy pathtest_empty_input.py
More file actions
46 lines (37 loc) · 1.25 KB
/
Copy pathtest_empty_input.py
File metadata and controls
46 lines (37 loc) · 1.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
"""Tests for the empty_input connector."""
from feldera import PipelineBuilder
from feldera.runtime_config import RuntimeConfig
from feldera.testutils import FELDERA_TEST_NUM_HOSTS, FELDERA_TEST_NUM_WORKERS
from tests import TEST_CLIENT
from tests.platform.helper import gen_pipeline_name
@gen_pipeline_name
def test_empty_input(pipeline_name):
"""Verify that a pipeline with an empty_input connector runs to completion.
The connector immediately signals end of input; the view should contain
no rows.
"""
sql = """
CREATE TABLE t (id INT NOT NULL) WITH (
'connectors' = '[{
"name": "t",
"transport": {"name": "empty_input"},
"format": {"name": "json"}
}]'
);
CREATE MATERIALIZED VIEW v AS SELECT COUNT(*) AS c FROM t;
""".strip()
pipeline = PipelineBuilder(
TEST_CLIENT,
name=pipeline_name,
sql=sql,
runtime_config=RuntimeConfig(
workers=FELDERA_TEST_NUM_WORKERS,
hosts=FELDERA_TEST_NUM_HOSTS,
),
).create_or_replace()
pipeline.start()
pipeline.wait_for_completion()
rows = list(pipeline.query("SELECT c FROM v"))
assert rows == [{"c": 0}], f"Unexpected row count: {rows}"
pipeline.stop(force=True)
pipeline.delete(True)