Skip to content

Commit 4369457

Browse files
committed
Generate unique topic names and lazy-initialize Kafka/Schema Registry connections
- Replace run-id based topic names by unique UUID based topic names to avoid collisions across test runs - Lazy-initialize Kafka and Schema registry connections to prevent import-time network calls Signed-off-by: rivudhk <rivudhkr@gmail.com>
1 parent 369b8af commit 4369457

File tree

1 file changed

+52
-24
lines changed

1 file changed

+52
-24
lines changed

python/tests/workloads/test_kafka_avro.py

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,17 @@
1010
import re
1111
import json
1212
import socket
13+
import uuid
14+
15+
_KAFKA_BOOTSTRAP = None
16+
_SCHEMA_REGISTRY = None
17+
_KAFKA_ADMIN = None
18+
19+
20+
# Set these before running the test:
21+
# Example(terminal/shell):
22+
# export KAFKA_BOOTSTRAP_SERVERS= localhost:9092
23+
# export SCHEMA_REGISTRY_URL= http://localhost:8081
1324

1425

1526
def env(name: str, default: str, check_http: bool = False) -> str:
@@ -29,7 +40,8 @@ def env(name: str, default: str, check_http: bool = False) -> str:
2940
# Remove protocol prefix if present (e.g., "kafka://host:port")
3041
value = value.split("://", 1)[1]
3142
host, port = value.split(":")
32-
socket.create_connection((host, int(port)), timeout=2)
43+
with socket.create_connection((host, int(port)), timeout=2):
44+
pass # just testing connectivity
3345
except Exception as e:
3446
raise RuntimeError(
3547
f"{name} is set to default '{default}', but cannot connect to it! ({e})"
@@ -38,22 +50,36 @@ def env(name: str, default: str, check_http: bool = False) -> str:
3850
return value
3951

4052

41-
# Set these before running the test:
42-
# Example(terminal/shell):
43-
# export KAFKA_BOOTSTRAP_SERVERS= localhost:9092
44-
# export SCHEMA_REGISTRY_URL= http://localhost:8081
53+
"""Kafka bootstrap, schema registry, and Kafka admin client are lazy-initialized to avoid triggering live
54+
HTTP/socket connections when importing this file. Connections are only created when the test calls the
55+
respective getter functions at runtime."""
4556

4657

47-
KAFKA_BOOTSTRAP = env(
48-
"KAFKA_BOOTSTRAP_SERVERS", "ci-kafka-bootstrap.korat-vibes.ts.net:9094"
49-
)
50-
SCHEMA_REGISTRY = env(
51-
"SCHEMA_REGISTRY_URL",
52-
"http://ci-schema-registry.korat-vibes.ts.net",
53-
check_http=True,
54-
)
58+
def get_kafka_bootstrap() -> str:
59+
global _KAFKA_BOOTSTRAP
60+
if _KAFKA_BOOTSTRAP is None:
61+
_KAFKA_BOOTSTRAP = env(
62+
"KAFKA_BOOTSTRAP_SERVERS", "ci-kafka-bootstrap.korat-vibes.ts.net:9094"
63+
)
64+
return _KAFKA_BOOTSTRAP
5565

56-
KAFKA_ADMIN = AdminClient({"bootstrap.servers": KAFKA_BOOTSTRAP})
66+
67+
def get_schema_registry() -> str:
68+
global _SCHEMA_REGISTRY
69+
if _SCHEMA_REGISTRY is None:
70+
_SCHEMA_REGISTRY = env(
71+
"SCHEMA_REGISTRY_URL",
72+
"http://ci-schema-registry.korat-vibes.ts.net",
73+
check_http=True,
74+
)
75+
return _SCHEMA_REGISTRY
76+
77+
78+
def get_kafka_admin() -> AdminClient:
79+
global _KAFKA_ADMIN
80+
if _KAFKA_ADMIN is None:
81+
_KAFKA_ADMIN = AdminClient({"bootstrap.servers": get_kafka_bootstrap()})
82+
return _KAFKA_ADMIN
5783

5884

5985
def extract_kafka_schema_artifacts(sql: str) -> tuple[list[str], list[str]]:
@@ -104,7 +130,7 @@ def create_kafka_topic(
104130
new_topic = NewTopic(
105131
topic_name, num_partitions=num_partitions, replication_factor=replication_factor
106132
)
107-
futures = KAFKA_ADMIN.create_topics([new_topic])
133+
futures = get_kafka_admin().create_topics([new_topic])
108134
for t, f in futures.items():
109135
try:
110136
f.result()
@@ -129,8 +155,10 @@ def __init__(self, cfg):
129155
self.create_topic = cfg.get("create_topic", False)
130156
self.num_partitions = cfg.get("num_partitions", 1)
131157

132-
self.topic1 = f"my_topic_avro_{self.id}"
133-
self.topic2 = f"my_topic_avro2_{self.id}"
158+
suffix = uuid.uuid4().hex[:8]
159+
160+
self.topic1 = f"my_topic_avro{suffix}"
161+
self.topic2 = f"my_topic_avro2{suffix}"
134162
self.source = f"t_{self.id}"
135163
self.view = f"v_{self.id}"
136164
self.loopback = f"loopback_{self.id}"
@@ -168,15 +196,15 @@ def sql_view(v: Variant) -> str:
168196
"transport": {{
169197
"name": "kafka_output",
170198
"config": {{
171-
"bootstrap.servers": "{KAFKA_BOOTSTRAP}",
199+
"bootstrap.servers": "{get_kafka_bootstrap()}",
172200
"topic": "{v.topic1}"
173201
}}
174202
}},
175203
"format": {{
176204
"name": "avro",
177205
"config": {{
178206
"update_format": "raw",
179-
"registry_urls": ["{SCHEMA_REGISTRY}"]
207+
"registry_urls": ["{get_schema_registry()}"]
180208
}}
181209
}}
182210
}},
@@ -185,15 +213,15 @@ def sql_view(v: Variant) -> str:
185213
"transport": {{
186214
"name": "kafka_output",
187215
"config": {{
188-
"bootstrap.servers": "{KAFKA_BOOTSTRAP}",
216+
"bootstrap.servers": "{get_kafka_bootstrap()}",
189217
"topic": "{v.topic2}"
190218
}}
191219
}},
192220
"format": {{
193221
"name": "avro",
194222
"config": {{
195223
"update_format": "raw",
196-
"registry_urls": ["{SCHEMA_REGISTRY}"]
224+
"registry_urls": ["{get_schema_registry()}"]
197225
}}
198226
}}
199227
}}]'
@@ -207,7 +235,7 @@ def sql_view(v: Variant) -> str:
207235
def sql_loopback_table(v: Variant) -> str:
208236
# Optional configurations that will use connector defaults if not specified
209237
config = {
210-
"bootstrap.servers": KAFKA_BOOTSTRAP,
238+
"bootstrap.servers": get_kafka_bootstrap(),
211239
"topic": v.topic2,
212240
}
213241

@@ -243,7 +271,7 @@ def sql_loopback_table(v: Variant) -> str:
243271
"name": "avro",
244272
"config": {{
245273
"update_format": "raw",
246-
"registry_urls": ["{SCHEMA_REGISTRY}"]
274+
"registry_urls": ["{get_schema_registry()}"]
247275
}}
248276
}}
249277
}}]'
@@ -320,7 +348,7 @@ def create_and_run_pipeline_variant(cfg):
320348
validate_loopback(pipeline, v)
321349
finally:
322350
pipeline.stop(force=True)
323-
cleanup_kafka_schema_artifacts(sql, KAFKA_ADMIN, SCHEMA_REGISTRY)
351+
cleanup_kafka_schema_artifacts(sql, get_kafka_admin(), get_schema_registry())
324352

325353

326354
class TestKafkaAvro(unittest.TestCase):

0 commit comments

Comments
 (0)