Skip to content

Commit 007cab4

Browse files
committed
py: add a Kafka integration test covering both the input and output connectors. The test:
- generates 1,000,000 records of data using the Datagen connector;
- writes the input to Kafka and the schema registry in Avro format;
- reads the input back from Kafka into a loopback table;
- covers multiple supported column types;
- monitors completion of message ingestion to and from Kafka;
- validates that the loopback table contains all generated values.

Signed-off-by: rivudhk <rivudhkr@gmail.com>
1 parent 674f287 commit 007cab4

File tree

1 file changed

+158
-0
lines changed

1 file changed

+158
-0
lines changed
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import unittest
2+
from tests import TEST_CLIENT
3+
from feldera import PipelineBuilder
4+
from feldera.enums import CompilationProfile
5+
import time
6+
7+
8+
class TestKafkaAvro(unittest.TestCase):
    """End-to-end Kafka/Avro round-trip integration test.

    Builds a pipeline that generates rows with the datagen connector,
    publishes them to Kafka in Avro format (via the schema registry),
    ingests them back from Kafka into a ``loopback`` table, and checks
    that every generated row made the round trip.

    NOTE(review): requires the CI Kafka broker and schema registry
    hard-coded in the SQL below to be reachable — this is an
    integration test, not a unit test.
    """

    # Rows produced by the datagen connector; must match the "limit"
    # in the datagen plan inside the SQL below.
    GENERATED_ROWS = 1_000_000
    # `total_completed_records` counts each row once per output it flows
    # through: (1) view `v` -> Kafka, (2) Kafka -> `loopback` table.
    OUTPUT_COUNT = 2
    # Generous ceiling for 1M-row ingestion on CI hardware.
    TIMEOUT_S = 1800
    # How often to re-read pipeline stats while waiting.
    POLL_INTERVAL_S = 5

    def test_check_avro(self):
        """Run the pipeline and validate the Kafka/Avro loopback."""
        sql = """
        create table t (
            id int,
            str varchar,
            dec decimal,
            reall real,
            dbl double,
            booll boolean,
            tmestmp timestamp,
            datee date,
            tme time
        ) with (
            'materialized' = 'true',
            'connectors' = '[{
                "transport": {
                    "name": "datagen",
                    "config": { "plan": [{"limit": 1000000}] }
                }
            }]'
        );

        create view v
        with (
            'connectors' = '[{
                "transport": {
                    "name": "kafka_output",
                    "config": {
                        "bootstrap.servers": "ci-kafka-bootstrap.korat-vibes.ts.net:9094",
                        "topic": "my_topic_avro"
                    }
                },
                "format": {
                    "name": "avro",
                    "config": {
                        "update_format": "raw",
                        "registry_urls": ["http://ci-schema-registry.korat-vibes.ts.net"]
                    }
                }
            },
            {
                "index": "t_index",
                "transport": {
                    "name": "kafka_output",
                    "config": {
                        "bootstrap.servers": "ci-kafka-bootstrap.korat-vibes.ts.net:9094",
                        "topic": "my_topic_avro2"
                    }
                },
                "format": {
                    "name": "avro",
                    "config": {
                        "update_format": "raw",
                        "registry_urls": ["http://ci-schema-registry.korat-vibes.ts.net"]
                    }
                }
            }]'
        )
        as select * from t;

        create index t_index on v(id);

        create table loopback (
            id int,
            str varchar,
            dec decimal,
            reall real,
            dbl double,
            booll boolean,
            tmestmp timestamp,
            datee date,
            tme time
        ) with (
            'materialized' = 'true',
            'connectors' = '[{
                "transport": {
                    "name": "kafka_input",
                    "config": {
                        "topic": "my_topic_avro2",
                        "start_from": "earliest",
                        "bootstrap.servers": "ci-kafka-bootstrap.korat-vibes.ts.net:9094"
                    }
                },
                "format": {
                    "name": "avro",
                    "config": {
                        "update_format": "raw",
                        "registry_urls": ["http://ci-schema-registry.korat-vibes.ts.net"]
                    }
                }
            }]'
        );
        """
        pipeline = PipelineBuilder(
            TEST_CLIENT,
            "test_kafka_avro",
            sql=sql,
            compilation_profile=CompilationProfile.DEV,
        ).create_or_replace()

        try:
            pipeline.start()

            # Derive the target from the class constants so the SQL's
            # datagen limit and the completion target cannot drift apart
            # independently of each other.
            expected_records = self.GENERATED_ROWS * self.OUTPUT_COUNT

            start_time = time.perf_counter()
            # Poll `total_completed_records` until it reaches `expected_records`
            # or the timeout elapses.
            while True:
                stats = TEST_CLIENT.get_pipeline_stats(pipeline.name)
                completed = stats["global_metrics"]["total_completed_records"]

                print(f"Processed {completed}/{expected_records} rows so far...")

                if completed >= expected_records:
                    break

                # Prevent infinite polling; self.fail raises AssertionError
                # through unittest's reporting machinery.
                if time.perf_counter() - start_time > self.TIMEOUT_S:
                    self.fail(
                        f"Timeout: only {completed}/{expected_records} rows processed"
                    )

                time.sleep(self.POLL_INTERVAL_S)

            elapsed = time.perf_counter() - start_time
            print(
                f"All {completed}/{expected_records} rows processed in {elapsed:.3f}s"
            )

            # Validation: once finished, the loopback table should contain
            # exactly as many rows as were generated into `t`.
            count_table = list(pipeline.query("SELECT COUNT(*) AS count FROM t"))[0][
                "count"
            ]
            count_loopback = list(
                pipeline.query("SELECT COUNT(*) AS count FROM loopback")
            )[0]["count"]

            # assertEqual instead of a bare `assert`: not stripped under
            # `python -O`, and produces a proper unittest failure report.
            self.assertEqual(
                count_table,
                count_loopback,
                f"Validation failed: {count_loopback} rows ingested vs {count_table} expected",
            )
            print(f"Loopback table validated successfully: {count_loopback} rows")

        finally:
            # Always tear the pipeline down, even on failure/timeout.
            pipeline.stop(force=True)

0 commit comments

Comments
 (0)