Commit 7c82e73
* [#37198] Make withBackOffSupplier public to enable bounded retry configuration
Users need to configure bounded backoff to prevent infinite retry loops.
Making withBackOffSupplier public allows users to set
FluentBackoff.DEFAULT.withMaxRetries(n) and control retry behavior.
Changes:
- Changed withBackOffSupplier() visibility from package-private to public
- Added comprehensive integration test with zero-delay BoundedBackOff
- Test verifies: responses empty, 1 failure emitted, call count = maxRetries+1
The test uses a serializable BoundedBackOff class with assertions on both
PAssert (pipeline outputs) and Metrics (retry counts) to ensure bounded
retry behavior works correctly.
Fixes #37198
Related to #37176
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
* [Python][Java] Add support for record headers in WriteToKafka (Fixes #27033)
This PR adds support for writing Kafka record headers in the Python SDK
by introducing a new cross-language transform.
Changes:
- Python: Add `with_headers` parameter to `WriteToKafka` that accepts
`beam.Row` elements with key, value, and optional headers fields
- Java: Add `WriteWithHeaders` class that converts Row to ProducerRecord
with headers support
- Java: Register new URN `beam:transform:org.apache.beam:kafka_write_with_headers:v1`
- Add test `testConstructKafkaWriteWithHeaders` in KafkaIOExternalTest
When `with_headers=True`, input elements must be `beam.Row` with schema:
- key: bytes (required)
- value: bytes (required)
- headers: List[Row(key=str, value=bytes)] (optional)
- topic: str (optional, per-record override)
- partition: int (optional)
- timestamp: long (optional)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* fix: Python lint and formatting issues
- Wrap URN_WITH_HEADERS to fit 80 char limit
- Add blank line before docstring list for Sphinx
- Format if statement per yapf style
---------
Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 3316ef9 commit 7c82e73
3 files changed
Lines changed: 249 additions & 2 deletions
File tree
- sdks
- java/io/kafka/src
- main/java/org/apache/beam/sdk/io/kafka
- test/java/org/apache/beam/sdk/io/kafka
- python/apache_beam/io
Lines changed: 139 additions & 1 deletion
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
124 | 124 | | |
125 | 125 | | |
126 | 126 | | |
| 127 | + | |
| 128 | + | |
127 | 129 | | |
128 | 130 | | |
129 | 131 | | |
| |||
3552 | 3554 | | |
3553 | 3555 | | |
3554 | 3556 | | |
| 3557 | + | |
| 3558 | + | |
3555 | 3559 | | |
3556 | 3560 | | |
3557 | 3561 | | |
3558 | 3562 | | |
3559 | 3563 | | |
3560 | | - | |
| 3564 | + | |
| 3565 | + | |
| 3566 | + | |
| 3567 | + | |
3561 | 3568 | | |
3562 | 3569 | | |
3563 | 3570 | | |
| |||
3855 | 3862 | | |
3856 | 3863 | | |
3857 | 3864 | | |
| 3865 | + | |
| 3866 | + | |
| 3867 | + | |
| 3868 | + | |
| 3869 | + | |
| 3870 | + | |
| 3871 | + | |
| 3872 | + | |
| 3873 | + | |
| 3874 | + | |
| 3875 | + | |
| 3876 | + | |
| 3877 | + | |
| 3878 | + | |
| 3879 | + | |
| 3880 | + | |
| 3881 | + | |
| 3882 | + | |
| 3883 | + | |
| 3884 | + | |
| 3885 | + | |
| 3886 | + | |
| 3887 | + | |
| 3888 | + | |
| 3889 | + | |
| 3890 | + | |
| 3891 | + | |
| 3892 | + | |
| 3893 | + | |
| 3894 | + | |
| 3895 | + | |
| 3896 | + | |
| 3897 | + | |
| 3898 | + | |
| 3899 | + | |
| 3900 | + | |
| 3901 | + | |
| 3902 | + | |
| 3903 | + | |
| 3904 | + | |
| 3905 | + | |
| 3906 | + | |
| 3907 | + | |
| 3908 | + | |
| 3909 | + | |
| 3910 | + | |
| 3911 | + | |
| 3912 | + | |
| 3913 | + | |
| 3914 | + | |
| 3915 | + | |
| 3916 | + | |
| 3917 | + | |
| 3918 | + | |
| 3919 | + | |
| 3920 | + | |
| 3921 | + | |
| 3922 | + | |
| 3923 | + | |
| 3924 | + | |
| 3925 | + | |
| 3926 | + | |
| 3927 | + | |
| 3928 | + | |
| 3929 | + | |
| 3930 | + | |
| 3931 | + | |
| 3932 | + | |
| 3933 | + | |
| 3934 | + | |
| 3935 | + | |
| 3936 | + | |
| 3937 | + | |
| 3938 | + | |
| 3939 | + | |
| 3940 | + | |
| 3941 | + | |
| 3942 | + | |
| 3943 | + | |
| 3944 | + | |
| 3945 | + | |
| 3946 | + | |
| 3947 | + | |
| 3948 | + | |
| 3949 | + | |
| 3950 | + | |
| 3951 | + | |
| 3952 | + | |
| 3953 | + | |
| 3954 | + | |
| 3955 | + | |
| 3956 | + | |
| 3957 | + | |
| 3958 | + | |
| 3959 | + | |
| 3960 | + | |
| 3961 | + | |
| 3962 | + | |
| 3963 | + | |
| 3964 | + | |
| 3965 | + | |
| 3966 | + | |
| 3967 | + | |
| 3968 | + | |
| 3969 | + | |
| 3970 | + | |
| 3971 | + | |
| 3972 | + | |
| 3973 | + | |
| 3974 | + | |
| 3975 | + | |
| 3976 | + | |
| 3977 | + | |
| 3978 | + | |
| 3979 | + | |
| 3980 | + | |
| 3981 | + | |
| 3982 | + | |
| 3983 | + | |
| 3984 | + | |
| 3985 | + | |
| 3986 | + | |
| 3987 | + | |
| 3988 | + | |
| 3989 | + | |
| 3990 | + | |
| 3991 | + | |
| 3992 | + | |
| 3993 | + | |
| 3994 | + | |
| 3995 | + | |
3858 | 3996 | | |
3859 | 3997 | | |
3860 | 3998 | | |
| |||
Lines changed: 86 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
379 | 379 | | |
380 | 380 | | |
381 | 381 | | |
| 382 | + | |
| 383 | + | |
| 384 | + | |
| 385 | + | |
| 386 | + | |
| 387 | + | |
| 388 | + | |
| 389 | + | |
| 390 | + | |
| 391 | + | |
| 392 | + | |
| 393 | + | |
| 394 | + | |
| 395 | + | |
| 396 | + | |
| 397 | + | |
| 398 | + | |
| 399 | + | |
| 400 | + | |
| 401 | + | |
| 402 | + | |
| 403 | + | |
| 404 | + | |
| 405 | + | |
| 406 | + | |
| 407 | + | |
| 408 | + | |
| 409 | + | |
| 410 | + | |
| 411 | + | |
| 412 | + | |
| 413 | + | |
| 414 | + | |
| 415 | + | |
| 416 | + | |
| 417 | + | |
| 418 | + | |
| 419 | + | |
| 420 | + | |
| 421 | + | |
| 422 | + | |
| 423 | + | |
| 424 | + | |
| 425 | + | |
| 426 | + | |
| 427 | + | |
| 428 | + | |
| 429 | + | |
| 430 | + | |
| 431 | + | |
| 432 | + | |
| 433 | + | |
| 434 | + | |
| 435 | + | |
| 436 | + | |
| 437 | + | |
| 438 | + | |
| 439 | + | |
| 440 | + | |
| 441 | + | |
| 442 | + | |
| 443 | + | |
| 444 | + | |
| 445 | + | |
| 446 | + | |
| 447 | + | |
| 448 | + | |
| 449 | + | |
| 450 | + | |
| 451 | + | |
| 452 | + | |
| 453 | + | |
| 454 | + | |
| 455 | + | |
| 456 | + | |
| 457 | + | |
| 458 | + | |
| 459 | + | |
| 460 | + | |
| 461 | + | |
| 462 | + | |
| 463 | + | |
| 464 | + | |
| 465 | + | |
| 466 | + | |
| 467 | + | |
382 | 468 | | |
383 | 469 | | |
384 | 470 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
274 | 274 | | |
275 | 275 | | |
276 | 276 | | |
| 277 | + | |
| 278 | + | |
| 279 | + | |
| 280 | + | |
| 281 | + | |
| 282 | + | |
| 283 | + | |
| 284 | + | |
| 285 | + | |
| 286 | + | |
277 | 287 | | |
278 | 288 | | |
279 | 289 | | |
280 | 290 | | |
281 | 291 | | |
282 | 292 | | |
283 | 293 | | |
| 294 | + | |
| 295 | + | |
284 | 296 | | |
285 | 297 | | |
286 | 298 | | |
287 | 299 | | |
288 | 300 | | |
289 | 301 | | |
290 | 302 | | |
| 303 | + | |
291 | 304 | | |
292 | 305 | | |
293 | 306 | | |
| |||
302 | 315 | | |
303 | 316 | | |
304 | 317 | | |
| 318 | + | |
| 319 | + | |
| 320 | + | |
305 | 321 | | |
306 | 322 | | |
| 323 | + | |
| 324 | + | |
| 325 | + | |
| 326 | + | |
| 327 | + | |
| 328 | + | |
| 329 | + | |
307 | 330 | | |
308 | | - | |
| 331 | + | |
309 | 332 | | |
310 | 333 | | |
311 | 334 | | |
| |||
0 commit comments