Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test-integration-runtime.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ jobs:
FELDERA_TLS_INSECURE: true
KAFKA_BOOTSTRAP_SERVERS: ${{ vars.CI_KAFKA_BOOTSTRAP }}
SCHEMA_REGISTRY_URL: ${{ vars.CI_SCHEMA_REGISTRY }}
RUN_ID: 1 # we use this to run a single variant of the Kafka tests in test_kafka_avro.py
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be `github.run_id`? (I don't know, just asking.)

105 changes: 79 additions & 26 deletions python/tests/workloads/test_kafka_avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import json
import socket
import uuid
from datetime import datetime, timedelta

_KAFKA_BOOTSTRAP = None
_SCHEMA_REGISTRY = None
Expand Down Expand Up @@ -96,9 +97,9 @@ def extract_kafka_schema_artifacts(sql: str) -> tuple[list[str], list[str]]:
def delete_kafka_topics(admin: "AdminClient", topics: list[str]):
    """Delete the given Kafka topics, logging per-topic success or failure.

    ``AdminClient.delete_topics`` returns a dict mapping each topic name to a
    future; each future is resolved individually so that one failed deletion
    does not abort the best-effort cleanup of the remaining topics.
    """
    futures = admin.delete_topics(topics)

    # Resolve each deletion future separately; do not reuse the dict's name
    # for the loop variable, which would shadow it.
    for topic, fut in futures.items():
        try:
            fut.result()
            print(f"Deleted topic: {topic}")
        except Exception as e:
            # Best-effort cleanup: report and continue with the other topics.
            print(f"Failed to delete {topic}: {e}")
Expand All @@ -122,22 +123,22 @@ def cleanup_kafka_schema_artifacts(sql: str, admin: AdminClient, registry_url: s
delete_schema_subjects(registry_url, subjects)


def create_kafka_topic(
topic_name: str, num_partitions: int = 1, replication_factor: int = 1
):
def create_kafka_topic(topic_name: str, num_partitions: int, replication_factor: int):
"""Create new topics when multiple partitions are required, since the Kafka output connector does not support
specifying the number of partitions during topic creation."""
new_topic = NewTopic(
topic_name, num_partitions=num_partitions, replication_factor=replication_factor
)
futures = get_kafka_admin().create_topics([new_topic])
for t, f in futures.items():
tpcs = get_kafka_admin().create_topics([new_topic])
for topic, fut in tpcs.items():
try:
f.result()
print(f"Topic {t} created with {num_partitions} partitions")
fut.result()
print(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we use print in other tests? I think we have some form of logging framework cc @abhizer

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Within the framework we use the logging framework, but within tests we mostly just use print with stderr.

f"Topic {topic} created with {num_partitions} partitions and {replication_factor} replication factor"
)
except Exception as e:
if "already exists" in str(e):
print(f"Topic {t} already exists")
print(f"Topic {topic} already exists")
else:
raise

Expand All @@ -146,22 +147,28 @@ class Variant:
"""Represents a pipeline variant whose tables and views share the same SQL but differ in connector configuration.
Each variant generates unique topic, table, and view names based on the provided configuration."""

def __init__(self, cfg):
def __init__(self, cfg, pipeline_name):
self.id = cfg["id"]
self.limit = cfg["limit"]
self.partitions = cfg.get("partitions")
self.sync = cfg.get("sync")
self.start_from = cfg.get("start_from")
self.create_topic = cfg.get("create_topic", False)
self.num_partitions = cfg.get("num_partitions", 1)
self.num_partitions = cfg.get("num_partitions")
self.replication_factor = cfg.get("replication_factor")

# Include date for age tracking
date_str = datetime.now().strftime("%Y%m%d")
suffix = uuid.uuid4().hex[:4]

suffix = uuid.uuid4().hex[:8]
self.pipeline_name = pipeline_name
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use `unique_pipeline_name` in `feldera.testutils`, I think.


self.topic1 = f"my_topic_avro{suffix}"
self.topic2 = f"my_topic_avro2{suffix}"
self.source = f"t_{self.id}"
self.view = f"v_{self.id}"
self.loopback = f"loopback_{self.id}"
self.topic1 = f"{self.pipeline_name}_topic_{date_str}_{suffix}"
self.topic2 = f"{self.pipeline_name}_topic2_{date_str}_{suffix}"
self.source = f"{self.pipeline_name}_table"
self.view = f"{self.pipeline_name}_view"
self.index = f"{self.pipeline_name}_idx_{suffix}"
self.loopback = f"{self.pipeline_name}_loopback"


def sql_source_table(v: Variant) -> str:
Expand Down Expand Up @@ -209,7 +216,7 @@ def sql_view(v: Variant) -> str:
}}
}},
{{
"index": "idx_{v.id}",
"index": "{v.index}",
"transport": {{
"name": "kafka_output",
"config": {{
Expand All @@ -228,7 +235,7 @@ def sql_view(v: Variant) -> str:
)
as select * from {v.source};

create index idx_{v.id} on {v.view}(id);
create index {v.index} on {v.view}(id);
"""


Expand Down Expand Up @@ -284,7 +291,7 @@ def build_sql(v: Variant) -> str:
return "\n".join([sql_source_table(v), sql_view(v), sql_loopback_table(v)])


def wait_for_rows(pipeline, expected_rows, timeout_s=1800, poll_interval_s=5):
def wait_for_rows(pipeline, expected_rows, timeout_s=3600, poll_interval_s=5):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason we need a 1h timeout? it likely seems overkill if it doesnt arrive in 30min, it probably will not arrive in 1h?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even 1800 seems too high imo. I'd give this 10min max

"""Since records aren't processed instantaneously, wait until all rows are processed to validate completion by
polling `total_completed_records` every `poll_interval_s` seconds until it reaches `expected_records`"""
start = time.perf_counter()
Expand Down Expand Up @@ -324,21 +331,38 @@ def validate_loopback(pipeline, variant: Variant):
print(f"Loopback table validated successfully for variant {variant.id}")


def poll_until_topic_exists(admin, topic_name, timeout=30):
    """Poll until the Kafka topic is created by the connector.

    Checks the broker metadata roughly once per second until ``topic_name``
    appears, raising ``TimeoutError`` if it does not show up within
    ``timeout`` seconds.

    Uses ``time.monotonic()`` rather than ``time.time()`` so the deadline is
    immune to wall-clock jumps (e.g. NTP adjustments) during the wait.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if topic_name in admin.list_topics().topics:
            return
        time.sleep(1)
    raise TimeoutError(f"Topic {topic_name} not created within {timeout}s")


def create_and_run_pipeline_variant(cfg):
"""Create and run multiple pipelines based on configurations defined for each pipeline variant"""
v = Variant(cfg)
pipeline_name = unique_pipeline_name(f"test_kafka_avro_{cfg['id']}")
v = Variant(cfg, pipeline_name)

# Pre-create topics if specified
if v.create_topic:
create_kafka_topic(v.topic1, v.num_partitions)
create_kafka_topic(v.topic2, v.num_partitions)
create_kafka_topic(v.topic1, v.num_partitions, v.replication_factor)
create_kafka_topic(v.topic2, v.num_partitions, v.replication_factor)

sql = build_sql(v)
pipeline_name = unique_pipeline_name(f"test_kafka_avro_{v.id}")
pipeline = PipelineBuilder(TEST_CLIENT, pipeline_name, sql).create_or_replace()

try:
pipeline.start()

# For variants without pre-created topics, wait for the output connector to create them
if not v.create_topic:
admin = get_kafka_admin()
poll_until_topic_exists(admin, v.topic1)
poll_until_topic_exists(admin, v.topic2)

# NOTE => total_completed_records counts all rows that are processed through each output as follows:
# 1. Written by the view<v> -> Kafka
# 2. Ingested into loopback table from Kafka
Expand All @@ -355,17 +379,46 @@ class TestKafkaAvro(unittest.TestCase):
"""Each test method uses its own SQL snippet and processes only its own variant."""

TEST_CONFIGS = [
{"id": 0, "limit": 10, "partitions": [0], "sync": False},
{
"id": 0,
"limit": 10,
"sync": False,
"partitions": [0],
"start_from": "earliest",
},
{
"id": 1,
"limit": 1000000,
"partitions": [0, 1, 2],
"sync": False,
"create_topic": True,
"start_from": "earliest",
"num_partitions": 3, # pre-create topic with 3 partitions
"replication_factor": 1,
},
]

@classmethod
def setUpClass(cls):
"""Clean up stale topics older than 3 days from previous test runs"""

admin = get_kafka_admin()
cutoff = (datetime.now() - timedelta(days=3)).strftime("%Y%m%d")

topics_to_delete = []
for topic in admin.list_topics().topics:
if topic.startswith("local_test_kafka_avro_"):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if topic.startswith("local_test_kafka_avro_"):
if topic.startswith($SOME_VAR):

should this name be hardcoded?

try:
if topic.split("_")[4] < cutoff:
topics_to_delete.append(topic)
except IndexError:
# Skip if topic doesn't have expected format
pass

if topics_to_delete:
print(f"Deleting {len(topics_to_delete)} stale topics: {topics_to_delete}")
delete_kafka_topics(admin, topics_to_delete)

def test_kafka_avro_variants(self):
# If a run ID is specified, only the test with the specified run ID is run
run_id = os.getenv("RUN_ID")
Expand Down
Loading