1010import re
1111import json
1212import socket
13+ import uuid
14+
15+ _KAFKA_BOOTSTRAP = None
16+ _SCHEMA_REGISTRY = None
17+ _KAFKA_ADMIN = None
18+
19+
20+ # Set these before running the test:
21+ # Example(terminal/shell):
22+ # export KAFKA_BOOTSTRAP_SERVERS= localhost:9092
23+ # export SCHEMA_REGISTRY_URL= http://localhost:8081
1324
1425
1526def env (name : str , default : str , check_http : bool = False ) -> str :
@@ -29,7 +40,8 @@ def env(name: str, default: str, check_http: bool = False) -> str:
2940 # Remove protocol prefix if present (e.g., "kafka://host:port")
3041 value = value .split ("://" , 1 )[1 ]
3142 host , port = value .split (":" )
32- socket .create_connection ((host , int (port )), timeout = 2 )
43+ with socket .create_connection ((host , int (port )), timeout = 2 ):
44+ pass # just testing connectivity
3345 except Exception as e :
3446 raise RuntimeError (
3547 f"{ name } is set to default '{ default } ', but cannot connect to it! ({ e } )"
@@ -38,22 +50,36 @@ def env(name: str, default: str, check_http: bool = False) -> str:
3850 return value
3951
4052
41- # Set these before running the test:
42- # Example(terminal/shell):
43- # export KAFKA_BOOTSTRAP_SERVERS= localhost:9092
44- # export SCHEMA_REGISTRY_URL= http://localhost:8081
53+ """Kafka bootstrap, schema registry, and Kafka admin client are lazy-initialized to avoid triggering live
54+ HTTP/socket connections when importing this file. Connections are only created when the test calls the
55+ respective getter functions at runtime."""
4556
4657
47- KAFKA_BOOTSTRAP = env (
48- "KAFKA_BOOTSTRAP_SERVERS" , "ci-kafka-bootstrap.korat-vibes.ts.net:9094"
49- )
50- SCHEMA_REGISTRY = env (
51- "SCHEMA_REGISTRY_URL" ,
52- "http://ci-schema-registry.korat-vibes.ts.net" ,
53- check_http = True ,
54- )
58+ def get_kafka_bootstrap () -> str :
59+ global _KAFKA_BOOTSTRAP
60+ if _KAFKA_BOOTSTRAP is None :
61+ _KAFKA_BOOTSTRAP = env (
62+ "KAFKA_BOOTSTRAP_SERVERS" , "ci-kafka-bootstrap.korat-vibes.ts.net:9094"
63+ )
64+ return _KAFKA_BOOTSTRAP
5565
56- KAFKA_ADMIN = AdminClient ({"bootstrap.servers" : KAFKA_BOOTSTRAP })
66+
67+ def get_schema_registry () -> str :
68+ global _SCHEMA_REGISTRY
69+ if _SCHEMA_REGISTRY is None :
70+ _SCHEMA_REGISTRY = env (
71+ "SCHEMA_REGISTRY_URL" ,
72+ "http://ci-schema-registry.korat-vibes.ts.net" ,
73+ check_http = True ,
74+ )
75+ return _SCHEMA_REGISTRY
76+
77+
78+ def get_kafka_admin () -> AdminClient :
79+ global _KAFKA_ADMIN
80+ if _KAFKA_ADMIN is None :
81+ _KAFKA_ADMIN = AdminClient ({"bootstrap.servers" : get_kafka_bootstrap ()})
82+ return _KAFKA_ADMIN
5783
5884
5985def extract_kafka_schema_artifacts (sql : str ) -> tuple [list [str ], list [str ]]:
@@ -104,7 +130,7 @@ def create_kafka_topic(
104130 new_topic = NewTopic (
105131 topic_name , num_partitions = num_partitions , replication_factor = replication_factor
106132 )
107- futures = KAFKA_ADMIN .create_topics ([new_topic ])
133+ futures = get_kafka_admin () .create_topics ([new_topic ])
108134 for t , f in futures .items ():
109135 try :
110136 f .result ()
@@ -129,8 +155,10 @@ def __init__(self, cfg):
129155 self .create_topic = cfg .get ("create_topic" , False )
130156 self .num_partitions = cfg .get ("num_partitions" , 1 )
131157
132- self .topic1 = f"my_topic_avro_{ self .id } "
133- self .topic2 = f"my_topic_avro2_{ self .id } "
158+ suffix = uuid .uuid4 ().hex [:8 ]
159+
160+ self .topic1 = f"my_topic_avro{ suffix } "
161+ self .topic2 = f"my_topic_avro2{ suffix } "
134162 self .source = f"t_{ self .id } "
135163 self .view = f"v_{ self .id } "
136164 self .loopback = f"loopback_{ self .id } "
@@ -168,15 +196,15 @@ def sql_view(v: Variant) -> str:
168196 "transport": {{
169197 "name": "kafka_output",
170198 "config": {{
171- "bootstrap.servers": "{ KAFKA_BOOTSTRAP } ",
199+ "bootstrap.servers": "{ get_kafka_bootstrap () } ",
172200 "topic": "{ v .topic1 } "
173201 }}
174202 }},
175203 "format": {{
176204 "name": "avro",
177205 "config": {{
178206 "update_format": "raw",
179- "registry_urls": ["{ SCHEMA_REGISTRY } "]
207+ "registry_urls": ["{ get_schema_registry () } "]
180208 }}
181209 }}
182210 }},
@@ -185,15 +213,15 @@ def sql_view(v: Variant) -> str:
185213 "transport": {{
186214 "name": "kafka_output",
187215 "config": {{
188- "bootstrap.servers": "{ KAFKA_BOOTSTRAP } ",
216+ "bootstrap.servers": "{ get_kafka_bootstrap () } ",
189217 "topic": "{ v .topic2 } "
190218 }}
191219 }},
192220 "format": {{
193221 "name": "avro",
194222 "config": {{
195223 "update_format": "raw",
196- "registry_urls": ["{ SCHEMA_REGISTRY } "]
224+ "registry_urls": ["{ get_schema_registry () } "]
197225 }}
198226 }}
199227 }}]'
@@ -207,7 +235,7 @@ def sql_view(v: Variant) -> str:
207235def sql_loopback_table (v : Variant ) -> str :
208236 # Optional configurations that will use connector defaults if not specified
209237 config = {
210- "bootstrap.servers" : KAFKA_BOOTSTRAP ,
238+ "bootstrap.servers" : get_kafka_bootstrap () ,
211239 "topic" : v .topic2 ,
212240 }
213241
@@ -243,7 +271,7 @@ def sql_loopback_table(v: Variant) -> str:
243271 "name": "avro",
244272 "config": {{
245273 "update_format": "raw",
246- "registry_urls": ["{ SCHEMA_REGISTRY } "]
274+ "registry_urls": ["{ get_schema_registry () } "]
247275 }}
248276 }}
249277 }}]'
@@ -320,7 +348,7 @@ def create_and_run_pipeline_variant(cfg):
320348 validate_loopback (pipeline , v )
321349 finally :
322350 pipeline .stop (force = True )
323- cleanup_kafka_schema_artifacts (sql , KAFKA_ADMIN , SCHEMA_REGISTRY )
351+ cleanup_kafka_schema_artifacts (sql , get_kafka_admin (), get_schema_registry () )
324352
325353
326354class TestKafkaAvro (unittest .TestCase ):
0 commit comments