1+ import unittest
2+
3+ from feldera import PipelineBuilder
4+ from feldera .testutils import unique_pipeline_name
15from tests import TEST_CLIENT
26import time
37import os
4- from confluent_kafka .admin import AdminClient
8+ from confluent_kafka .admin import AdminClient , NewTopic
59import requests
610import re
711import json
12+ import socket
813
9- from tests .shared_test_pipeline import SharedTestPipeline , sql
1014
def env(name: str, default: str, check_http: bool = False) -> str:
    """Get environment variables used to configure the Kafka broker or Schema Registry endpoint.

    If the environment variable *name* is unset, *default* is used; the defaults
    are intended for internal development only, and external users are expected
    to explicitly configure these variables.  When the default is used, a quick
    reachability probe is performed so misconfiguration fails fast here instead
    of hanging later inside the pipeline.

    :param name: environment variable to read.
    :param default: fallback value used when the variable is unset.
    :param check_http: probe the endpoint with an HTTP GET (Schema Registry)
        instead of a raw TCP connect (Kafka broker).
    :raises RuntimeError: when the default value is used but the endpoint
        cannot be reached.
    """
    value = os.getenv(name, default)

    if value == default:
        try:
            if check_http:
                # Check if Schema Registry is available: a successful GET on the
                # base URL proves both connectivity and HTTP health.
                requests.get(value, timeout=2).raise_for_status()
            else:
                # Check if Kafka broker is available.  Probe a local copy so the
                # returned value is never altered by the protocol stripping.
                target = value
                if "://" in target:
                    # Remove protocol prefix if present (e.g., "kafka://host:port")
                    target = target.split("://", 1)[1]
                # rsplit tolerates extra colons in the host part.
                host, port = target.rsplit(":", 1)
                # Close the probe connection immediately; we only care that it opens.
                socket.create_connection((host, int(port)), timeout=2).close()
        except Exception as e:
            raise RuntimeError(
                f"{name} is set to default '{default}', but cannot connect to it! ({e})"
            ) from e

    return value
1639
1740
# Set these before running the test:
# Example(terminal/shell):
# export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
# export SCHEMA_REGISTRY_URL=http://localhost:8081


# Kafka bootstrap endpoint; the default targets the internal CI broker.
KAFKA_BOOTSTRAP = env(
    "KAFKA_BOOTSTRAP_SERVERS", "ci-kafka-bootstrap.korat-vibes.ts.net:9094"
)
# Schema Registry base URL; probed over HTTP because it is a REST service.
SCHEMA_REGISTRY = env(
    "SCHEMA_REGISTRY_URL",
    "http://ci-schema-registry.korat-vibes.ts.net",
    check_http=True,
)

# Single AdminClient shared by all topic create/delete helpers in this module.
KAFKA_ADMIN = AdminClient({"bootstrap.servers": KAFKA_BOOTSTRAP})
3058
31- def extract_kafka_avro_artifacts (sql : str ) -> tuple [list [str ], list [str ]]:
59+ def extract_kafka_schema_artifacts (sql : str ) -> tuple [list [str ], list [str ]]:
3260 """Extract Kafka topic and schema subjects from the SQL query"""
3361 topics = re .findall (r'"topic"\s*:\s*"([^"]+)"' , sql )
3462
@@ -39,8 +67,7 @@ def extract_kafka_avro_artifacts(sql: str) -> tuple[list[str], list[str]]:
3967 return list (set (topics )), list (set (subjects ))
4068
4169
42- def delete_kafka_topics (bootstrap_servers : str , topics : list [str ]):
43- admin = AdminClient ({"bootstrap.servers" : bootstrap_servers })
70+ def delete_kafka_topics (admin : AdminClient , topics : list [str ]):
4471 tpcs = admin .delete_topics (topics )
4572
4673 for topic , tpcs in tpcs .items ():
@@ -61,14 +88,34 @@ def delete_schema_subjects(registry_url: str, subjects: list[str]):
6188 )
6289
6390
def cleanup_kafka_schema_artifacts(sql: str, admin: AdminClient, registry_url: str):
    """Remove the Kafka topics and Schema Registry subjects referenced by *sql*.

    Each run produces new records, so rerunning without cleanup would append
    data to the same topic(s).
    """
    topic_names, subject_names = extract_kafka_schema_artifacts(sql)
    delete_kafka_topics(admin, topic_names)
    delete_schema_subjects(registry_url, subject_names)
