Skip to content

Commit 369b8af

Browse files
committed
py: Modified the Kafka Integration Test as follows:
- Removed the SharedTestPipeline framework because the tests were accessing topics not assigned to them.
- Introduced automatic Kafka topic creation when needed.
- Renamed the method 'extract_kafka_avro_artifacts' to 'extract_kafka_schema_artifacts' for clarity.
- Renamed the method 'cleanup_kafka' to 'cleanup_kafka_schema_artifacts' for clarity.
- Added TCP and HTTP checks to verify the availability of the Kafka broker and Schema Registry before tests run.
- Added a RUN_ID mechanism, allowing individual test variants to be executed selectively.

Signed-off-by: rivudhk <rivudhkr@gmail.com>
1 parent d652e90 commit 369b8af

File tree

1 file changed

+122
-62
lines changed

1 file changed

+122
-62
lines changed

python/tests/workloads/test_kafka_avro.py

Lines changed: 122 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,62 @@
1+
import unittest
2+
3+
from feldera import PipelineBuilder
4+
from feldera.testutils import unique_pipeline_name
15
from tests import TEST_CLIENT
26
import time
37
import os
4-
from confluent_kafka.admin import AdminClient
8+
from confluent_kafka.admin import AdminClient, NewTopic
59
import requests
610
import re
711
import json
12+
import socket
813

9-
from tests.shared_test_pipeline import SharedTestPipeline, sql
1014

15+
def env(name: str, default: str, check_http: bool = False) -> str:
    """Get environment variables used to configure the Kafka broker or Schema Registry endpoint.

    If the environment variable is not set, the default value is used; the defaults are
    intended for internal development only, and external users are expected to explicitly
    configure these variables. When falling back to a default, the endpoint's availability
    is verified up front (HTTP GET for the Schema Registry, raw TCP connect for the Kafka
    broker) so tests fail fast with a clear message instead of timing out mid-run.

    Args:
        name: Environment variable to read.
        default: Fallback endpoint used when the variable is unset.
        check_http: If True, probe with an HTTP GET; otherwise probe a TCP socket.

    Returns:
        The configured endpoint value.

    Raises:
        RuntimeError: The default endpoint was selected but cannot be reached.
    """
    value = os.getenv(name, default)

    # Only probe internal-development defaults; explicitly configured endpoints
    # are trusted as-is.
    if value == default:
        try:
            if check_http:
                # Check if Schema Registry is available
                requests.get(value, timeout=2).raise_for_status()
            else:
                # Check if Kafka broker is available. Probe a local copy so the
                # returned value is not mutated as a side effect of the check.
                target = value
                if "://" in target:
                    # Remove protocol prefix if present (e.g., "kafka://host:port")
                    target = target.split("://", 1)[1]
                # rsplit: only the last ':' separates the host from the port.
                host, port = target.rsplit(":", 1)
                # Close the probe socket instead of leaking its file descriptor.
                sock = socket.create_connection((host, int(port)), timeout=2)
                sock.close()
        except Exception as e:
            raise RuntimeError(
                f"{name} is set to default '{default}', but cannot connect to it! ({e})"
            ) from e

    return value
return value
1639

1740

1841
# Set these before running the test:
1942
# Example(terminal/shell):
2043
# export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
2144
# export SCHEMA_REGISTRY_URL=http://localhost:8081
2245

46+
2347
# Resolved endpoints; the env vars above override the internal-development defaults.
KAFKA_BOOTSTRAP = env(
    "KAFKA_BOOTSTRAP_SERVERS", "ci-kafka-bootstrap.korat-vibes.ts.net:9094"
)
SCHEMA_REGISTRY = env(
    "SCHEMA_REGISTRY_URL",
    "http://ci-schema-registry.korat-vibes.ts.net",
    check_http=True,
)

# Shared AdminClient used by all tests for topic creation and deletion.
KAFKA_ADMIN = AdminClient({"bootstrap.servers": KAFKA_BOOTSTRAP})
57+
3058

31-
def extract_kafka_avro_artifacts(sql: str) -> tuple[list[str], list[str]]:
59+
def extract_kafka_schema_artifacts(sql: str) -> tuple[list[str], list[str]]:
3260
"""Extract Kafka topic and schema subjects from the SQL query"""
3361
topics = re.findall(r'"topic"\s*:\s*"([^"]+)"', sql)
3462

