-
Notifications
You must be signed in to change notification settings - Fork 131
Expand file tree
/
Copy pathtest_null_output.py
More file actions
55 lines (46 loc) · 1.47 KB
/
Copy pathtest_null_output.py
File metadata and controls
55 lines (46 loc) · 1.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
"""Tests for the null_output connector."""
from feldera import PipelineBuilder
from feldera.runtime_config import RuntimeConfig
from feldera.testutils import FELDERA_TEST_NUM_HOSTS, FELDERA_TEST_NUM_WORKERS
from tests import TEST_CLIENT
from tests.platform.helper import gen_pipeline_name
@gen_pipeline_name
def test_null_output(pipeline_name):
"""Verify that a pipeline with a null_output connector runs to completion.
The connector discards all output; correctness is verified by querying the
materialized view directly via an ad-hoc query.
"""
sql = """
CREATE TABLE t (id INT NOT NULL) WITH (
'connectors' = '[{
"name": "t",
"transport": {
"name": "datagen",
"config": {"plan": [{"limit": 1000}]}
}
}]'
);
CREATE MATERIALIZED VIEW v
WITH (
'connectors' = '[{
"name": "null_out",
"transport": {"name": "null_output"},
"format": {"name": "json"}
}]'
) AS SELECT COUNT(*) AS c FROM t;
""".strip()
pipeline = PipelineBuilder(
TEST_CLIENT,
name=pipeline_name,
sql=sql,
runtime_config=RuntimeConfig(
workers=FELDERA_TEST_NUM_WORKERS,
hosts=FELDERA_TEST_NUM_HOSTS,
),
).create_or_replace()
pipeline.start()
pipeline.wait_for_completion()
rows = list(pipeline.query("SELECT c FROM v"))
assert rows == [{"c": 1000}], f"Unexpected row count: {rows}"
pipeline.stop(force=True)
pipeline.delete(True)