7097
7198
def create_kafka_topic(
    topic_name: str,
    num_partitions: int = 1,
    replication_factor: int = 1,
    admin=None,
):
    """Create new topics when multiple partitions are required, since the Kafka
    output connector does not support specifying the number of partitions
    during topic creation.

    An already-existing topic is treated as success so reruns are idempotent.

    :param topic_name: name of the topic to create.
    :param num_partitions: partition count for the new topic.
    :param replication_factor: replication factor for the new topic.
    :param admin: optional AdminClient; defaults to the module-level
        KAFKA_ADMIN (keeps parity with delete_kafka_topics, which takes the
        client explicitly).
    """
    admin = KAFKA_ADMIN if admin is None else admin
    new_topic = NewTopic(
        topic_name, num_partitions=num_partitions, replication_factor=replication_factor
    )
    futures = admin.create_topics([new_topic])
    for topic, future in futures.items():
        try:
            future.result()
            print(f"Topic {topic} created with {num_partitions} partitions")
        except Exception as e:
            # confluent_kafka raises a generic KafkaException; match on the
            # message because TOPIC_ALREADY_EXISTS is an acceptable outcome.
            if "already exists" in str(e):
                print(f"Topic {topic} already exists")
            else:
                raise
117+
118+
72119class Variant :
73120 """Represents a pipeline variant whose tables and views share the same SQL but differ in connector configuration.
74121 Each variant generates unique topic, table, and view names based on the provided configuration."""
@@ -79,6 +126,8 @@ def __init__(self, cfg):
79126 self .partitions = cfg .get ("partitions" )
80127 self .sync = cfg .get ("sync" )
81128 self .start_from = cfg .get ("start_from" )
129+ self .create_topic = cfg .get ("create_topic" , False )
130+ self .num_partitions = cfg .get ("num_partitions" , 1 )
82131
83132 self .topic1 = f"my_topic_avro_{ self .id } "
84133 self .topic2 = f"my_topic_avro2_{ self .id } "
@@ -202,17 +251,9 @@ def sql_loopback_table(v: Variant) -> str:
202251"""
203252
204253
def build_sql(v: Variant) -> str:
    """Generate SQL for the pipeline by combining the source table, view, and
    loopback table definitions of a single variant."""
    sections = [
        sql_source_table(v),
        sql_view(v),
        sql_loopback_table(v),
    ]
    return "\n".join(sections)
216257
217258
218259def wait_for_rows (pipeline , expected_rows , timeout_s = 1800 , poll_interval_s = 5 ):
@@ -233,14 +274,14 @@ def wait_for_rows(pipeline, expected_rows, timeout_s=1800, poll_interval_s=5):
233274 time .sleep (poll_interval_s )
234275
235276
236- def validate_loopback (self , variant : Variant ):
277+ def validate_loopback (pipeline , variant : Variant ):
237278 """Validation: once finished, the loopback table should contain all generated values
238279 Validate by comparing the hash of the source table 't' and loopback table"""
239- src_tbl_hash = self . pipeline .query_hash (
280+ src_tbl_hash = pipeline .query_hash (
240281 f"SELECT * FROM { variant .source } ORDER BY id, str"
241282 )
242283
243- loopback_tbl_hash = self . pipeline .query_hash (
284+ loopback_tbl_hash = pipeline .query_hash (
244285 f"SELECT * FROM { variant .loopback } ORDER BY id, str"
245286 )
246287
@@ -255,45 +296,64 @@ def validate_loopback(self, variant: Variant):
255296 print (f"Loopback table validated successfully for variant { variant .id } " )
256297
257298
258- class TestKafkaAvro (SharedTestPipeline ):
def create_and_run_pipeline_variant(cfg):
    """Create and run multiple pipelines based on configurations defined for each pipeline variant.

    Builds a Variant from *cfg*, optionally pre-creates its Kafka topics,
    deploys a fresh pipeline for the variant's SQL, waits for the expected
    row count, and validates the loopback table.  The pipeline is always
    stopped and the Kafka/Schema-Registry artifacts cleaned up, even on
    failure.
    """
    v = Variant(cfg)

    # Pre-create topics if specified (needed when more than one partition is
    # required — the output connector cannot set partition counts itself).
    if v.create_topic:
        create_kafka_topic(v.topic1, v.num_partitions)
        create_kafka_topic(v.topic2, v.num_partitions)

    sql = build_sql(v)
    # Unique name so concurrent/repeated runs do not collide on the server.
    pipeline_name = unique_pipeline_name(f"test_kafka_avro_{v.id}")
    pipeline = PipelineBuilder(TEST_CLIENT, pipeline_name, sql).create_or_replace()

    try:
        pipeline.start()
        # NOTE => total_completed_records counts all rows that are processed through each output as follows:
        # 1. Written by the view<v> -> Kafka
        # 2. Ingested into loopback table from Kafka
        # Thus, expected_records = generated_rows * number_of_outputs (in this case 2)
        expected_rows = v.limit * 2
        wait_for_rows(pipeline, expected_rows)
        validate_loopback(pipeline, v)
    finally:
        # Always stop and clean up so reruns start from empty topics/subjects.
        pipeline.stop(force=True)
        cleanup_kafka_schema_artifacts(sql, KAFKA_ADMIN, SCHEMA_REGISTRY)
324+
325+
326+ class TestKafkaAvro (unittest .TestCase ):
259327 """Each test method uses its own SQL snippet and processes only its own variant."""
260328
261329 TEST_CONFIGS = [
262- {"id" : 0 , "limit" : 10 },
263- {"id" : 1 , "limit" : 20 },
264- # {
265- # "id ": 2 ,
266- # "limit ": 1000000 ,
267- # "partitions ": [0] ,
268- # "sync ": True,
269- # "start_from ": "earliest",
270- # },
330+ {"id" : 0 , "limit" : 10 , "partitions" : [ 0 ], "sync" : False },
331+ {
332+ "id" : 1 ,
333+ "limit " : 1000000 ,
334+ "partitions " : [ 0 , 1 , 2 ] ,
335+ "sync " : False ,
336+ "create_topic " : True ,
337+ "num_partitions " : 3 , # pre-create topic with 3 partitions
338+ },
271339 ]
272340
273- @sql (build_sql ([TEST_CONFIGS [0 ]]))
274- def test_kafka_avro_config_0 (self ):
275- cfg = self .TEST_CONFIGS [0 ]
276- variant = Variant (cfg )
341+ def test_kafka_avro_variants (self ):
342+ # If a run ID is specified, only the test with the specified run ID is ran
343+ run_id = os .getenv ("RUN_ID" )
344+ configs_to_run = (
345+ [cfg for cfg in self .TEST_CONFIGS if cfg ["id" ] == int (run_id )]
346+ if run_id is not None
347+ else self .TEST_CONFIGS
348+ )
277349
278- self .pipeline .start ()
279- try :
280- expected_rows = variant .limit * 2 # view->Kafka + Kafka->loopback
281- wait_for_rows (self .pipeline , expected_rows )
282- validate_loopback (self , variant )
283- finally :
284- self .pipeline .stop (force = True )
285- cleanup_kafka (build_sql ([cfg ]), KAFKA_BOOTSTRAP , SCHEMA_REGISTRY )
286-
287- @sql (build_sql ([TEST_CONFIGS [1 ]]))
288- def test_kafka_avro_config_1 (self ):
289- cfg = self .TEST_CONFIGS [1 ]
290- variant = Variant (cfg )
291-
292- self .pipeline .start ()
293- try :
294- expected_rows = variant .limit * 2
295- wait_for_rows (self .pipeline , expected_rows )
296- validate_loopback (self , variant )
297- finally :
298- self .pipeline .stop (force = True )
299- cleanup_kafka (build_sql ([cfg ]), KAFKA_BOOTSTRAP , SCHEMA_REGISTRY )
350+ for cfg in configs_to_run :
351+ print (f"\n Running pipeline variant id = { cfg ['id' ]} " )
352+ create_and_run_pipeline_variant (cfg )
353+
354+
355+ # To run all pipelines in this test file:
356+ # python -m pytest ./tests/workloads/test_kafka_avro.py
357+
358+ # To run a specific pipeline variant by its ID:
359+ # RUN_ID=0 python -m pytest ./tests/workloads/test_kafka_avro.py