-
Notifications
You must be signed in to change notification settings - Fork 111
py: fix Kafka Avro tests for improved performance #5880
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -11,6 +11,7 @@ | |||||
| import json | ||||||
| import socket | ||||||
| import uuid | ||||||
| from datetime import datetime, timedelta | ||||||
|
|
||||||
| _KAFKA_BOOTSTRAP = None | ||||||
| _SCHEMA_REGISTRY = None | ||||||
|
|
@@ -96,9 +97,9 @@ def extract_kafka_schema_artifacts(sql: str) -> tuple[list[str], list[str]]: | |||||
| def delete_kafka_topics(admin: AdminClient, topics: list[str]): | ||||||
| tpcs = admin.delete_topics(topics) | ||||||
|
|
||||||
| for topic, tpcs in tpcs.items(): | ||||||
| for topic, fut in tpcs.items(): | ||||||
| try: | ||||||
| tpcs.result() | ||||||
| fut.result() | ||||||
| print(f"Deleted topic: {topic}") | ||||||
| except Exception as e: | ||||||
| print(f"Failed to delete {topic}: {e}") | ||||||
|
|
@@ -122,22 +123,22 @@ def cleanup_kafka_schema_artifacts(sql: str, admin: AdminClient, registry_url: s | |||||
| delete_schema_subjects(registry_url, subjects) | ||||||
|
|
||||||
|
|
||||||
| def create_kafka_topic( | ||||||
| topic_name: str, num_partitions: int = 1, replication_factor: int = 1 | ||||||
| ): | ||||||
| def create_kafka_topic(topic_name: str, num_partitions: int, replication_factor: int): | ||||||
| """Create new topics when multiple partitions are required, since the Kafka output connector does not support | ||||||
| specifying the number of partitions during topic creation.""" | ||||||
| new_topic = NewTopic( | ||||||
| topic_name, num_partitions=num_partitions, replication_factor=replication_factor | ||||||
| ) | ||||||
| futures = get_kafka_admin().create_topics([new_topic]) | ||||||
| for t, f in futures.items(): | ||||||
| tpcs = get_kafka_admin().create_topics([new_topic]) | ||||||
| for topic, fut in tpcs.items(): | ||||||
| try: | ||||||
| f.result() | ||||||
| print(f"Topic {t} created with {num_partitions} partitions") | ||||||
| fut.result() | ||||||
| print( | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we use print in other tests? I think we have some form of logging framework cc @abhizer
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Within the framework we use the logging framework, but within tests we mostly just use print with stderr. |
||||||
| f"Topic {topic} created with {num_partitions} partitions and {replication_factor} replication factor" | ||||||
| ) | ||||||
| except Exception as e: | ||||||
| if "already exists" in str(e): | ||||||
| print(f"Topic {t} already exists") | ||||||
| print(f"Topic {topic} already exists") | ||||||
| else: | ||||||
| raise | ||||||
|
|
||||||
|
|
@@ -146,22 +147,28 @@ class Variant: | |||||
| """Represents a pipeline variant whose tables and views share the same SQL but differ in connector configuration. | ||||||
| Each variant generates unique topic, table, and view names based on the provided configuration.""" | ||||||
|
|
||||||
| def __init__(self, cfg): | ||||||
| def __init__(self, cfg, pipeline_name): | ||||||
| self.id = cfg["id"] | ||||||
| self.limit = cfg["limit"] | ||||||
| self.partitions = cfg.get("partitions") | ||||||
| self.sync = cfg.get("sync") | ||||||
| self.start_from = cfg.get("start_from") | ||||||
| self.create_topic = cfg.get("create_topic", False) | ||||||
| self.num_partitions = cfg.get("num_partitions", 1) | ||||||
| self.num_partitions = cfg.get("num_partitions") | ||||||
| self.replication_factor = cfg.get("replication_factor") | ||||||
|
|
||||||
| # Include date for age tracking | ||||||
| date_str = datetime.now().strftime("%Y%m%d") | ||||||
| suffix = uuid.uuid4().hex[:4] | ||||||
|
|
||||||
| suffix = uuid.uuid4().hex[:8] | ||||||
| self.pipeline_name = pipeline_name | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You can use |
||||||
|
|
||||||
| self.topic1 = f"my_topic_avro{suffix}" | ||||||
| self.topic2 = f"my_topic_avro2{suffix}" | ||||||
| self.source = f"t_{self.id}" | ||||||
| self.view = f"v_{self.id}" | ||||||
| self.loopback = f"loopback_{self.id}" | ||||||
| self.topic1 = f"{self.pipeline_name}_topic_{date_str}_{suffix}" | ||||||
| self.topic2 = f"{self.pipeline_name}_topic2_{date_str}_{suffix}" | ||||||
| self.source = f"{self.pipeline_name}_table" | ||||||
| self.view = f"{self.pipeline_name}_view" | ||||||
| self.index = f"{self.pipeline_name}_idx_{suffix}" | ||||||
| self.loopback = f"{self.pipeline_name}_loopback" | ||||||
|
|
||||||
|
|
||||||
| def sql_source_table(v: Variant) -> str: | ||||||
|
|
@@ -209,7 +216,7 @@ def sql_view(v: Variant) -> str: | |||||
| }} | ||||||
| }}, | ||||||
| {{ | ||||||
| "index": "idx_{v.id}", | ||||||
| "index": "{v.index}", | ||||||
| "transport": {{ | ||||||
| "name": "kafka_output", | ||||||
| "config": {{ | ||||||
|
|
@@ -228,7 +235,7 @@ def sql_view(v: Variant) -> str: | |||||
| ) | ||||||
| as select * from {v.source}; | ||||||
|
|
||||||
| create index idx_{v.id} on {v.view}(id); | ||||||
| create index {v.index} on {v.view}(id); | ||||||
| """ | ||||||
|
|
||||||
|
|
||||||
|
|
@@ -284,7 +291,7 @@ def build_sql(v: Variant) -> str: | |||||
| return "\n".join([sql_source_table(v), sql_view(v), sql_loopback_table(v)]) | ||||||
|
|
||||||
|
|
||||||
| def wait_for_rows(pipeline, expected_rows, timeout_s=1800, poll_interval_s=5): | ||||||
| def wait_for_rows(pipeline, expected_rows, timeout_s=3600, poll_interval_s=5): | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Any reason we need a 1h timeout? It seems like overkill — if it doesn't arrive in 30 min, it probably will not arrive in 1 h?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Even 1800 seems too high, IMO. I'd give this 10 min max |
||||||
| """Since records aren't processed instantaneously, wait until all rows are processed to validate completion by | ||||||
| polling `total_completed_records` every `poll_interval_s` seconds until it reaches `expected_records`""" | ||||||
| start = time.perf_counter() | ||||||
|
|
@@ -324,21 +331,38 @@ def validate_loopback(pipeline, variant: Variant): | |||||
| print(f"Loopback table validated successfully for variant {variant.id}") | ||||||
|
|
||||||
|
|
||||||
| def poll_until_topic_exists(admin, topic_name, timeout=30): | ||||||
| """Poll until the Kafka topic is created by the connector""" | ||||||
| start = time.time() | ||||||
| while time.time() - start < timeout: | ||||||
| if topic_name in admin.list_topics().topics: | ||||||
| return | ||||||
| time.sleep(1) | ||||||
| raise TimeoutError(f"Topic {topic_name} not created within {timeout}s") | ||||||
|
|
||||||
|
|
||||||
| def create_and_run_pipeline_variant(cfg): | ||||||
| """Create and run multiple pipelines based on configurations defined for each pipeline variant""" | ||||||
| v = Variant(cfg) | ||||||
| pipeline_name = unique_pipeline_name(f"test_kafka_avro_{cfg['id']}") | ||||||
| v = Variant(cfg, pipeline_name) | ||||||
|
|
||||||
| # Pre-create topics if specified | ||||||
| if v.create_topic: | ||||||
| create_kafka_topic(v.topic1, v.num_partitions) | ||||||
| create_kafka_topic(v.topic2, v.num_partitions) | ||||||
| create_kafka_topic(v.topic1, v.num_partitions, v.replication_factor) | ||||||
| create_kafka_topic(v.topic2, v.num_partitions, v.replication_factor) | ||||||
|
|
||||||
| sql = build_sql(v) | ||||||
| pipeline_name = unique_pipeline_name(f"test_kafka_avro_{v.id}") | ||||||
| pipeline = PipelineBuilder(TEST_CLIENT, pipeline_name, sql).create_or_replace() | ||||||
|
|
||||||
| try: | ||||||
| pipeline.start() | ||||||
|
|
||||||
| # For variants without pre-created topics, wait for the output connector to create them | ||||||
| if not v.create_topic: | ||||||
| admin = get_kafka_admin() | ||||||
| poll_until_topic_exists(admin, v.topic1) | ||||||
| poll_until_topic_exists(admin, v.topic2) | ||||||
|
|
||||||
| # NOTE => total_completed_records counts all rows that are processed through each output as follows: | ||||||
| # 1. Written by the view<v> -> Kafka | ||||||
| # 2. Ingested into loopback table from Kafka | ||||||
|
|
@@ -355,17 +379,46 @@ class TestKafkaAvro(unittest.TestCase): | |||||
| """Each test method uses its own SQL snippet and processes only its own variant.""" | ||||||
|
|
||||||
| TEST_CONFIGS = [ | ||||||
| {"id": 0, "limit": 10, "partitions": [0], "sync": False}, | ||||||
| { | ||||||
| "id": 0, | ||||||
| "limit": 10, | ||||||
| "sync": False, | ||||||
| "partitions": [0], | ||||||
| "start_from": "earliest", | ||||||
| }, | ||||||
| { | ||||||
| "id": 1, | ||||||
| "limit": 1000000, | ||||||
| "partitions": [0, 1, 2], | ||||||
| "sync": False, | ||||||
| "create_topic": True, | ||||||
| "start_from": "earliest", | ||||||
| "num_partitions": 3, # pre-create topic with 3 partitions | ||||||
| "replication_factor": 1, | ||||||
| }, | ||||||
| ] | ||||||
|
|
||||||
| @classmethod | ||||||
| def setUpClass(cls): | ||||||
| """Clean up stale topics older than 3 days from previous test runs""" | ||||||
|
|
||||||
| admin = get_kafka_admin() | ||||||
| cutoff = (datetime.now() - timedelta(days=3)).strftime("%Y%m%d") | ||||||
|
|
||||||
| topics_to_delete = [] | ||||||
| for topic in admin.list_topics().topics: | ||||||
| if topic.startswith("local_test_kafka_avro_"): | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
Should this name be hardcoded? |
||||||
| try: | ||||||
| if topic.split("_")[4] < cutoff: | ||||||
| topics_to_delete.append(topic) | ||||||
| except IndexError: | ||||||
| # Skip if topic doesn't have expected format | ||||||
| pass | ||||||
|
|
||||||
| if topics_to_delete: | ||||||
| print(f"Deleting {len(topics_to_delete)} stale topics: {topics_to_delete}") | ||||||
| delete_kafka_topics(admin, topics_to_delete) | ||||||
|
|
||||||
| def test_kafka_avro_variants(self): | ||||||
| # If a run ID is specified, only the test with the specified run ID is ran | ||||||
| run_id = os.getenv("RUN_ID") | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldthis be github.run_id? (i dont know just asking)