Kafka Connect: Fix source offset tracking when SMTs modify the record topic #15880
Merged
danielcweeks merged 22 commits into apache:main on Apr 13, 2026
Conversation
Contributor
Author
@bryanck @danielcweeks can you please review this?
Contributor
Author
@rmoff can you please take a look too?
Contributor
Author
@bryanck @danielcweeks @singhpk234 @Fokko can someone please review this?
danielcweeks approved these changes on Apr 13, 2026
Contributor danielcweeks left a comment:
This looks good to me. I see some notes in the Kafka docs about issues with versions prior to 3.6, but that was the earliest version we supported, so I don't think we need to address backward compatibility beyond that version.
Contributor
Author
Thanks @danielcweeks for your valuable review and for merging this PR.
Summary
SinkWriter.save() tracked source offsets using record.topic() and record.kafkaPartition(), which reflect values after SMT transformations. When topic-rewriting SMTs like RegexRouter are in the chain, these no longer match the original Kafka topic/partition that the framework's context.assignment() and consumer offset management use.
This caused:
- Worker.receive() to fail matching source offsets against context.assignment(), falling back to NULL_OFFSET for every partition
- Channel.send() to pass the rewritten topic to sendOffsetsToTransaction. Because the rewritten topic (e.g. tmp.dynamic_orders_json) does not exist as a real Kafka topic, the broker hangs trying to resolve its metadata, the 60 s transaction timeout fires, and the transaction is aborted, rolling back the DataWritten and DataComplete events that were queued in the same transaction. The Coordinator never receives worker data and logs "committed to 0 table(s)" on every cycle until the task crashes with:
  TimeoutException: Timeout expired after 60000ms while awaiting TxnOffsetCommitHandler
Kafka Connect SinkRecord JavaDoc
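The fallback described in the summary can be illustrated with a small stand-alone model. This is only a sketch: `OffsetFallbackDemo`, `resolve`, the string keys, and the `-1` sentinel are hypothetical stand-ins, not the connector's actual `Worker.receive()` code or its real `NULL_OFFSET` type, and the topic names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OffsetFallbackDemo {

    // Hypothetical sentinel for "no offset known"; the connector's real NULL_OFFSET is a different type.
    static final long NULL_OFFSET = -1L;

    // For every assigned partition, look up the tracked source offset and fall back to the sentinel.
    static Map<String, Long> resolve(List<String> assignment, Map<String, Long> sourceOffsets) {
        Map<String, Long> resolved = new LinkedHashMap<>();
        for (String topicPartition : assignment) {
            resolved.put(topicPartition, sourceOffsets.getOrDefault(topicPartition, NULL_OFFSET));
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Assignment uses the real topic; the tracked offsets were keyed by the rewritten one.
        List<String> assignment = List.of("orders_json-0", "orders_json-1");
        Map<String, Long> keyedByRewrittenTopic =
                Map.of("tmp.dynamic_orders_json-0", 42L, "tmp.dynamic_orders_json-1", 17L);

        // No key matches, so every partition resolves to the sentinel.
        System.out.println(resolve(assignment, keyedByRewrittenTopic));
    }
}
```

Because none of the rewritten keys appear in the assignment, the tracked offsets are silently ignored rather than rejected, which is why the symptom shows up later as missing commits instead of an immediate error.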
Reproduction
Use dynamic routing with a RegexRouter + InsertField SMT chain:
    {
      "iceberg.tables.dynamic-enabled": "true",
      "iceberg.tables.route-field": "srcTopic",
      "transforms": "addDbPrefix, insertTopic",
      "transforms.addDbPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.addDbPrefix.regex": ".*",
      "transforms.addDbPrefix.replacement": "tmp.dynamic_$0",
      "transforms.insertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.insertTopic.topic.field": "srcTopic"
    }
RegexRouter changes record.topic() from orders to tmp.dynamic_orders. The sink then keys offsets under TopicPartition("tmp.dynamic_orders", 0), which never matches context.assignment() containing TopicPartition("orders", 0).
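The rewrite and the resulting key mismatch can be reproduced without a Connect cluster. The sketch below uses only `java.util.regex` to mimic a regex-based topic rewrite with the regex and replacement from the config above; `TopicRewriteDemo`, `rewriteTopic`, and the string-based `key` helper are hypothetical stand-ins for the SMT and for `TopicPartition`.

```java
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TopicRewriteDemo {

    // Mimics a regex-based topic rewrite: if the whole topic matches, apply the replacement.
    static String rewriteTopic(String topic, String regex, String replacement) {
        Matcher matcher = Pattern.compile(regex).matcher(topic);
        return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
    }

    // Plain "topic-partition" string used here as a stand-in for a TopicPartition key.
    static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    public static void main(String[] args) {
        // What the consumer was actually assigned.
        Set<String> assignment = Set.of(key("orders", 0));
        String rewritten = rewriteTopic("orders", ".*", "tmp.dynamic_$0");

        System.out.println(rewritten);                               // tmp.dynamic_orders
        System.out.println(assignment.contains(key(rewritten, 0)));  // false
        System.out.println(assignment.contains(key("orders", 0)));   // true
    }
}
```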
Fix
Changed SinkWriter.save() to use record.originalTopic(), record.originalKafkaPartition(), and record.originalKafkaOffset(), the pre-SMT values that stay consistent with the framework's consumer offset tracking.
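A minimal model of the change, assuming a simplified record type: `Rec`, `saveByCurrent`, and `saveByOriginal` are hypothetical names for illustration and are not the connector's `SinkWriter` code. Storing `offset + 1` follows the usual Kafka convention that a committed offset points at the next record to read; that detail is an assumption of this sketch and is not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

final class OriginalTopicOffsetsDemo {

    // Stand-in for a sink record: current (post-SMT) coordinates plus the pre-SMT originals.
    record Rec(String topic, int partition, long offset,
               String originalTopic, int originalPartition, long originalOffset) {

        // A freshly consumed record: current and original coordinates are identical.
        static Rec consumed(String topic, int partition, long offset) {
            return new Rec(topic, partition, offset, topic, partition, offset);
        }

        // Mimics a topic-rewriting SMT: the current topic changes, the originals are carried over.
        Rec withTopic(String newTopic) {
            return new Rec(newTopic, partition, offset, originalTopic, originalPartition, originalOffset);
        }
    }

    // Pre-fix behaviour: key offsets by the post-SMT coordinates.
    static void saveByCurrent(Map<String, Long> offsets, Rec r) {
        offsets.put(r.topic() + "-" + r.partition(), r.offset() + 1);
    }

    // Post-fix behaviour: key offsets by the pre-SMT coordinates.
    static void saveByOriginal(Map<String, Long> offsets, Rec r) {
        offsets.put(r.originalTopic() + "-" + r.originalPartition(), r.originalOffset() + 1);
    }

    public static void main(String[] args) {
        Rec routed = Rec.consumed("orders", 0, 41L).withTopic("tmp.dynamic_orders");

        Map<String, Long> before = new HashMap<>();
        saveByCurrent(before, routed);
        Map<String, Long> after = new HashMap<>();
        saveByOriginal(after, routed);

        System.out.println(before); // {tmp.dynamic_orders-0=42}
        System.out.println(after);  // {orders-0=42}
    }
}
```

Only the second map has a key that an assignment containing orders-0 can match, which is the property the new unit test in this PR checks against the real SinkRecord API.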
Test plan
- Added testOffsetTrackedByOriginalTopicPartition: creates a SinkRecord, transforms it via newRecord() with a different topic (simulating RegexRouter), and verifies offsets are keyed by the original topic, not the transformed one
- Added integration tests that verify the snapshots; they fail if record.topic() is used and pass if originalTopic() is used
- All existing TestSinkWriter tests pass
Addresses -> #13457