Skip to content

Commit ad98896

Browse files
committed
Add hash validation, Kafka artifacts cleanup, and environment variables
- Update dependencies to include confluent-kafka
- Validate loopback table by comparing its hash with the source table
- Delete Kafka topics and schema subjects after each test run
- Add environment variables for Kafka bootstrap and Schema Registry URL

Signed-off-by: rivudhk <rivudhkr@gmail.com>
1 parent 007cab4 commit ad98896

File tree

1 file changed

+123
-55
lines changed

1 file changed

+123
-55
lines changed
Lines changed: 123 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,80 @@
11
import unittest
22
from tests import TEST_CLIENT
33
from feldera import PipelineBuilder
4-
from feldera.enums import CompilationProfile
54
import time
5+
import os
6+
from confluent_kafka.admin import AdminClient
7+
import requests
8+
import re
9+
10+
11+
def env(name: str, default: str) -> str:
    """Read a configuration value from the environment, falling back to *default*.

    Used for the Kafka broker and Schema Registry settings. The defaults are
    only meant for internal development; external users must set the variables.
    """
    value = os.getenv(name)
    return default if value is None else value
15+
16+
17+
# Set these before running the test, e.g. from a terminal/shell:
#   export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
#   export SCHEMA_REGISTRY_URL=http://localhost:8081
# (Note: shell `export` takes no spaces around '='.)
KAFKA_BOOTSTRAP = env(
    "KAFKA_BOOTSTRAP_SERVERS", "ci-kafka-bootstrap.korat-vibes.ts.net:9094"
)
SCHEMA_REGISTRY = env(
    "SCHEMA_REGISTRY_URL", "http://ci-schema-registry.korat-vibes.ts.net"
)
28+
29+
30+
def extract_kafka_avro_artifacts(sql: str) -> tuple[list[str], list[str]]:
    """Extract Kafka topics and Schema Registry subjects referenced by *sql*.

    Topics come from ``"topic": "<name>"`` entries in the connector JSON;
    subjects are taken to be the names of created views and indexes.
    # NOTE(review): confirm the view/index name matches the registry's actual
    # subject-name strategy for these connectors.

    Returns deduplicated, sorted lists so callers see a deterministic order
    (``list(set(...))`` would vary run to run).
    """
    topics = re.findall(r'"topic"\s*:\s*"([^"]+)"', sql)
    # \s+ between keywords tolerates multiple spaces/newlines, which the
    # original single-space pattern would silently miss.
    subjects = re.findall(r"create\s+view\s+(\w+)", sql, re.I) + re.findall(
        r"create\s+index\s+(\w+)", sql, re.I
    )
    return sorted(set(topics)), sorted(set(subjects))
39+
40+
41+
def delete_kafka_topics(bootstrap_servers: str, topics: list[str]):
    """Best-effort deletion of *topics* on the cluster at *bootstrap_servers*.

    Per-topic failures are printed but not raised, so cleanup of the
    remaining topics (and the schema subjects afterwards) still proceeds.
    """
    if not topics:
        # AdminClient.delete_topics() rejects an empty list; nothing to do.
        return
    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    # delete_topics returns a dict of topic -> Future. The original code
    # rebound the dict variable inside the loop (shadowing); use distinct
    # names to keep the two roles clear.
    futures = admin.delete_topics(topics)
    for topic, future in futures.items():
        try:
            # Block until the broker confirms (or rejects) the deletion.
            future.result()
            print(f"Deleted topic: {topic}")
        except Exception as e:
            print(f"Failed to delete {topic}: {e}")
51+
52+
53+
def delete_schema_subjects(registry_url: str, subjects: list[str]):
    """Delete each schema subject from the Schema Registry at *registry_url*.

    Prints the outcome per subject; HTTP failures are reported, not raised.
    # NOTE(review): this issues the registry's default (soft) delete — confirm
    # whether a permanent delete is needed between runs.
    """
    for name in subjects:
        response = requests.delete(f"{registry_url}/subjects/{name}")
        if response.status_code == 200:
            print(f"Deleted schema subject: {name}")
        else:
            print(f"Failed to delete {name}: {response.status_code} {response.text}")
61+
62+
63+
def cleanup_kafka(sql: str, bootstrap_servers: str, registry_url: str):
    """Tear down the Kafka topics and Schema Registry subjects a run created.

    Each run produces fresh records, so rerunning without cleanup would keep
    appending data to the same topic(s).
    """
    found_topics, found_subjects = extract_kafka_avro_artifacts(sql)
    delete_kafka_topics(bootstrap_servers, found_topics)
    delete_schema_subjects(registry_url, found_subjects)
69+
70+
71+
# Number of records the datagen connector generates per run.
LIMIT = 1_000_000
673

774

