py: fix Kafka Avro tests for improved performance #5880
Conversation
rivudhk
commented
Mar 20, 2026
- Set start_from: "earliest" for existing variants to prevent the consumer from missing messages
- Add a delay after pipeline starts for variants without pre-created topics
- Use UUID suffixes for view and topic names to prevent schema/topic collisions
- Remove default 'num_partitions' and 'replication_factor' in create_kafka_topic (previously set to 1)
- Increase 'timeout_s' to 3600 seconds for 'wait_for_rows'
- Rename variable 'futures' in 'create_kafka_topics' to 'tpcs' for consistency with 'delete_kafka_topics'
- Remove PYTEST_EXTRA_ARGS in YAML workflows to enable the Kafka tests
- Add variable RUN_ID: 1 in YAML workflow for Python workload tests for the Kafka test
mythical-fred
left a comment
Commit subject ends in a colon ("as follows:") making it a fragment rather than a standalone summary. Please rewrite, e.g.: "py: fix Kafka Avro tests: topic naming, start_from, timeouts".
-    futures = get_kafka_admin().create_topics([new_topic])
-    for t, f in futures.items():
+    tpcs = get_kafka_admin().create_topics([new_topic])
+    for topic, tpcs in tpcs.items():
Variable shadowing: tpcs (the dict from create_topics) is immediately reused as the loop variable in for topic, tpcs in tpcs.items(). Python evaluates .items() before entering the loop so it works, but it is confusing — and it mirrors the same pattern in delete_kafka_topics. Suggest using fut for the loop variable:
for topic, fut in tpcs.items():
    try:
        fut.result()
+    # For variants without pre-created topics, wait for the output connector to create them
+    if not v.create_topic:
+        time.sleep(3)
Unconditional time.sleep(3) to wait for auto-topic creation is a flakiness risk — three seconds may not be enough under load. Is there a way to poll until the topic is visible (e.g. via AdminClient.list_topics()) instead of sleeping?
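One possible shape for such a poll, sketched under the assumption that the suite uses confluent-kafka's `AdminClient` (whose `list_topics()` returns cluster metadata with a `.topics` dict); the helper name `poll_until_topic_exists` is illustrative:

```python
import time


def poll_until_topic_exists(admin, topic, timeout_s=60, poll_interval_s=1):
    """Poll the Kafka admin client until `topic` is visible or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        # list_topics() returns ClusterMetadata; .topics is a dict keyed by topic name
        if topic in admin.list_topics(timeout=5).topics:
            return
        time.sleep(poll_interval_s)
    raise TimeoutError(f"Topic {topic!r} not visible within {timeout_s}s")
```

Unlike a fixed sleep, this returns as soon as the topic appears and fails loudly with a clear error if it never does.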
gz
left a comment
Is there code that cleans up the topics?
- Need to clean up topics if the test is successful.
- Need to clean up old topics left behind by failed tests after a few days.
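Kafka metadata does not directly expose a topic's creation time, so one way to support the "older than a few days" cleanup is to embed a creation timestamp in the topic name and parse it back out when sweeping. A sketch under that assumption (the naming scheme and three-day threshold are illustrative):

```python
import time

MAX_AGE_S = 3 * 24 * 3600  # illustrative: delete test topics older than 3 days


def make_topic_name(test_id: str, suffix: str) -> str:
    # Embed the creation epoch so later runs can judge a topic's age from its name.
    return f"local_test_kafka_avro_{test_id}_{int(time.time())}_{suffix}"


def stale_test_topics(topic_names, now=None):
    """Return test topics whose embedded timestamp is older than MAX_AGE_S."""
    now = now if now is not None else time.time()
    stale = []
    for name in topic_names:
        if not name.startswith("local_test_kafka_avro_"):
            continue  # never touch topics that are not ours
        parts = name.split("_")
        try:
            created = int(parts[-2])  # timestamp sits just before the random suffix
        except (IndexError, ValueError):
            continue  # name does not follow the scheme; leave it alone
        if now - created > MAX_AGE_S:
            stale.append(name)
    return stale
```

A setUpClass hook could then pass `stale_test_topics(admin.list_topics().topics)` to `delete_topics()` before creating its own topics.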
  - name: Run platform tests
    if: ${{ vars.CI_DRY_RUN != 'true' }}
-   run: uv run --locked pytest -n ${{ vars.PYTEST_WORKERS }} tests/platform --timeout=1500 -vv ${{ vars.PYTEST_EXTRA }}
+   run: uv run --locked pytest -n ${{ vars.PYTEST_WORKERS }} tests/platform --timeout=1500 -vv
Let's keep the variable in the YAML for the future, but we can remove the test exclusion from the variable's value.
  - name: Python workload tests
    if: ${{ vars.CI_DRY_RUN != 'true' && !contains(vars.CI_SKIP_JOBS, 'runtime-workload') }}
-   run: uv run --locked pytest -n ${{ vars.PYTEST_WORKERS }} tests/workloads --timeout=3600 -vv ${{ vars.PYTEST_EXTRA }}
+   run: uv run --locked pytest -n ${{ vars.PYTEST_WORKERS }} tests/workloads --timeout=3600 -vv
-    self.topic1 = f"my_topic_avro{suffix}"
-    self.topic2 = f"my_topic_avro2{suffix}"
+    self.topic1 = f"my_topic_avro_{self.id}_{suffix}"
Starting this with the name of the test instead of my_... will make it easier to attribute the topic to a test in the Kafka instance.
-    self.topic1 = f"my_topic_avro{suffix}"
-    self.topic2 = f"my_topic_avro2{suffix}"
+    self.topic1 = f"my_topic_avro_{self.id}_{suffix}"
+    self.topic2 = f"my_topic_avro2_{self.id}_{suffix}"
It is manually set in the TEST_CONFIGS array within the TestKafkaAvro class.
- Set start_from: "earliest" for existing variants to prevent the consumer from missing messages
- Add a delay after pipeline starts for variants without pre-created topics
- Use UUID suffixes for view and topic names to prevent schema/topic collisions
- Remove default 'num_partitions' and 'replication_factor' in create_kafka_topic (previously set to 1)
- Increase 'timeout_s' to 3600 seconds for 'wait_for_rows'
- Rename variable 'futures' in 'create_kafka_topics' to 'tpcs' for consistency with 'delete_kafka_topics'

Signed-off-by: rivudhk <rivudhkr@gmail.com>
- Rename variable tpcs to fut for clarity
- Replace flaky time.sleep() with polling (poll_until_topic_exists)
- Update topic naming mechanism to use pipeline name and other suffixes
- Add setUpClass cleanup to delete topics older than 3 days from previous runs
- Add variable RUN_ID: 1 in YAML workflow for Python workload tests for the Kafka test

Signed-off-by: rivudhk <rivudhkr@gmail.com>
4002fdc to
bbdb2f1
mythical-fred
left a comment
Blockers addressed — commit subject cleaned up, variable renamed, sleep replaced with polling. LGTM.
- def wait_for_rows(pipeline, expected_rows, timeout_s=1800, poll_interval_s=5):
+ def wait_for_rows(pipeline, expected_rows, timeout_s=3600, poll_interval_s=5):
Any reason we need a 1h timeout? It seems like overkill; if it doesn't arrive in 30 min, it probably won't arrive in 1 h.
even 1800 seems too high imo. I'd give this 10min max
    FELDERA_TLS_INSECURE: true
    KAFKA_BOOTSTRAP_SERVERS: ${{ vars.CI_KAFKA_BOOTSTRAP }}
    SCHEMA_REGISTRY_URL: ${{ vars.CI_SCHEMA_REGISTRY }}
+   RUN_ID: 1 # we use this to run a single variant of the Kafka tests in test_kafka_avro.py
Should this be github.run_id? (I don't know, just asking.)
- suffix = uuid.uuid4().hex[:4]
+ suffix = uuid.uuid4().hex[:8]
+ self.pipeline_name = pipeline_name
You can use unique_pipeline_name in feldera.testutils, I think.
-    f.result()
-    print(f"Topic {t} created with {num_partitions} partitions")
+    fut.result()
+    print(
Do we use print in other tests? I think we have some form of logging framework. cc @abhizer
Within the framework we use the logging framework, but within tests we mostly just use print with stderr.
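For reference, a minimal sketch of the print-to-stderr convention described here (the helper name `eprint` is illustrative, not from the codebase):

```python
import sys


def eprint(*args, **kwargs):
    # Same as print(), but writes to stderr instead of stdout.
    print(*args, file=sys.stderr, **kwargs)
```

Usage would look like `eprint(f"Topic {topic} created")` in place of a bare `print(...)`.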
+    topics_to_delete = []
+    for topic in admin.list_topics().topics:
+        if topic.startswith("local_test_kafka_avro_"):
-        if topic.startswith("local_test_kafka_avro_"):
+        if topic.startswith($SOME_VAR):

Should this name be hardcoded?
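One way to avoid the hardcoded prefix is to read it from the environment with the current literal as the fallback; the variable name KAFKA_TEST_TOPIC_PREFIX is hypothetical:

```python
import os

# Hypothetical env var so CI can override the cleanup prefix; defaults to the
# literal currently hardcoded in the test.
TOPIC_PREFIX = os.environ.get("KAFKA_TEST_TOPIC_PREFIX", "local_test_kafka_avro_")


def topics_to_delete(all_topics):
    """Select only topics matching the test prefix, so cleanup never touches others."""
    return [t for t in all_topics if t.startswith(TOPIC_PREFIX)]
```

This keeps local runs unchanged while letting a CI job scope cleanup to its own namespace.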