Skip to content

Commit 7c82e73

Browse files
PDGGK and claude authored
[Python][Java] Add support for record headers in WriteToKafka (Fixes #27033) (#37458)
* [#37198] Make withBackOffSupplier public to enable bounded retry configuration Users need to configure bounded backoff to prevent infinite retry loops. Making withBackOffSupplier public allows users to set FluentBackoff.DEFAULT.withMaxRetries(n) and control retry behavior. Changes: - Changed withBackOffSupplier() visibility from package-private to public - Added comprehensive integration test with zero-delay BoundedBackOff - Test verifies: responses empty, 1 failure emitted, call count = maxRetries+1 The test uses a serializable BoundedBackOff class with assertions on both PAssert (pipeline outputs) and Metrics (retry counts) to ensure bounded retry behavior works correctly. Fixes #37198 Related to #37176 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com> * [Python][Java] Add support for record headers in WriteToKafka (Fixes #27033) This PR adds support for writing Kafka record headers in the Python SDK by introducing a new cross-language transform. Changes: - Python: Add `with_headers` parameter to `WriteToKafka` that accepts `beam.Row` elements with key, value, and optional headers fields - Java: Add `WriteWithHeaders` class that converts Row to ProducerRecord with headers support - Java: Register new URN `beam:transform:org.apache.beam:kafka_write_with_headers:v1` - Add test `testConstructKafkaWriteWithHeaders` in KafkaIOExternalTest When `with_headers=True`, input elements must be `beam.Row` with schema: - key: bytes (required) - value: bytes (required) - headers: List[Row(key=str, value=bytes)] (optional) - topic: str (optional, per-record override) - partition: int (optional) - timestamp: long (optional) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: Python lint and formatting issues - Wrap URN_WITH_HEADERS to fit 80 char limit - Add blank line before docstring list for Sphinx - Format if statement per yapf style --------- Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 3316ef9 commit 7c82e73

3 files changed

Lines changed: 249 additions & 2 deletions

File tree

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 139 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@
124124
import org.apache.kafka.common.PartitionInfo;
125125
import org.apache.kafka.common.TopicPartition;
126126
import org.apache.kafka.common.config.SaslConfigs;
127+
import org.apache.kafka.common.header.Header;
128+
import org.apache.kafka.common.header.internals.RecordHeader;
127129
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
128130
import org.apache.kafka.common.serialization.Deserializer;
129131
import org.apache.kafka.common.serialization.Serializer;
@@ -3552,12 +3554,17 @@ public PTransform<PCollection<KV<K, V>>, PDone> buildExternal(
35523554
public static class External implements ExternalTransformRegistrar {
35533555

35543556
public static final String URN = "beam:transform:org.apache.beam:kafka_write:v1";
3557+
public static final String URN_WITH_HEADERS =
3558+
"beam:transform:org.apache.beam:kafka_write_with_headers:v1";
35553559

35563560
@Override
35573561
public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
35583562
return ImmutableMap.of(
35593563
URN,
3560-
(Class<KafkaIO.Write.Builder<?, ?>>) (Class<?>) AutoValue_KafkaIO_Write.Builder.class);
3564+
(Class<KafkaIO.Write.Builder<?, ?>>) (Class<?>) AutoValue_KafkaIO_Write.Builder.class,
3565+
URN_WITH_HEADERS,
3566+
(Class<? extends ExternalTransformBuilder<?, ?, ?>>)
3567+
(Class<?>) WriteWithHeaders.Builder.class);
35613568
}
35623569

35633570
/** Parameters class to expose the Write transform to an external SDK. */
@@ -3855,6 +3862,137 @@ public T decode(InputStream inStream) {
38553862
}
38563863
}
38573864

3865+
/**
3866+
* A {@link PTransform} to write to Kafka with support for record headers.
3867+
*
3868+
* <p>This transform accepts {@link Row} elements with the following schema:
3869+
*
3870+
* <ul>
3871+
* <li>key: bytes (required) - The key of the record.
3872+
* <li>value: bytes (required) - The value of the record.
3873+
* <li>headers: List&lt;Row(key=str, value=bytes)&gt; (optional) - Record headers.
3874+
* <li>topic: str (optional) - Per-record topic override.
3875+
* <li>partition: int (optional) - Per-record partition.
3876+
* <li>timestamp: long (optional) - Per-record timestamp in milliseconds.
3877+
* </ul>
3878+
*
3879+
* <p>This class is primarily used as a cross-language transform.
3880+
*/
3881+
static class WriteWithHeaders extends PTransform<PCollection<Row>, PDone> {
3882+
private static final String FIELD_KEY = "key";
3883+
private static final String FIELD_VALUE = "value";
3884+
private static final String FIELD_HEADERS = "headers";
3885+
private static final String FIELD_TOPIC = "topic";
3886+
private static final String FIELD_PARTITION = "partition";
3887+
private static final String FIELD_TIMESTAMP = "timestamp";
3888+
private static final String HEADER_FIELD_KEY = "key";
3889+
private static final String HEADER_FIELD_VALUE = "value";
3890+
3891+
private final WriteRecords<byte[], byte[]> writeRecords;
3892+
3893+
WriteWithHeaders(WriteRecords<byte[], byte[]> writeRecords) {
3894+
this.writeRecords = writeRecords;
3895+
}
3896+
3897+
static class Builder
3898+
implements ExternalTransformBuilder<Write.External.Configuration, PCollection<Row>, PDone> {
3899+
3900+
@Override
3901+
@SuppressWarnings("unchecked")
3902+
public PTransform<PCollection<Row>, PDone> buildExternal(
3903+
Write.External.Configuration configuration) {
3904+
Map<String, Object> producerConfig = new HashMap<>(configuration.producerConfig);
3905+
Class<Serializer<byte[]>> keySerializer =
3906+
(Class<Serializer<byte[]>>) resolveClass(configuration.keySerializer);
3907+
Class<Serializer<byte[]>> valueSerializer =
3908+
(Class<Serializer<byte[]>>) resolveClass(configuration.valueSerializer);
3909+
3910+
WriteRecords<byte[], byte[]> writeRecords =
3911+
KafkaIO.<byte[], byte[]>writeRecords()
3912+
.withProducerConfigUpdates(producerConfig)
3913+
.withKeySerializer(keySerializer)
3914+
.withValueSerializer(valueSerializer);
3915+
3916+
if (configuration.topic != null) {
3917+
writeRecords = writeRecords.withTopic(configuration.topic);
3918+
}
3919+
3920+
return new WriteWithHeaders(writeRecords);
3921+
}
3922+
}
3923+
3924+
@Override
3925+
public PDone expand(PCollection<Row> input) {
3926+
final @Nullable String defaultTopic = writeRecords.getTopic();
3927+
return input
3928+
.apply(
3929+
"Row to ProducerRecord",
3930+
MapElements.via(
3931+
new SimpleFunction<Row, ProducerRecord<byte[], byte[]>>() {
3932+
@Override
3933+
public ProducerRecord<byte[], byte[]> apply(Row row) {
3934+
return toProducerRecord(row, defaultTopic);
3935+
}
3936+
}))
3937+
.setCoder(
3938+
ProducerRecordCoder.of(
3939+
NullableCoder.of(ByteArrayCoder.of()), NullableCoder.of(ByteArrayCoder.of())))
3940+
.apply(writeRecords);
3941+
}
3942+
3943+
@Override
3944+
public void populateDisplayData(DisplayData.Builder builder) {
3945+
super.populateDisplayData(builder);
3946+
writeRecords.populateDisplayData(builder);
3947+
}
3948+
3949+
@SuppressWarnings("argument")
3950+
private static ProducerRecord<byte[], byte[]> toProducerRecord(
3951+
Row row, @Nullable String defaultTopic) {
3952+
String topic = defaultTopic;
3953+
if (row.getSchema().hasField(FIELD_TOPIC)) {
3954+
String rowTopic = row.getString(FIELD_TOPIC);
3955+
if (rowTopic != null) {
3956+
topic = rowTopic;
3957+
}
3958+
}
3959+
checkArgument(
3960+
topic != null, "Row is missing field '%s' and no default topic configured", FIELD_TOPIC);
3961+
3962+
byte[] key = row.getBytes(FIELD_KEY);
3963+
byte[] value = row.getBytes(FIELD_VALUE);
3964+
Integer partition =
3965+
row.getSchema().hasField(FIELD_PARTITION) ? row.getInt32(FIELD_PARTITION) : null;
3966+
Long timestamp =
3967+
row.getSchema().hasField(FIELD_TIMESTAMP) ? row.getInt64(FIELD_TIMESTAMP) : null;
3968+
3969+
boolean hasHeaders = ConsumerSpEL.hasHeaders();
3970+
Iterable<Header> headers = Collections.emptyList();
3971+
if (hasHeaders && row.getSchema().hasField(FIELD_HEADERS)) {
3972+
Iterable<Row> headerRows = row.getArray(FIELD_HEADERS);
3973+
if (headerRows != null) {
3974+
List<Header> headerList = new ArrayList<>();
3975+
for (Row headerRow : headerRows) {
3976+
String headerKey = headerRow.getString(HEADER_FIELD_KEY);
3977+
checkArgument(headerKey != null, "Header key is required");
3978+
byte[] headerValue = headerRow.getBytes(HEADER_FIELD_VALUE);
3979+
headerList.add(new RecordHeader(headerKey, headerValue));
3980+
}
3981+
headers = headerList;
3982+
}
3983+
} else if (!hasHeaders && row.getSchema().hasField(FIELD_HEADERS)) {
3984+
// Log warning when headers are present but Kafka client doesn't support them
3985+
LOG.warn(
3986+
"Dropping headers from Kafka record because the Kafka client version "
3987+
+ "does not support headers (requires Kafka 0.11+).");
3988+
}
3989+
3990+
return hasHeaders
3991+
? new ProducerRecord<>(topic, partition, timestamp, key, value, headers)
3992+
: new ProducerRecord<>(topic, partition, timestamp, key, value);
3993+
}
3994+
}
3995+
38583996
private static Class<?> resolveClass(String className) {
38593997
try {
38603998
return Class.forName(className);

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,92 @@ public void testConstructKafkaWrite() throws Exception {
379379
assertThat(spec.getValueSerializer().getName(), Matchers.is(valueSerializer));
380380
}
381381

382+
@Test
383+
public void testConstructKafkaWriteWithHeaders() throws Exception {
384+
String topic = "topic";
385+
String keySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
386+
String valueSerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
387+
ImmutableMap<String, String> producerConfig =
388+
ImmutableMap.<String, String>builder()
389+
.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:port,server2:port")
390+
.put("retries", "3")
391+
.build();
392+
393+
ExternalTransforms.ExternalConfigurationPayload payload =
394+
encodeRow(
395+
Row.withSchema(
396+
Schema.of(
397+
Field.of("topic", FieldType.STRING),
398+
Field.of(
399+
"producer_config", FieldType.map(FieldType.STRING, FieldType.STRING)),
400+
Field.of("key_serializer", FieldType.STRING),
401+
Field.of("value_serializer", FieldType.STRING)))
402+
.withFieldValue("topic", topic)
403+
.withFieldValue("producer_config", producerConfig)
404+
.withFieldValue("key_serializer", keySerializer)
405+
.withFieldValue("value_serializer", valueSerializer)
406+
.build());
407+
408+
Schema rowSchema =
409+
Schema.of(Field.of("key", FieldType.BYTES), Field.of("value", FieldType.BYTES));
410+
Row inputRow = Row.withSchema(rowSchema).addValues(new byte[0], new byte[0]).build();
411+
412+
Pipeline p = Pipeline.create();
413+
p.apply(Create.of(inputRow).withRowSchema(rowSchema));
414+
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
415+
String inputPCollection =
416+
Iterables.getOnlyElement(
417+
Iterables.getLast(pipelineProto.getComponents().getTransformsMap().values())
418+
.getOutputsMap()
419+
.values());
420+
421+
ExpansionApi.ExpansionRequest request =
422+
ExpansionApi.ExpansionRequest.newBuilder()
423+
.setComponents(pipelineProto.getComponents())
424+
.setTransform(
425+
RunnerApi.PTransform.newBuilder()
426+
.setUniqueName("test")
427+
.putInputs("input", inputPCollection)
428+
.setSpec(
429+
RunnerApi.FunctionSpec.newBuilder()
430+
.setUrn(
431+
org.apache.beam.sdk.io.kafka.KafkaIO.Write.External
432+
.URN_WITH_HEADERS)
433+
.setPayload(payload.toByteString())))
434+
.setNamespace("test_namespace")
435+
.build();
436+
437+
ExpansionService expansionService = new ExpansionService();
438+
TestStreamObserver<ExpansionApi.ExpansionResponse> observer = new TestStreamObserver<>();
439+
expansionService.expand(request, observer);
440+
441+
ExpansionApi.ExpansionResponse result = observer.result;
442+
RunnerApi.PTransform transform = result.getTransform();
443+
assertThat(
444+
transform.getSubtransformsList(),
445+
Matchers.hasItem(MatchesPattern.matchesPattern(".*Row-to-ProducerRecord.*")));
446+
assertThat(
447+
transform.getSubtransformsList(),
448+
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-WriteRecords.*")));
449+
assertThat(transform.getInputsCount(), Matchers.is(1));
450+
assertThat(transform.getOutputsCount(), Matchers.is(0));
451+
452+
RunnerApi.PTransform writeComposite =
453+
result.getComponents().getTransformsOrThrow(transform.getSubtransforms(1));
454+
RunnerApi.PTransform writeParDo =
455+
result.getComponents().getTransformsOrThrow(writeComposite.getSubtransforms(0));
456+
457+
RunnerApi.ParDoPayload parDoPayload =
458+
RunnerApi.ParDoPayload.parseFrom(writeParDo.getSpec().getPayload());
459+
KafkaWriter<?, ?> kafkaWriter = (KafkaWriter<?, ?>) ParDoTranslation.getDoFn(parDoPayload);
460+
KafkaIO.WriteRecords<?, ?> spec = kafkaWriter.getSpec();
461+
462+
assertThat(spec.getProducerConfig(), Matchers.is(producerConfig));
463+
assertThat(spec.getTopic(), Matchers.is(topic));
464+
assertThat(spec.getKeySerializer().getName(), Matchers.is(keySerializer));
465+
assertThat(spec.getValueSerializer().getName(), Matchers.is(valueSerializer));
466+
}
467+
382468
private static ExternalConfigurationPayload encodeRow(Row row) {
383469
ByteStringOutputStream outputStream = new ByteStringOutputStream();
384470
try {

sdks/python/apache_beam/io/kafka.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,20 +274,33 @@ class WriteToKafka(ExternalTransform):
274274
assumed to be byte arrays.
275275
276276
Experimental; no backwards compatibility guarantees.
277+
278+
When with_headers=True, the input PCollection elements must be beam.Row
279+
objects with the following schema:
280+
281+
- key: bytes (required) - The key of the record.
282+
- value: bytes (required) - The value of the record.
283+
- headers: List[Row(key=str, value=bytes)] (optional) - Record headers.
284+
- topic: str (optional) - Per-record topic override.
285+
- partition: int (optional) - Per-record partition.
286+
- timestamp: int (optional) - Per-record timestamp in milliseconds.
277287
"""
278288

279289
# Default serializer which passes raw bytes to Kafka
280290
byte_array_serializer = (
281291
'org.apache.kafka.common.serialization.ByteArraySerializer')
282292

283293
URN = 'beam:transform:org.apache.beam:kafka_write:v1'
294+
URN_WITH_HEADERS = (
295+
'beam:transform:org.apache.beam:kafka_write_with_headers:v1')
284296

285297
def __init__(
286298
self,
287299
producer_config,
288300
topic,
289301
key_serializer=byte_array_serializer,
290302
value_serializer=byte_array_serializer,
303+
with_headers=False,
291304
expansion_service=None):
292305
"""
293306
Initializes a write operation to Kafka.
@@ -302,10 +315,20 @@ def __init__(
302315
Serializer for the topic's value, e.g.
303316
'org.apache.kafka.common.serialization.LongSerializer'.
304317
Default: 'org.apache.kafka.common.serialization.ByteArraySerializer'.
318+
:param with_headers: If True, input elements must be beam.Row objects
319+
containing 'key', 'value', and optional 'headers' fields.
320+
Only ByteArraySerializer is supported when with_headers=True.
305321
:param expansion_service: The address (host:port) of the ExpansionService.
306322
"""
323+
if with_headers and (key_serializer != self.byte_array_serializer or
324+
value_serializer != self.byte_array_serializer):
325+
raise ValueError(
326+
'WriteToKafka(with_headers=True) only supports '
327+
'ByteArraySerializer for key and value.')
328+
329+
urn = self.URN_WITH_HEADERS if with_headers else self.URN
307330
super().__init__(
308-
self.URN,
331+
urn,
309332
NamedTupleBasedPayloadBuilder(
310333
WriteToKafkaSchema(
311334
producer_config=producer_config,

0 commit comments

Comments
 (0)