875
class TestKafkaAvro(unittest.TestCase):
976
def test_check_avro(self):
10-
sql = """
77+
sql = f"""
1178
create table t (
1279
id int,
1380
str varchar,
@@ -20,49 +87,49 @@ def test_check_avro(self):
2087
tme time
2188
) with (
2289
'materialized' = 'true',
23-
'connectors' = '[{
24-
"transport": {
90+
'connectors' = '[{{
91+
"transport": {{
2592
"name": "datagen",
26-
"config": { "plan": [{"limit": 1000000}] }
27-
}
28-
}]'
93+
"config": {{ "plan": [{{"limit": {LIMIT}}}], "seed": 1 }}
94+
}}
95+
}}]'
2996
);
3097
3198
create view v
3299
with (
33-
'connectors' = '[{
34-
"transport": {
100+
'connectors' = '[{{
101+
"transport": {{
35102
"name": "kafka_output",
36-
"config": {
37-
"bootstrap.servers": "ci-kafka-bootstrap.korat-vibes.ts.net:9094",
103+
"config": {{
104+
"bootstrap.servers": "{KAFKA_BOOTSTRAP}",
38105
"topic": "my_topic_avro"
39-
}
40-
},
41-
"format": {
106+
}}
107+
}},
108+
"format": {{
42109
"name": "avro",
43-
"config": {
110+
"config": {{
44111
"update_format": "raw",
45-
"registry_urls": ["http://ci-schema-registry.korat-vibes.ts.net"]
46-
}
47-
}
48-
},
49-
{
112+
"registry_urls": ["{SCHEMA_REGISTRY}"]
113+
}}
114+
}}
115+
}},
116+
{{
50117
"index": "t_index",
51-
"transport": {
118+
"transport": {{
52119
"name": "kafka_output",
53-
"config": {
54-
"bootstrap.servers": "ci-kafka-bootstrap.korat-vibes.ts.net:9094",
120+
"config": {{
121+
"bootstrap.servers": "{KAFKA_BOOTSTRAP}",
55122
"topic": "my_topic_avro2"
56-
}
57-
},
58-
"format": {
123+
}}
124+
}},
125+
"format": {{
59126
"name": "avro",
60-
"config": {
127+
"config": {{
61128
"update_format": "raw",
62-
"registry_urls": ["http://ci-schema-registry.korat-vibes.ts.net"]
63-
}
64-
}
65-
}]'
129+
"registry_urls": ["{SCHEMA_REGISTRY}"]
130+
}}
131+
}}
132+
}}]'
66133
)
67134
as select * from t;
68135
@@ -80,30 +147,29 @@ def test_check_avro(self):
80147
tme time
81148
) with (
82149
'materialized' = 'true',
83-
'connectors' = '[{
84-
"transport": {
150+
'connectors' = '[{{
151+
"transport": {{
85152
"name": "kafka_input",
86-
"config": {
153+
"config": {{
87154
"topic": "my_topic_avro2",
88155
"start_from": "earliest",
89-
"bootstrap.servers": "ci-kafka-bootstrap.korat-vibes.ts.net:9094"
90-
}
91-
},
92-
"format": {
156+
"bootstrap.servers": "{KAFKA_BOOTSTRAP}"
157+
}}
158+
}},
159+
"format": {{
93160
"name": "avro",
94-
"config": {
161+
"config": {{
95162
"update_format": "raw",
96-
"registry_urls": ["http://ci-schema-registry.korat-vibes.ts.net"]
97-
}
98-
}
99-
}]'
163+
"registry_urls": ["{SCHEMA_REGISTRY}"]
164+
}}
165+
}}
166+
}}]'
100167
);
101168
"""
102169
pipeline = PipelineBuilder(
103170
TEST_CLIENT,
104171
"test_kafka_avro",
105172
sql=sql,
106-
compilation_profile=CompilationProfile.DEV,
107173
).create_or_replace()
108174

109175
try:
@@ -113,7 +179,7 @@ def test_check_avro(self):
113179
# 1. Written by the view<v> -> Kafka
114180
# 2. Ingested into loopback table from Kafka
115181
# Thus, expected_records = generated_rows * number_of_outputs (in this case 2)
116-
expected_records = 2000000
182+
expected_records = LIMIT * 2
117183
timeout_s = 1800
118184
poll_interval_s = 5
119185

@@ -142,17 +208,19 @@ def test_check_avro(self):
142208
)
143209

144210
# Validation: once finished, the loopback table should contain all generated values
145-
count_table = list(pipeline.query("SELECT COUNT(*) AS count FROM t"))[0][
146-
"count"
147-
]
148-
count_loopback = list(
149-
pipeline.query("SELECT COUNT(*) AS count FROM loopback")
150-
)[0]["count"]
151-
152-
assert count_table == count_loopback, (
153-
f"Validation failed: {count_loopback} rows ingested vs {count_table} expected"
211+
# Validate by comparing the hash of the source table 't' and loopback table
212+
213+
expected_hash = pipeline.query_hash("SELECT * FROM t ORDER BY id, str")
214+
result_hash = pipeline.query_hash("SELECT * FROM loopback ORDER BY id, str")
215+
216+
assert result_hash == expected_hash, (
217+
f"Validation failed: loopback table hash mismatch!\n"
218+
f"Expected: {expected_hash}\nGot: {result_hash}"
154219
)
155-
print(f"Loopback table validated successfully: {count_loopback} rows")
220+
print("Loopback table validated successfully!")
156221

157222
finally:
158223
pipeline.stop(force=True)
224+
225+
# Cleanup Kafka and Schema Registry
226+
cleanup_kafka(sql, KAFKA_BOOTSTRAP, SCHEMA_REGISTRY)

0 commit comments

Comments
 (0)