11import unittest
22from tests import TEST_CLIENT
33from feldera import PipelineBuilder
4- from feldera .enums import CompilationProfile
54import time
5+ import os
6+ from confluent_kafka .admin import AdminClient
7+ import requests
8+ import re
9+
10+
def env(name: str, default: str) -> str:
    """Read *name* from the environment, falling back to *default*.

    Used for the Kafka broker and Schema Registry endpoints; the defaults
    are only meant for internal development — external users must set the
    variables themselves.
    """
    value = os.environ.get(name)
    return default if value is None else value
15+
16+
# Set these before running the test, e.g. in your shell:
#   export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
#   export SCHEMA_REGISTRY_URL=http://localhost:8081
21+
# Kafka broker and Schema Registry endpoints (CI defaults; override via env).
KAFKA_BOOTSTRAP = env("KAFKA_BOOTSTRAP_SERVERS", "ci-kafka-bootstrap.korat-vibes.ts.net:9094")
SCHEMA_REGISTRY = env("SCHEMA_REGISTRY_URL", "http://ci-schema-registry.korat-vibes.ts.net")
29+
def extract_kafka_avro_artifacts(sql: str) -> tuple[list[str], list[str]]:
    """Extract the Kafka topics and Schema Registry subjects referenced by *sql*.

    Topics are taken from ``"topic": "<name>"`` entries in the inline connector
    JSON; subjects are the names of every ``create view`` / ``create index``
    statement (the Avro output connectors register one subject per view/index).

    Returns ``(topics, subjects)`` as sorted, de-duplicated lists — the
    original ``list(set(...))`` ordering was nondeterministic across runs.
    """
    topics = re.findall(r'"topic"\s*:\s*"([^"]+)"', sql)
    subjects = re.findall(r"create view\s+(\w+)", sql, re.I) + re.findall(
        r"create index\s+(\w+)", sql, re.I
    )
    return sorted(set(topics)), sorted(set(subjects))
40+
def delete_kafka_topics(bootstrap_servers: str, topics: list[str]):
    """Delete *topics* from the Kafka cluster at *bootstrap_servers*.

    Per-topic failures (e.g. topic already gone) are logged and swallowed so
    one failure does not abort the rest of the cleanup.
    """
    # Nothing to do — avoids issuing an empty delete request to the broker.
    if not topics:
        return
    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    # delete_topics returns a dict of topic -> Future; the original code
    # rebound the dict variable inside the loop (shadowing), fixed here.
    futures = admin.delete_topics(topics)
    for topic, future in futures.items():
        try:
            future.result()  # raises if the broker rejected the deletion
            print(f"Deleted topic: {topic}")
        except Exception as e:
            print(f"Failed to delete {topic}: {e}")
52+
def delete_schema_subjects(registry_url: str, subjects: list[str]):
    """Soft-delete *subjects* via the Schema Registry's ``DELETE /subjects/{subject}``.

    Failures (HTTP errors or connection problems) are logged rather than
    raised so cleanup continues for the remaining subjects. The original
    call had no timeout and could hang the test teardown indefinitely.
    """
    for subject in subjects:
        try:
            r = requests.delete(f"{registry_url}/subjects/{subject}", timeout=30)
        except requests.RequestException as e:
            # Keep going: a dead registry should not abort topic cleanup order.
            print(f"Failed to delete {subject}: {e}")
            continue
        print(
            f"Deleted schema subject: {subject}"
            if r.status_code == 200
            else f"Failed to delete {subject}: {r.status_code} {r.text}"
        )
62+
def cleanup_kafka(sql: str, bootstrap_servers: str, registry_url: str):
    """Remove the Kafka topics and Schema Registry subjects created by *sql*.

    Every run produces new records, so rerunning without this cleanup would
    append data to the same topic(s).
    """
    topic_names, subject_names = extract_kafka_avro_artifacts(sql)
    delete_kafka_topics(bootstrap_servers, topic_names)
    delete_schema_subjects(registry_url, subject_names)