@@ -39,8 +67,7 @@ def extract_kafka_avro_artifacts(sql: str) -> tuple[list[str], list[str]]:
3967
return list(set(topics)), list(set(subjects))
4068

4169

42-
def delete_kafka_topics(bootstrap_servers: str, topics: list[str]):
43-
admin = AdminClient({"bootstrap.servers": bootstrap_servers})
70+
def delete_kafka_topics(admin: AdminClient, topics: list[str]):
4471
tpcs = admin.delete_topics(topics)
4572

4673
for topic, tpcs in tpcs.items():
@@ -61,14 +88,34 @@ def delete_schema_subjects(registry_url: str, subjects: list[str]):
6188
)
6289

6390

64-
def cleanup_kafka_schema_artifacts(sql: str, admin: AdminClient, registry_url: str):
    """Remove the Kafka topics and Schema Registry subjects referenced by *sql*.

    Every run produces fresh records, so rerunning without this cleanup would
    keep appending data to the same topic(s).
    """
    topic_names, subject_names = extract_kafka_schema_artifacts(sql)
    delete_kafka_topics(admin, topic_names)
    delete_schema_subjects(registry_url, subject_names)
7097

7198

99+
def create_kafka_topic(
    topic_name: str, num_partitions: int = 1, replication_factor: int = 1
):
    """Create a topic up front when multiple partitions are required, since the
    Kafka output connector does not support specifying the number of partitions
    during topic creation.

    Creation is idempotent: an already-existing topic is reported and ignored,
    while any other failure is re-raised.
    """
    spec = NewTopic(
        topic_name, num_partitions=num_partitions, replication_factor=replication_factor
    )
    for name, future in KAFKA_ADMIN.create_topics([spec]).items():
        try:
            future.result()
        except Exception as err:
            # Tolerate a pre-existing topic; anything else is a real failure.
            if "already exists" not in str(err):
                raise
            print(f"Topic {name} already exists")
        else:
            print(f"Topic {name} created with {num_partitions} partitions")
117+
118+
72119
class Variant:
73120
"""Represents a pipeline variant whose tables and views share the same SQL but differ in connector configuration.
74121
Each variant generates unique topic, table, and view names based on the provided configuration."""
@@ -79,6 +126,8 @@ def __init__(self, cfg):
79126
self.partitions = cfg.get("partitions")
80127
self.sync = cfg.get("sync")
81128
self.start_from = cfg.get("start_from")
129+
self.create_topic = cfg.get("create_topic", False)
130+
self.num_partitions = cfg.get("num_partitions", 1)
82131

