Kafka Connect: Fix source offset tracking when SMTs modify the record topic#15880

Merged
danielcweeks merged 22 commits intoapache:mainfrom
kumarpritam863:fix/kafka-connect-sink-offset-tracking-with-smts
Apr 13, 2026

Conversation

@kumarpritam863 (Contributor) commented Apr 3, 2026

Summary

SinkWriter.save() tracked source offsets using record.topic() and record.kafkaPartition(), which reflect values after SMT transformations. When topic-rewriting SMTs like RegexRouter are in the chain, these no longer match the original Kafka topic/partition that the framework's context.assignment() and consumer offset management use.

This caused:

  • Worker.receive() failed to match source offsets against context.assignment(), falling back to NULL_OFFSET for every partition
  • Channel.send() then passed the rewritten topic to sendOffsetsToTransaction. Because the rewritten topic (e.g. tmp.dynamic_orders_json) does not exist as a real Kafka topic,
    the broker hangs trying to resolve its metadata, the 60 s transaction timeout fires, and the transaction is aborted, rolling back the DataWritten and DataComplete events that
    were queued in the same transaction. The Coordinator never receives worker data and logs "committed to 0 table(s)" on every cycle until the task crashes with:
    TimeoutException: Timeout expired after 60000ms while awaiting TxnOffsetCommitHandler
  • Source consumer offsets for the original topic were never advanced
  • On connector restart, all records were re-consumed, producing duplicate data
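The mismatch can be sketched with plain-Java stand-ins (Tp and both methods below are hypothetical simplifications for illustration, not the real Kafka Connect or Iceberg classes):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class OffsetMismatchDemo {
    // Stand-in for org.apache.kafka.common.TopicPartition
    public record Tp(String topic, int partition) {}

    // Pre-fix behavior: offsets keyed by the post-SMT topic/partition
    public static Map<Tp, Long> trackByRecordTopic(String postSmtTopic, int partition, long offset) {
        Map<Tp, Long> offsets = new HashMap<>();
        offsets.put(new Tp(postSmtTopic, partition), offset + 1);
        return offsets;
    }

    // Worker.receive()-style check: every tracked partition must appear in the assignment
    public static boolean allOffsetsMatchAssignment(Map<Tp, Long> offsets, Set<Tp> assignment) {
        return assignment.containsAll(offsets.keySet());
    }
}
```

With an assignment of TopicPartition("orders", 0), an offset tracked under the rewritten topic "tmp.dynamic_orders" never matches, which is what triggers the NULL_OFFSET fallback.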

Kafka Connect SinkRecord JavaDoc

Get the original topic for this sink record, before any transformations were applied. In order to be compatible with transformations that mutate topic names, this method should be used by sink tasks instead of topic() for any internal offset tracking purposes (for instance, reporting offsets to the Connect runtime via SinkTask.preCommit(Map)).

Reproduction

Use dynamic routing with a RegexRouter + InsertField SMT chain:

{
  "iceberg.tables.dynamic-enabled": "true",
  "iceberg.tables.route-field": "srcTopic",
  "transforms": "addDbPrefix, insertTopic",
  "transforms.addDbPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.addDbPrefix.regex": ".*",
  "transforms.addDbPrefix.replacement": "tmp.dynamic_$0",
  "transforms.insertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertTopic.topic.field": "srcTopic"
}
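As a sanity check on the rewrite, the regex/replacement pair above can be applied with plain java.util.regex. This is a simplified sketch of RegexRouter's documented match-and-replace behavior, not its actual source:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexRouterSketch {
    // If the topic matches the regex, substitute the replacement ($0 = the whole match)
    public static String route(String topic, String regex, String replacement) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        return m.matches() ? m.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        // "orders" matches ".*", so $0 expands to "orders"
        System.out.println(route("orders", ".*", "tmp.dynamic_$0")); // prints tmp.dynamic_orders
    }
}
```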

RegexRouter changes record.topic() from orders → tmp.dynamic_orders. The sink then keys offsets under TopicPartition("tmp.dynamic_orders", 0), which never matches the TopicPartition("orders", 0) contained in context.assignment().

Fix

Changed SinkWriter.save() to use record.originalTopic(), record.originalKafkaPartition(), and record.originalKafkaOffset(), the pre-SMT values that stay consistent with the framework's consumer offset tracking.
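In outline, the change moves the offset-map key from the post-SMT to the pre-SMT coordinates. The Rec and Tp types below are hypothetical stand-ins for SinkRecord and TopicPartition; the real change lives in SinkWriter.save():

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetFixSketch {
    // Stand-in for org.apache.kafka.common.TopicPartition
    public record Tp(String topic, int partition) {}

    // Stand-in for SinkRecord: carries both post-SMT and original coordinates
    public record Rec(String topic, int kafkaPartition, long kafkaOffset,
                      String originalTopic, int originalKafkaPartition, long originalKafkaOffset) {}

    // After the fix: key the committed offset by the original (pre-SMT) topic/partition,
    // advancing to the next offset to consume (original offset + 1)
    public static void save(Rec record, Map<Tp, Long> sourceOffsets) {
        sourceOffsets.put(
            new Tp(record.originalTopic(), record.originalKafkaPartition()),
            record.originalKafkaOffset() + 1);
    }
}
```

Because the key now matches what context.assignment() hands out, Worker.receive() finds a real offset instead of NULL_OFFSET, and sendOffsetsToTransaction receives a topic that actually exists.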

Test plan

  • Added testOffsetTrackedByOriginalTopicPartition — creates a SinkRecord, transforms it via newRecord() with a different topic (simulating RegexRouter), and verifies offsets are keyed by
    the original topic, not the transformed one

  • Added integration tests to verify the snapshots, which fail if record.topic() is used and pass if originalTopic() is used

  • All existing TestSinkWriter tests pass

Addresses #13457

@kumarpritam863 (Contributor, Author) commented:

@bryanck @danielcweeks, could you please review this?

@kumarpritam863 (Contributor, Author) commented:

@rmoff, could you please take a look as well?

@kumarpritam863 (Contributor, Author) commented:

@bryanck @danielcweeks @singhpk234 @Fokko, could someone please review this?

@danielcweeks (Contributor) left a comment:

This looks good to me. I see some notes in the Kafka docs about issues with versions prior to 3.6, but that was the earliest version we supported, so I don't think we need to address backward compatibility beyond that version.

@danielcweeks danielcweeks merged commit 3f18cd4 into apache:main Apr 13, 2026
14 checks passed
@kumarpritam863 (Contributor, Author) commented:

Thanks @danielcweeks for the valuable review and for merging this PR.