70+
# Number of records the datagen connector generates per run.
LIMIT = 1_000_000
673
774
875class TestKafkaAvro (unittest .TestCase ):
976 def test_check_avro (self ):
10- sql = """
77+ sql = f """
1178create table t (
1279 id int,
1380 str varchar,
@@ -20,49 +87,49 @@ def test_check_avro(self):
2087 tme time
2188) with (
2289 'materialized' = 'true',
23- 'connectors' = '[{
24- "transport": {
90+ 'connectors' = '[{{
91+ "transport": {{
2592 "name": "datagen",
26- "config": { "plan": [{"limit": 1000000}] }
27- }
28- }]'
93+ "config": {{ "plan": [{{ "limit": { LIMIT } }}], "seed": 1 } }
94+ }}
95+ }} ]'
2996);
3097
3198create view v
3299with (
33- 'connectors' = '[{
34- "transport": {
100+ 'connectors' = '[{{
101+ "transport": {{
35102 "name": "kafka_output",
36- "config": {
37- "bootstrap.servers": "ci-kafka-bootstrap.korat-vibes.ts.net:9094 ",
103+ "config": {{
104+ "bootstrap.servers": "{ KAFKA_BOOTSTRAP } ",
38105 "topic": "my_topic_avro"
39- }
40- },
41- "format": {
106+ }}
107+ }} ,
108+ "format": {{
42109 "name": "avro",
43- "config": {
110+ "config": {{
44111 "update_format": "raw",
45- "registry_urls": ["http://ci-schema-registry.korat-vibes.ts.net "]
46- }
47- }
48- },
49- {
112+ "registry_urls": ["{ SCHEMA_REGISTRY } "]
113+ }}
114+ }}
115+ }} ,
116+ { {
50117 "index": "t_index",
51- "transport": {
118+ "transport": {{
52119 "name": "kafka_output",
53- "config": {
54- "bootstrap.servers": "ci-kafka-bootstrap.korat-vibes.ts.net:9094 ",
120+ "config": {{
121+ "bootstrap.servers": "{ KAFKA_BOOTSTRAP } ",
55122 "topic": "my_topic_avro2"
56- }
57- },
58- "format": {
123+ }}
124+ }} ,
125+ "format": {{
59126 "name": "avro",
60- "config": {
127+ "config": {{
61128 "update_format": "raw",
62- "registry_urls": ["http://ci-schema-registry.korat-vibes.ts.net "]
63- }
64- }
65- }]'
129+ "registry_urls": ["{ SCHEMA_REGISTRY } "]
130+ }}
131+ }}
132+ } }]'
66133)
67134as select * from t;
68135
@@ -80,30 +147,29 @@ def test_check_avro(self):
80147 tme time
81148) with (
82149 'materialized' = 'true',
83- 'connectors' = '[{
84- "transport": {
150+ 'connectors' = '[{{
151+ "transport": {{
85152 "name": "kafka_input",
86- "config": {
153+ "config": {{
87154 "topic": "my_topic_avro2",
88155 "start_from": "earliest",
89- "bootstrap.servers": "ci-kafka-bootstrap.korat-vibes.ts.net:9094 "
90- }
91- },
92- "format": {
156+ "bootstrap.servers": "{ KAFKA_BOOTSTRAP } "
157+ }}
158+ }} ,
159+ "format": {{
93160 "name": "avro",
94- "config": {
161+ "config": {{
95162 "update_format": "raw",
96- "registry_urls": ["http://ci-schema-registry.korat-vibes.ts.net "]
97- }
98- }
99- }]'
163+ "registry_urls": ["{ SCHEMA_REGISTRY } "]
164+ }}
165+ }}
166+ }} ]'
100167);
101168"""
102169 pipeline = PipelineBuilder (
103170 TEST_CLIENT ,
104171 "test_kafka_avro" ,
105172 sql = sql ,
106- compilation_profile = CompilationProfile .DEV ,
107173 ).create_or_replace ()
108174
109175 try :
@@ -113,7 +179,7 @@ def test_check_avro(self):
113179 # 1. Written by the view<v> -> Kafka
114180 # 2. Ingested into loopback table from Kafka
115181 # Thus, expected_records = generated_rows * number_of_outputs (in this case 2)
116- expected_records = 2000000
182+ expected_records = LIMIT * 2
117183 timeout_s = 1800
118184 poll_interval_s = 5
119185
@@ -142,17 +208,19 @@ def test_check_avro(self):
142208 )
143209
144210 # Validation: once finished, the loopback table should contain all generated values
145- count_table = list (pipeline .query ("SELECT COUNT(*) AS count FROM t" ))[0 ][
146- "count"
147- ]
148- count_loopback = list (
149- pipeline .query ("SELECT COUNT(*) AS count FROM loopback" )
150- )[0 ]["count" ]
151-
152- assert count_table == count_loopback , (
153- f"Validation failed: { count_loopback } rows ingested vs { count_table } expected"
211+ # Validate by comparing the hash of the source table 't' and loopback table
212+
213+ expected_hash = pipeline .query_hash ("SELECT * FROM t ORDER BY id, str" )
214+ result_hash = pipeline .query_hash ("SELECT * FROM loopback ORDER BY id, str" )
215+
216+ assert result_hash == expected_hash , (
217+ f"Validation failed: loopback table hash mismatch!\n "
218+ f"Expected: { expected_hash } \n Got: { result_hash } "
154219 )
155- print (f "Loopback table validated successfully: { count_loopback } rows " )
220+ print ("Loopback table validated successfully! " )
156221
157222 finally :
158223 pipeline .stop (force = True )
224+
225+ # Cleanup Kafka and Schema Registry
226+ cleanup_kafka (sql , KAFKA_BOOTSTRAP , SCHEMA_REGISTRY )
0 commit comments