83132
self.topic1 = f"my_topic_avro_{self.id}"
84133
self.topic2 = f"my_topic_avro2_{self.id}"
@@ -202,17 +251,9 @@ def sql_loopback_table(v: Variant) -> str:
202251
"""
203252

204253

205-
def build_sql(v: Variant) -> str:
    """Generate the pipeline SQL for one variant by combining its source table,
    view, and loopback table definitions."""
    fragments = [sql_source_table(v), sql_view(v), sql_loopback_table(v)]
    return "\n".join(fragments)
216257

217258

218259
def wait_for_rows(pipeline, expected_rows, timeout_s=1800, poll_interval_s=5):
@@ -233,14 +274,14 @@ def wait_for_rows(pipeline, expected_rows, timeout_s=1800, poll_interval_s=5):
233274
time.sleep(poll_interval_s)
234275

235276

236-
def validate_loopback(self, variant: Variant):
277+
def validate_loopback(pipeline, variant: Variant):
237278
"""Validation: once finished, the loopback table should contain all generated values
238279
Validate by comparing the hash of the source table 't' and loopback table"""
239-
src_tbl_hash = self.pipeline.query_hash(
280+
src_tbl_hash = pipeline.query_hash(
240281
f"SELECT * FROM {variant.source} ORDER BY id, str"
241282
)
242283

243-
loopback_tbl_hash = self.pipeline.query_hash(
284+
loopback_tbl_hash = pipeline.query_hash(
244285
f"SELECT * FROM {variant.loopback} ORDER BY id, str"
245286
)
246287

@@ -255,45 +296,64 @@ def validate_loopback(self, variant: Variant):
255296
print(f"Loopback table validated successfully for variant {variant.id}")
256297

257298

258-
class TestKafkaAvro(SharedTestPipeline):
299+
def create_and_run_pipeline_variant(cfg):
    """Build, run, and validate one pipeline variant described by *cfg*,
    always stopping the pipeline and cleaning up Kafka / Schema Registry
    artifacts afterwards."""
    variant = Variant(cfg)

    # Pre-create topics if specified
    if variant.create_topic:
        for topic in (variant.topic1, variant.topic2):
            create_kafka_topic(topic, variant.num_partitions)

    pipeline_sql = build_sql(variant)
    name = unique_pipeline_name(f"test_kafka_avro_{variant.id}")
    pipeline = PipelineBuilder(TEST_CLIENT, name, pipeline_sql).create_or_replace()

    try:
        pipeline.start()
        # NOTE => total_completed_records counts all rows that are processed through each output as follows:
        # 1. Written by the view<v> -> Kafka
        # 2. Ingested into loopback table from Kafka
        # Thus, expected_records = generated_rows * number_of_outputs (in this case 2)
        wait_for_rows(pipeline, variant.limit * 2)
        validate_loopback(pipeline, variant)
    finally:
        pipeline.stop(force=True)
        cleanup_kafka_schema_artifacts(pipeline_sql, KAFKA_ADMIN, SCHEMA_REGISTRY)
324+
325+
326+
class TestKafkaAvro(unittest.TestCase):
259327
"""Each test method uses its own SQL snippet and processes only its own variant."""
260328

261329
TEST_CONFIGS = [
262-
{"id": 0, "limit": 10},
263-
{"id": 1, "limit": 20},
264-
# {
265-
# "id": 2,
266-
# "limit": 1000000,
267-
# "partitions": [0],
268-
# "sync": True,
269-
# "start_from": "earliest",
270-
# },
330+
{"id": 0, "limit": 10, "partitions": [0], "sync": False},
331+
{
332+
"id": 1,
333+
"limit": 1000000,
334+
"partitions": [0, 1, 2],
335+
"sync": False,
336+
"create_topic": True,
337+
"num_partitions": 3, # pre-create topic with 3 partitions
338+
},
271339
]
272340

273-
@sql(build_sql([TEST_CONFIGS[0]]))
274-
def test_kafka_avro_config_0(self):
275-
cfg = self.TEST_CONFIGS[0]
276-
variant = Variant(cfg)
341+
def test_kafka_avro_variants(self):
342+
# If a run ID is specified, only the test with the specified run ID is ran
343+
run_id = os.getenv("RUN_ID")
344+
configs_to_run = (
345+
[cfg for cfg in self.TEST_CONFIGS if cfg["id"] == int(run_id)]
346+
if run_id is not None
347+
else self.TEST_CONFIGS
348+
)
277349

278-
self.pipeline.start()
279-
try:
280-
expected_rows = variant.limit * 2 # view->Kafka + Kafka->loopback
281-
wait_for_rows(self.pipeline, expected_rows)
282-
validate_loopback(self, variant)
283-
finally:
284-
self.pipeline.stop(force=True)
285-
cleanup_kafka(build_sql([cfg]), KAFKA_BOOTSTRAP, SCHEMA_REGISTRY)
286-
287-
@sql(build_sql([TEST_CONFIGS[1]]))
288-
def test_kafka_avro_config_1(self):
289-
cfg = self.TEST_CONFIGS[1]
290-
variant = Variant(cfg)
291-
292-
self.pipeline.start()
293-
try:
294-
expected_rows = variant.limit * 2
295-
wait_for_rows(self.pipeline, expected_rows)
296-
validate_loopback(self, variant)
297-
finally:
298-
self.pipeline.stop(force=True)
299-
cleanup_kafka(build_sql([cfg]), KAFKA_BOOTSTRAP, SCHEMA_REGISTRY)
350+
for cfg in configs_to_run:
351+
print(f"\n Running pipeline variant id = {cfg['id']}")
352+
create_and_run_pipeline_variant(cfg)
353+
354+
355+
# To run all pipelines in this test file:
356+
# python -m pytest ./tests/workloads/test_kafka_avro.py
357+
358+
# To run a specific pipeline variant by its ID:
359+
# RUN_ID=0 python -m pytest ./tests/workloads/test_kafka_avro.py

0 commit comments

Comments
 (0)