From f91b5af808b38dad5c8b8f2c74fdaa664f40fc9e Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Sun, 10 May 2026 10:04:54 +0000 Subject: [PATCH 1/9] [Dataflow] Add Operation::finishKey() and move timers into it Moving the processTimers call from finish() to finishKey(). In upcoming changes there'll be multiple streaming work items in a single beam bundle. With multiple work items, we've to process elements and timers of each work item before moving to the next work items. finishKey() will be called by the NativeIterator classes after iterating through all elements from a work item. Batch processes timers in BatchModeUngroupingParDoFn and does not rely on the processTimers() in ParDoOperation::finish(). So removing the processTimers() call from ParDoOperation::finish() is safe. Batch also does not use the new finishKey() method. --- ...tCommit_Java_ValidatesRunner_Dataflow.json | 2 +- ...va_ValidatesRunner_Dataflow_Streaming.json | 2 +- .../runners/dataflow/worker/PubsubReader.java | 14 +---- .../worker/StreamingModeExecutionContext.java | 12 +++- .../worker/UngroupedWindmillReader.java | 14 +---- .../worker/WindmillReaderIteratorBase.java | 15 ++++- .../worker/WindowingWindmillReader.java | 5 ++ .../WorkerCustomSourceOperationExecutor.java | 3 + .../streaming/ComputationWorkExecutor.java | 2 +- .../util/common/worker/FlattenOperation.java | 3 + .../util/common/worker/MapTaskExecutor.java | 7 +++ .../worker/util/common/worker/Operation.java | 3 + .../util/common/worker/ParDoOperation.java | 7 ++- .../util/common/worker/ReadOperation.java | 3 + .../util/common/worker/WorkExecutor.java | 3 + .../util/common/worker/WriteOperation.java | 3 + .../worker/IntrinsicMapTaskExecutorTest.java | 12 ++++ .../worker/StreamingDataflowWorkerTest.java | 4 +- .../StreamingModeExecutionContextTest.java | 11 +++- .../WindmillReaderIteratorBaseTest.java | 60 ++++++++++++++++++- .../worker/WorkerCustomSourcesTest.java | 7 ++- .../util/common/worker/ExecutorTestUtils.java | 3 + .../common/worker/MapTaskExecutorTest.java | 12 ++++ .../common/worker/ParDoOperationTest.java | 2 + 24 files changed, 170 insertions(+), 39 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json index e9d869cc508c..8144784f5f02 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 1, + "modification": 2, } diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index e623d3373a93..50d17c108f2e 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 1, + "modification": 2, } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java index b60cb84415ff..0c80909a0f3b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java @@ -117,20 +117,12 @@ public NativeReader create( @Override public NativeReaderIterator> iterator() throws IOException { - return new PubsubReaderIterator(context.getWorkItem()); + return new PubsubReaderIterator(); } class PubsubReaderIterator extends WindmillReaderIteratorBase { - protected PubsubReaderIterator(Windmill.WorkItem work) { - super(work, skipUndecodableElements); - } - - @Override - public boolean advance() throws IOException { - if (context.workIsFailed()) { - return false; - } - return super.advance(); + protected PubsubReaderIterator() { + super(context, skipUndecodableElements); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index f75d452b211b..0255d5c577ec 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -56,6 +56,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; +import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest; @@ -157,6 +158,8 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext activeReader; + private @Nullable WorkExecutor workExecutor; + public StreamingModeExecutionContext( CounterFactory counterFactory, String computationId, @@ -240,9 +243,11 @@ public void start( Work work, WindmillStateReader stateReader, SideInputStateFetcher sideInputStateFetcher, - Windmill.WorkItemCommitRequest.Builder outputBuilder) { + Windmill.WorkItemCommitRequest.Builder outputBuilder, + WorkExecutor workExecutor) { this.key = key; this.work = work; + this.workExecutor = workExecutor; this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey()); this.sideInputStateFetcher = sideInputStateFetcher; StreamingGlobalConfig config = globalConfigHandle.getConfig(); @@ -270,6 +275,11 @@ public void start( } } + public void finishKey() throws Exception { + checkNotNull(workExecutor, "workExecutor must be set before calling finishKey()"); + workExecutor.finishKey(); + } + /** * Ensure that the processing time is greater than any fired processing time timers. Otherwise, a * trigger could ignore the timer and orphan the window. diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java index 8ef0bf80323a..f6924493f190 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java @@ -109,20 +109,12 @@ public NativeReader create( @Override public NativeReaderIterator> iterator() throws IOException { - return new UngroupedWindmillReaderIterator(context.getWorkItem()); + return new UngroupedWindmillReaderIterator(); } class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase { - UngroupedWindmillReaderIterator(Windmill.WorkItem work) { - super(work, skipUndecodableElements); - } - - @Override - public boolean advance() throws IOException { - if (context.workIsFailed()) { - return false; - } - return super.advance(); + UngroupedWindmillReaderIterator() { + super(context, skipUndecodableElements); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java index 7e6508a4788c..b8232ca9f61a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java @@ -34,6 +34,7 @@ */ public abstract class WindmillReaderIteratorBase extends NativeReader.NativeReaderIterator> { + private final StreamingModeExecutionContext context; private final Windmill.WorkItem work; private int bundleIndex = 0; private int messageIndex = -1; @@ -42,9 +43,10 @@ public abstract class WindmillReaderIteratorBase private static final Logger LOG = LoggerFactory.getLogger(WindmillReaderIteratorBase.class); protected WindmillReaderIteratorBase( - Windmill.WorkItem work, ValueProvider skipUndecodableElements) { + StreamingModeExecutionContext context, ValueProvider skipUndecodableElements) { + this.context = context; this.skipUndecodableElements = skipUndecodableElements; - this.work = work; + this.work = context.getWorkItem(); } @Override @@ -54,9 +56,18 @@ public boolean start() throws IOException { @Override public boolean advance() throws IOException { + if (context.workIsFailed()) { + throw new WorkItemCancelledException(context.getWorkItem().getShardingKey()); + } + while (true) { if (bundleIndex >= work.getMessageBundlesCount()) { current = null; + try { + context.finishKey(); + } catch (Exception e) { + throw new RuntimeException(e); + } return false; } Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java index 173b254f6395..7483f2d138a2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java @@ -182,6 +182,11 @@ public boolean start() throws IOException { @Override public boolean advance() throws IOException { current = null; + try { + context.finishKey(); + } catch (Exception e) { + throw new RuntimeException(e); + } return false; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java index 31528e96e07f..a1321d57ebb6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java @@ -89,6 +89,9 @@ public void execute() throws Exception { LOG.debug("Source operation execution complete"); } + @Override + public void finishKey() throws Exception {} + @Override public SourceOperationResponse getResponse() { return response; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java index 8dc681fc640c..b4f3a22a7f52 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java @@ -74,7 +74,7 @@ public final void executeWork( SideInputStateFetcher sideInputStateFetcher, Windmill.WorkItemCommitRequest.Builder outputBuilder) throws Exception { - context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder); + context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder, workExecutor()); workExecutor().execute(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java index 4847e9f2ea9c..af1b2b9c48bd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java @@ -43,6 +43,9 @@ public void process(Object elem) throws Exception { } } + @Override + public void finishKey() throws Exception {} + @Override public boolean supportsRestart() { return true; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java index 58b95f286d55..3c33e1904069 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java @@ -112,6 +112,13 @@ public void execute() throws Exception { // TODO: support for success / failure ports? } + @Override + public void finishKey() throws Exception { + for (Operation op : operations) { + op.finishKey(); + } + } + @Override public NativeReader.Progress getWorkerProgress() throws Exception { return getReadOperation().getProgress(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java index b7b4e255cfa5..b630da33cfad 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java @@ -137,6 +137,9 @@ public void finish() throws Exception { } } + /** Called when all elements for a specific key have been processed. */ + public abstract void finishKey() throws Exception; + /** Aborts this Operation's execution. */ public void abort() throws Exception { synchronized (initializationStateLock) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java index 27b6e9d1fb35..5aec82073366 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java @@ -46,12 +46,17 @@ public void process(Object elem) throws Exception { } @Override - public void finish() throws Exception { + // Batch mode does not use this method and instead relies on BatchModeUngroupingParDoFn + // to process timers per key. + public void finishKey() throws Exception { try (Closeable scope = context.enterProcessTimers()) { checkStarted(); fn.processTimers(); } + } + @Override + public void finish() throws Exception { try (Closeable scope = context.enterFinish()) { fn.finishBundle(); super.finish(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java index d6b020483d4c..fabc8d6af25b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java @@ -271,6 +271,9 @@ public void finish() throws Exception { } } + @Override + public void finishKey() throws Exception {} + @Override public void abort() throws Exception { if (readerIterator != null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java index b7170c80ced9..1083fdbb9c42 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java @@ -34,6 +34,9 @@ public interface WorkExecutor extends AutoCloseable { /** Executes the task. */ public abstract void execute() throws Exception; + /** Called when all elements for a specific key have been processed. */ + void finishKey() throws Exception; + /** * Returns the worker's current progress. * diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java index 673140d58d89..d28e7f3e5d3d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java @@ -105,6 +105,9 @@ public void finish() throws Exception { } } + @Override + public void finishKey() throws Exception {} + @Override public void abort() throws Exception { if (writer == null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java index c519efd4172c..396c8db87e6b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java @@ -104,6 +104,9 @@ public void abort() throws Exception { aborted = true; super.abort(); } + + @Override + public void finishKey() throws Exception {} } // A mock ReadOperation fed to a MapTaskExecutor in test. @@ -312,6 +315,9 @@ public void start() throws Exception { Metrics.counter("TestMetric", "MetricCounter").inc(1L); } } + + @Override + public void finishKey() throws Exception {} }, new Operation(new OutputReceiver[] {}, context2) { @Override @@ -321,6 +327,9 @@ public void start() throws Exception { Metrics.counter("TestMetric", "MetricCounter").inc(2L); } } + + @Override + public void finishKey() throws Exception {} }, new Operation(new OutputReceiver[] {}, context3) { @Override @@ -330,6 +339,9 @@ public void start() throws Exception { Metrics.counter("TestMetric", "MetricCounter").inc(3L); } } + + @Override + public void finishKey() throws Exception {} }); try (IntrinsicMapTaskExecutor executor = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index d8a1d1b90d47..ff82a1ab5c4c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -3657,8 +3657,8 @@ public void testActiveWorkFailure() throws Exception { server.waitForAndGetCommitsWithTimeout(1, Duration.standardSeconds(5)); assertEquals(1, commits.size()); - assertEquals(0, BlockingFn.teardownCounter.get()); - assertEquals(1, BlockingFn.setupCounter.get()); + assertEquals(1, BlockingFn.teardownCounter.get()); + assertEquals(2, BlockingFn.setupCounter.get()); worker.stop(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 4bfa6efc8880..970a9a537c0d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -62,6 +62,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; +import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; @@ -99,6 +100,7 @@ public class StreamingModeExecutionContextTest { @Rule public transient Timeout globalTimeout = Timeout.seconds(600); @Mock private SideInputStateFetcher sideInputStateFetcher; @Mock private WindmillStateReader stateReader; + @Mock private WorkExecutor workExecutor; private static final String COMPUTATION_ID = "computationId"; @@ -168,7 +170,8 @@ public void testTimerInternalsSetTimer() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, - outputBuilder); + outputBuilder, + workExecutor); TimerInternals timerInternals = stepContext.timerInternals(); @@ -218,7 +221,8 @@ public void testTimerInternalsProcessingTimeSkew() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, - outputBuilder); + outputBuilder, + workExecutor); TimerInternals timerInternals = stepContext.timerInternals(); assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime())); } @@ -427,7 +431,8 @@ public void testStateTagEncodingBasedOnConfig() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, - outputBuilder); + outputBuilder, + workExecutor); assertEquals(expectedEncoding, executionContext.getWindmillTagEncoding().getClass()); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java index 61e2f4250d06..539c38eeb1da 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java @@ -19,6 +19,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -40,8 +45,8 @@ public class WindmillReaderIteratorBaseTest { private static class TestWindmillReaderIterator extends WindmillReaderIteratorBase { protected TestWindmillReaderIterator( - Windmill.WorkItem work, ValueProvider skipUndecodableElements) { - super(work, skipUndecodableElements); + StreamingModeExecutionContext context, ValueProvider skipUndecodableElements) { + super(context, skipUndecodableElements); } @Override @@ -81,6 +86,51 @@ public void testSkipErrors() throws IOException { testForMessageBundleCounts(true, 0, 0, 1, 3, 0, 1, 0, 0, 0, 0); } + @Test + public void testWorkItemCancelledException() throws IOException { + StreamingModeExecutionContext mockContext = mock(StreamingModeExecutionContext.class); + when(mockContext.workIsFailed()).thenReturn(true); + Windmill.WorkItem workItem = + Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(0L).build(); + when(mockContext.getWorkItem()).thenReturn(workItem); + + try (TestWindmillReaderIterator iter = + new TestWindmillReaderIterator(mockContext, ValueProvider.StaticValueProvider.of(false))) { + iter.start(); + fail("Expected WorkItemCancelledException"); + } catch (WorkItemCancelledException e) { + // Expected + } + } + + @Test + public void testFinishKeyCalled() throws Exception { + StreamingModeExecutionContext mockContext = mock(StreamingModeExecutionContext.class); + when(mockContext.workIsFailed()).thenReturn(false); + Windmill.WorkItem workItem = + Windmill.WorkItem.newBuilder() + .setKey(ByteString.EMPTY) + .setWorkToken(0L) + .addMessageBundles( + Windmill.InputMessageBundle.newBuilder() + .setSourceComputationId("foo") + .addMessages( + Windmill.Message.newBuilder() + .setTimestamp(0) + .setData(ByteString.EMPTY) + .build()) + .build()) + .build(); + when(mockContext.getWorkItem()).thenReturn(workItem); + + try (TestWindmillReaderIterator iter = + new TestWindmillReaderIterator(mockContext, ValueProvider.StaticValueProvider.of(false))) { + assertTrue(iter.start()); + assertFalse(iter.advance()); // This should trigger finishKey + verify(mockContext).finishKey(); + } + } + private void testForMessageBundleCounts(int... messageBundleCounts) throws IOException { testForMessageBundleCounts(false, messageBundleCounts); } @@ -111,9 +161,13 @@ private void testForMessageBundleCounts(boolean skipErrors, int... messageBundle .setWorkToken(0L) .addAllMessageBundles(bundles) .build(); + + StreamingModeExecutionContext mockContext = mock(StreamingModeExecutionContext.class); + when(mockContext.getWorkItem()).thenReturn(workItem); + try (TestWindmillReaderIterator iter = new TestWindmillReaderIterator( - workItem, ValueProvider.StaticValueProvider.of(skipErrors))) { + mockContext, ValueProvider.StaticValueProvider.of(skipErrors))) { List actual = ReaderTestUtils.windowedValuesToValues( ReaderUtils.readRemainingFromIterator(iter, false)); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index ce8ad32f71aa..5dfd65b46123 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -97,6 +97,7 @@ import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.NativeReaderIterator; +import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; @@ -643,7 +644,8 @@ public void testReadUnboundedReader() throws Exception { Watermarks.builder().setInputDataWatermark(new Instant(0)).build()), mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), - Windmill.WorkItemCommitRequest.newBuilder()); + Windmill.WorkItemCommitRequest.newBuilder(), + mock(WorkExecutor.class)); @SuppressWarnings({"unchecked", "rawtypes"}) NativeReader>>> reader = @@ -1023,7 +1025,8 @@ public void testFailedWorkItemsAbort() throws Exception { dummyWork, mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), - Windmill.WorkItemCommitRequest.newBuilder()); + Windmill.WorkItemCommitRequest.newBuilder(), + mock(WorkExecutor.class)); @SuppressWarnings({"unchecked", "rawtypes"}) NativeReader>>> reader = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java index 2c35f4bf99db..d5e3b9c87139 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java @@ -59,6 +59,9 @@ private static OutputReceiver[] createOutputReceivers(int numOutputs, CounterSet } return receivers; } + + @Override + public void finishKey() throws Exception {} } /** A {@code Reader} that yields a specified set of values. */ diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java index 188466a50572..5d8f8eebb6f6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java @@ -100,6 +100,9 @@ public void abort() throws Exception { aborted = true; super.abort(); } + + @Override + public void finishKey() throws Exception {} } // A mock ReadOperation fed to a MapTaskExecutor in test. @@ -309,6 +312,9 @@ public void start() throws Exception { Metrics.counter("TestMetric", "MetricCounter").inc(1L); } } + + @Override + public void finishKey() throws Exception {} }, new Operation(new OutputReceiver[] {}, context2) { @Override @@ -318,6 +324,9 @@ public void start() throws Exception { Metrics.counter("TestMetric", "MetricCounter").inc(2L); } } + + @Override + public void finishKey() throws Exception {} }, new Operation(new OutputReceiver[] {}, context3) { @Override @@ -327,6 +336,9 @@ public void start() throws Exception { Metrics.counter("TestMetric", "MetricCounter").inc(3L); } } + + @Override + public void finishKey() throws Exception {} }); assertEquals(TimeUnit.MINUTES.toMillis(10), stateTracker.getNextBundleLullDurationReportMs()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java index ba327f92cc44..5d058b1968cb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java @@ -104,6 +104,7 @@ public void testRunParDoOperation() throws Exception { parDoOperation.process(""); parDoOperation.process("bob"); + parDoOperation.finishKey(); parDoOperation.finish(); parDoOperation.abort(); @@ -147,6 +148,7 @@ public void testParDoOperationContext() throws Exception { operation.start(); operation.process("hello"); + operation.finishKey(); operation.finish(); InOrder inOrder = From a8e4b0eef083d3a7125947f9bd89be4a9f35b11d Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 11 May 2026 05:48:08 +0000 Subject: [PATCH 2/9] Update empty work item iterator --- .../runners/dataflow/worker/WindowingWindmillReader.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java index 7483f2d138a2..7108b4b7992d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java @@ -156,6 +156,11 @@ public NativeReaderIterator>> iterator() throw return new NativeReaderIterator>>() { @Override public boolean start() throws IOException { + try { + context.finishKey(); + } catch (Exception e) { + throw new RuntimeException(e); + } return false; } From 986b90331df73c0b29f41b75bb3b3381d3c956f7 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 11 May 2026 06:28:43 +0000 Subject: [PATCH 3/9] Check if finishKey is called before flushState --- .../dataflow/worker/StreamingModeExecutionContext.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 0255d5c577ec..b5ced4400848 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -159,6 +159,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext activeReader; private @Nullable WorkExecutor workExecutor; + private boolean finishKeyCalled = false; public StreamingModeExecutionContext( CounterFactory counterFactory, @@ -248,6 +249,7 @@ public void start( this.key = key; this.work = work; this.workExecutor = workExecutor; + this.finishKeyCalled = false; this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey()); this.sideInputStateFetcher = sideInputStateFetcher; StreamingGlobalConfig config = globalConfigHandle.getConfig(); @@ -276,8 +278,10 @@ public void start( } public void finishKey() throws Exception { + checkState(!finishKeyCalled, "finishKey was already called"); checkNotNull(workExecutor, "workExecutor must be set before calling finishKey()"); workExecutor.finishKey(); + this.finishKeyCalled = true; } /** @@ -461,6 +465,7 @@ public void invalidateCache() { } public Map> flushState() { + checkState(finishKeyCalled, "finishKey must be called before flushState"); Map> callbacks = new HashMap<>(); for (StepContext stepContext : getAllStepContexts()) { From 107517b169c5c4aeb36a6376a03d33a96b8b2c41 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 11 May 2026 07:25:35 +0000 Subject: [PATCH 4/9] Fix tests --- .../dataflow/worker/StreamingModeExecutionContextTest.java | 3 ++- .../beam/runners/dataflow/worker/WorkerCustomSourcesTest.java | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 970a9a537c0d..850c0988ac8a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -154,7 +154,7 @@ COMPUTATION_ID, new FakeGetDataClient(), ignored -> {}, mock(HeartbeatSender.cla } @Test - public void testTimerInternalsSetTimer() { + public void testTimerInternalsSetTimer() throws Exception { Windmill.WorkItemCommitRequest.Builder outputBuilder = Windmill.WorkItemCommitRequest.newBuilder(); NameContext nameContext = NameContextsForTests.nameContextForTest(); @@ -182,6 +182,7 @@ public void testTimerInternalsSetTimer() { new Instant(5000), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL)); + executionContext.finishKey(); executionContext.flushState(); Windmill.Timer timer = outputBuilder.buildPartial().getOutputTimers(0); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 5dfd65b46123..bbc8bed7187e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -680,6 +680,7 @@ public void testReadUnboundedReader() throws Exception { numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements())); // Extract and verify state modifications. + context.finishKey(); context.flushState(); state = context.getOutputBuilder().getSourceStateUpdates().getState(); // CountingSource's watermark is the last record + 1. i is now one past the last record, From 9e83f33a4a620d4c947e5a1459d8e7c819654d3c Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 11 May 2026 08:01:57 +0000 Subject: [PATCH 5/9] Call finishKey from WorkerCustomSources --- .../worker/StreamingModeExecutionContext.java | 8 ++++++-- .../dataflow/worker/WindmillReaderIteratorBase.java | 6 +----- .../dataflow/worker/WindowingWindmillReader.java | 12 ++---------- .../runners/dataflow/worker/WorkerCustomSources.java | 9 +++++++-- 4 files changed, 16 insertions(+), 19 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index b5ced4400848..1ff2c1bc4a1c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -277,10 +277,14 @@ public void start( } } - public void finishKey() throws Exception { + public void finishKey() { checkState(!finishKeyCalled, "finishKey was already called"); checkNotNull(workExecutor, "workExecutor must be set before calling finishKey()"); - workExecutor.finishKey(); + try { + workExecutor.finishKey(); + } catch (Exception e) { + throw new RuntimeException(e); + } this.finishKeyCalled = true; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java index b8232ca9f61a..075a1a8a4250 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java @@ -63,11 +63,7 @@ public boolean advance() throws IOException { while (true) { if (bundleIndex >= work.getMessageBundlesCount()) { current = null; - try { - context.finishKey(); - } catch (Exception e) { - throw new RuntimeException(e); - } + context.finishKey(); return false; } Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java index 7108b4b7992d..488684769bd9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java @@ -156,11 +156,7 @@ public NativeReaderIterator>> iterator() throw return new NativeReaderIterator>>() { @Override public boolean start() throws IOException { - try { - context.finishKey(); - } catch (Exception e) { - throw new RuntimeException(e); - } + context.finishKey(); return false; } @@ -187,11 +183,7 @@ public boolean start() throws IOException { @Override public boolean advance() throws IOException { current = null; - try { - context.finishKey(); - } catch (Exception e) { - throw new RuntimeException(e); - } + context.finishKey(); return false; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java index c00edffeaf95..29d5fb3561a1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java @@ -824,6 +824,7 @@ public boolean start() throws IOException { } try { if (!reader.start()) { + context.finishKey(); return false; } } catch (Exception e) { @@ -841,10 +842,13 @@ public boolean advance() throws IOException { // that there are regular checkpoints and that state does not become too large. BackOff backoff = backoffFactory.backoff(); while (true) { + if (context.workIsFailed()) { + throw new WorkItemCancelledException(context.getWorkItem().getShardingKey()); + } if (elemsRead >= maxElems || Instant.now().isAfter(endTime) - || context.isSinkFullHintSet() - || context.workIsFailed()) { + || context.isSinkFullHintSet()) { + context.finishKey(); return false; } try { @@ -857,6 +861,7 @@ public boolean advance() throws IOException { } long nextBackoff = backoff.nextBackOffMillis(); if (nextBackoff == BackOff.STOP) { + context.finishKey(); return false; } Uninterruptibles.sleepUninterruptibly(nextBackoff, TimeUnit.MILLISECONDS); From d5ef9e5909d2991d878d7b778deb0c01b1aba560 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 11 May 2026 19:39:02 +0000 Subject: [PATCH 6/9] Fix tests --- .../worker/WorkerCustomSourcesTest.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index bbc8bed7187e..d5cf2948d928 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -680,7 +680,6 @@ public void testReadUnboundedReader() throws Exception { numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements())); // Extract and verify state modifications. - context.finishKey(); context.flushState(); state = context.getOutputBuilder().getSourceStateUpdates().getState(); // CountingSource's watermark is the last record + 1. i is now one past the last record, @@ -1042,14 +1041,19 @@ public void testFailedWorkItemsAbort() throws Exception { NativeReaderIterator>>> readerIterator = reader.iterator(); int numReads = 0; - while ((numReads == 0) ? readerIterator.start() : readerIterator.advance()) { - WindowedValue>> value = readerIterator.getCurrent(); - assertEquals(KV.of(0, numReads), value.getValue().getValue()); - numReads++; - // Fail the work item after reading two elements. - if (numReads == 2) { - dummyWork.setFailed(); + try { + while ((numReads == 0) ? readerIterator.start() : readerIterator.advance()) { + WindowedValue>> value = readerIterator.getCurrent(); + assertEquals(KV.of(0, numReads), value.getValue().getValue()); + numReads++; + // Fail the work item after reading two elements. + if (numReads == 2) { + dummyWork.setFailed(); + } } + fail("Expected WorkItemCancelledException"); + } catch (WorkItemCancelledException e) { + // Expected } assertThat(numReads, equalTo(2)); } From 08a7f7df50e6bdd62108bfd5b5828d39df20ddeb Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 12 May 2026 01:29:32 +0000 Subject: [PATCH 7/9] Add DoFnRunner::finishKey method. In upcoming changes there'll be multiple dataflow streaming work items in a single beam bundle. With multiple work items, we've to process elements and timers of each work item before moving to the next work items. The new finishKey method allows the DoFnRunners to cleanup/persist state (that should not be carried over) before switching work items on multi key bundles. Streaming SideInputDoFnRunners are the only classes using the finishKey method right now. The finishKey() is not exposed to DoFns and is not visible in user apis. --- .../java/org/apache/beam/runners/core/DoFnRunner.java | 6 ++++++ .../beam/runners/core/LateDataDroppingDoFnRunner.java | 5 +++++ .../apache/beam/runners/core/SimpleDoFnRunner.java | 5 +++++ .../apache/beam/runners/core/StatefulDoFnRunner.java | 5 +++++ .../core/SimplePushbackSideInputDoFnRunnerTest.java | 3 +++ .../dataflow/worker/AssignWindowsParDoFnFactory.java | 5 +++++ .../dataflow/worker/BatchModeUngroupingParDoFn.java | 5 +++++ .../CreateIsmShardKeyAndSortKeyDoFnFactory.java | 5 +++++ .../dataflow/worker/DataflowProcessFnRunner.java | 5 +++++ .../runners/dataflow/worker/ForwardingParDoFn.java | 5 +++++ .../dataflow/worker/GroupAlsoByWindowFnRunner.java | 3 +++ .../dataflow/worker/GroupAlsoByWindowsParDoFn.java | 7 +++++++ .../worker/PairWithConstantKeyDoFnFactory.java | 5 +++++ .../dataflow/worker/PartialGroupByKeyParDoFns.java | 11 ++++++++++- .../ReifyTimestampAndWindowsParDoFnFactory.java | 5 +++++ .../beam/runners/dataflow/worker/SimpleParDoFn.java | 7 +++++++ .../StreamingKeyedWorkItemSideInputDoFnRunner.java | 5 +++++ .../worker/StreamingPCollectionViewWriterParDoFn.java | 5 +++++ .../dataflow/worker/StreamingSideInputDoFnRunner.java | 5 +++++ .../worker/ToIsmRecordForMultimapDoFnFactory.java | 5 +++++ .../runners/dataflow/worker/ValuesDoFnFactory.java | 5 +++++ .../dataflow/worker/util/common/worker/ParDoFn.java | 2 ++ .../worker/util/common/worker/ParDoOperation.java | 1 + .../common/worker/SimplePartialGroupByKeyParDoFn.java | 5 +++++ .../dataflow/worker/IntrinsicMapTaskExecutorTest.java | 3 +++ .../util/common/worker/MapTaskExecutorTest.java | 3 +++ .../worker/util/common/worker/ParDoOperationTest.java | 3 +++ 27 files changed, 128 insertions(+), 1 deletion(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java index 500aedecb5d6..0f747a106d2b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -63,6 +63,12 @@ public interface DoFnRunner void onWindowExpiration( BoundedWindow window, Instant timestamp, KeyT key); + /** + * Performs per-key cleanup or processing after all elements and timers for a key have been + * processed. + */ + void finishKey(); + /** * Returns the underlying fn instance. * diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 2ce94533d9e2..c3469a464e70 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -101,6 +101,11 @@ public void finishBundle() { doFnRunner.finishBundle(); } + @Override + public void finishKey() { + doFnRunner.finishKey(); + } + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { doFnRunner.onWindowExpiration(window, timestamp, key); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index d553a7be2d44..4ce5d128d09d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -229,6 +229,11 @@ public void finishBundle() { } } + @Override + public void finishKey() { + // Do nothing by default. + } + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { invoker.invokeOnWindowExpiration( diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java index 779138834669..b42abcd9c27c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -131,6 +131,11 @@ public void finishBundle() { doFnRunner.finishBundle(); } + @Override + public void finishKey() { + doFnRunner.finishKey(); + } + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { doFnRunner.onWindowExpiration(window, timestamp, key); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java index 1ae937b7a836..7d6bec47f93f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java @@ -379,6 +379,9 @@ public void finishBundle() { finished = true; } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java index 83cbc3aa62c7..730cc2cf65f4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java @@ -119,6 +119,11 @@ public void processTimers() throws Exception { // Nothing. } + @Override + public void finishKey() throws Exception { + // Nothing. + } + @Override public void finishBundle() throws Exception { receiver = null; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java index 8e2b325b580a..a03538a49c1c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java @@ -73,6 +73,11 @@ public void processTimers() throws Exception { // The timers for the underlying ParDoFn are processed at the end of each element } + @Override + public void finishKey() throws Exception { + // Nothing. + } + @Override public void finishBundle() throws Exception { underlyingParDoFn.finishBundle(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java index bd991560c186..51cec7429a14 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java @@ -114,6 +114,11 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception { + // Nothing. + } + @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java index ec1fcd6c8432..a95349b3d076 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java @@ -128,6 +128,11 @@ public void finishBundle() { simpleRunner.finishBundle(); } + @Override + public void finishKey() { + simpleRunner.finishKey(); + } + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { simpleRunner.onWindowExpiration(window, timestamp, key); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java index d55181559322..9a15a3a02004 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java @@ -47,6 +47,11 @@ public void processTimers() throws Exception { delegate.processTimers(); } + @Override + public void finishKey() throws Exception { + delegate.finishKey(); + } + @Override public void finishBundle() throws Exception { delegate.finishBundle(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java index 4845bb0c98e4..61913ecf325b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java @@ -102,6 +102,9 @@ private void invokeProcessElement(WindowedValue elem) { @Override public void finishBundle() {} + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java index 882dd497e3f5..663fe00a7628 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java @@ -142,6 +142,13 @@ public void processTimers() throws Exception { // it here to build a KeyedWorkItem } + @Override + public void finishKey() throws Exception { + if (fnRunner != null) { + fnRunner.finishKey(); + } + } + @Override public void finishBundle() throws Exception { checkState(fnRunner != null); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java index 6951e3a95b20..514cc3c8afd2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java @@ -98,6 +98,11 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception { + // Nothing. + } + @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java index 399258d7dbb9..82e7360f0c10 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java @@ -317,6 +317,11 @@ public void processElement(Object elem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception { + // Nothing. + } + @Override public void finishBundle() throws Exception { groupingTable.flush(receiver); @@ -377,10 +382,14 @@ public void processElement(Object elem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception { + sideInputFetcher.persist(); + } + @Override public void finishBundle() throws Exception { groupingTable.flush(receiver); - sideInputFetcher.persist(); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java index 746c09404f6e..0232f83dacf3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java @@ -86,6 +86,11 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception { + // Nothing. + } + @Override public void finishBundle() throws Exception { this.receiver = null; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java index 434d46c20a5b..1fca7c4b9ec3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java @@ -363,6 +363,13 @@ public void processTimers() throws Exception { processTimers(TimerType.SYSTEM, stepContext, windowCoder); } + @Override + public void finishKey() throws Exception { + if (fnRunner != null) { + fnRunner.finishKey(); + } + } + private void processUserTimer(TimerData timer) throws Exception { if (fnSignature.timerDeclarations().containsKey(timer.getTimerId()) || fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java index 0b9ccd1f37c6..72c3f84ced83 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java @@ -152,6 +152,11 @@ public void onTimer( @Override public void finishBundle() { simpleDoFnRunner.finishBundle(); + } + + @Override + public void finishKey() { + simpleDoFnRunner.finishKey(); sideInputFetcher.persist(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java index 61730b0c8d88..18843a35348e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java @@ -73,6 +73,11 @@ public void processElement(Object element) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception { + // Nothing. + } + @Override public void finishBundle() throws Exception {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java index 3b7891c5378d..9f88b6a0988a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java @@ -94,6 +94,11 @@ public void onTimer( @Override public void finishBundle() { simpleDoFnRunner.finishBundle(); + } + + @Override + public void finishKey() { + simpleDoFnRunner.finishKey(); sideInputFetcher.persist(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java index f9e2d6de2461..48c2007f12f4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java @@ -149,6 +149,11 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception { + // Nothing. + } + @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java index 3ddb3c2003db..012ebee1c661 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java @@ -80,6 +80,11 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception { + // Nothing. + } + @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java index 84dbbd627b08..eb3393cd4602 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java @@ -30,6 +30,8 @@ public interface ParDoFn { void processTimers() throws Exception; + void finishKey() throws Exception; + void finishBundle() throws Exception; void abort() throws Exception; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java index 5aec82073366..d64eba5dc98f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java @@ -52,6 +52,7 @@ public void finishKey() throws Exception { try (Closeable scope = context.enterProcessTimers()) { checkStarted(); fn.processTimers(); + fn.finishKey(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java index 7a9fcfdf0694..145693a74885 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java @@ -39,6 +39,11 @@ public void processElement(Object elem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception { + // Nothing. + } + @Override public void finishBundle() throws Exception { groupingTable.flush(receiver); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java index 396c8db87e6b..259b699dc383 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java @@ -220,6 +220,9 @@ public void finishBundle() {} @Override public void abort() {} + + @Override + public void finishKey() throws Exception {} } /** Verify counts for the per-element-output-time counter are correct. */ diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java index 5d8f8eebb6f6..649b41483706 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java @@ -216,6 +216,9 @@ public void finishBundle() {} @Override public void abort() {} + + @Override + public void finishKey() throws Exception {} } /** Verify counts for the per-element-output-time counter are correct. */ diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java index 5d058b1968cb..9819f6e957a1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java @@ -85,6 +85,9 @@ public void finishBundle() throws Exception { public void abort() throws Exception { outputReceiver.process("a-aborted"); } + + @Override + public void finishKey() throws Exception {} } @Test From e3f45c7f29329f827b4bb7fd5cc56b4e2ec29152 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 12 May 2026 03:03:21 +0000 Subject: [PATCH 8/9] Fix build --- .../java/org/apache/beam/runners/core/SimpleDoFnRunner.java | 4 +--- .../runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java | 3 +++ .../wrappers/streaming/ExecutableStageDoFnOperator.java | 3 +++ .../wrappers/streaming/stableinput/BufferingDoFnRunner.java | 3 +++ .../translation/wrappers/streaming/DoFnOperatorTest.java | 3 +++ .../runners/dataflow/worker/AssignWindowsParDoFnFactory.java | 4 +--- .../runners/dataflow/worker/BatchModeUngroupingParDoFn.java | 4 +--- .../worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java | 4 +--- .../dataflow/worker/PairWithConstantKeyDoFnFactory.java | 4 +--- .../runners/dataflow/worker/PartialGroupByKeyParDoFns.java | 4 +--- .../worker/ReifyTimestampAndWindowsParDoFnFactory.java | 4 +--- .../worker/StreamingPCollectionViewWriterParDoFn.java | 4 +--- .../dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java | 4 +--- .../beam/runners/dataflow/worker/ValuesDoFnFactory.java | 4 +--- .../util/common/worker/SimplePartialGroupByKeyParDoFn.java | 4 +--- .../dataflow/worker/StreamingSideInputDoFnRunnerTest.java | 4 ++++ .../translation/batch/DoFnRunnerFactory.java | 3 +++ .../translation/batch/DoFnRunnerWithMetrics.java | 3 +++ .../beam/runners/spark/translation/DoFnRunnerWithMetrics.java | 3 +++ .../spark/translation/SparkInputDataProcessorTest.java | 3 +++ 20 files changed, 39 insertions(+), 33 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 4ce5d128d09d..b5ba4c158592 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -230,9 +230,7 @@ public void finishBundle() { } @Override - public void finishKey() { - // Do nothing by default. - } + public void finishKey() {} @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java index 56e077253ae6..c70d5fb622c1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java @@ -105,6 +105,9 @@ public void finishBundle() { container.updateMetrics(stepName); } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { delegate.onWindowExpiration(window, timestamp, key); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index 4ebb359fceae..32dbcdcdb7da 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -1064,6 +1064,9 @@ public void finishBundle() { } } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java index 73b20238ef05..5388cb5febd7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java @@ -255,6 +255,9 @@ public void finishBundle() { Optional.ofNullable(finishBundleCallback).ifPresent(Runnable::run); } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index 5c4975ffab01..5a23468f404c 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -523,6 +523,9 @@ public void finishBundle() { wrappedRunner.finishBundle(); } + @Override + public void finishKey() {} + @Override public void onWindowExpiration( BoundedWindow window, Instant timestamp, KeyT key) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java index 730cc2cf65f4..6319997f5546 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java @@ -120,9 +120,7 @@ public void processTimers() throws Exception { } @Override - public void finishKey() throws Exception { - // Nothing. - } + public void finishKey() throws Exception {} @Override public void finishBundle() throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java index a03538a49c1c..3bf1ac30837b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java @@ -74,9 +74,7 @@ public void processTimers() throws Exception { } @Override - public void finishKey() throws Exception { - // Nothing. - } + public void finishKey() throws Exception {} @Override public void finishBundle() throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java index 51cec7429a14..c3aec27ab14f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java @@ -115,9 +115,7 @@ public void processElement(Object untypedElem) throws Exception { public void processTimers() {} @Override - public void finishKey() throws Exception { - // Nothing. - } + public void finishKey() throws Exception {} @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java index 514cc3c8afd2..e744c1de0cea 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java @@ -99,9 +99,7 @@ public void processElement(Object untypedElem) throws Exception { public void processTimers() {} @Override - public void finishKey() throws Exception { - // Nothing. - } + public void finishKey() throws Exception {} @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java index 82e7360f0c10..75e830e713ab 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java @@ -318,9 +318,7 @@ public void processElement(Object elem) throws Exception { public void processTimers() {} @Override - public void finishKey() throws Exception { - // Nothing. - } + public void finishKey() throws Exception {} @Override public void finishBundle() throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java index 0232f83dacf3..a681cb78fe8f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java @@ -87,9 +87,7 @@ public void processElement(Object untypedElem) throws Exception { public void processTimers() {} @Override - public void finishKey() throws Exception { - // Nothing. - } + public void finishKey() throws Exception {} @Override public void finishBundle() throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java index 18843a35348e..6afc0dd9b1b6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java @@ -74,9 +74,7 @@ public void processElement(Object element) throws Exception { public void processTimers() {} @Override - public void finishKey() throws Exception { - // Nothing. - } + public void finishKey() throws Exception {} @Override public void finishBundle() throws Exception {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java index 48c2007f12f4..18901fbc4073 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java @@ -150,9 +150,7 @@ public void processElement(Object untypedElem) throws Exception { public void processTimers() {} @Override - public void finishKey() throws Exception { - // Nothing. - } + public void finishKey() throws Exception {} @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java index 012ebee1c661..9b469adc3a28 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java @@ -81,9 +81,7 @@ public void processElement(Object untypedElem) throws Exception { public void processTimers() {} @Override - public void finishKey() throws Exception { - // Nothing. - } + public void finishKey() throws Exception {} @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java index 145693a74885..455dd8278c97 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java @@ -40,9 +40,7 @@ public void processElement(Object elem) throws Exception { public void processTimers() {} @Override - public void finishKey() throws Exception { - // Nothing. - } + public void finishKey() throws Exception {} @Override public void finishBundle() throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java index d18bc512723e..45bbc831a2ae 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java @@ -150,6 +150,7 @@ public void testSideInputNotReady() throws Exception { runner.startBundle(); runner.processElement(createDatum("e", 0)); + runner.finishKey(); runner.finishBundle(); assertTrue(outputManager.getOutput(mainOutputTag).isEmpty()); @@ -214,6 +215,7 @@ public void testMultipleWindowsNotReady() throws Exception { runner.startBundle(); runner.processElement(elem); + runner.finishKey(); runner.finishBundle(); assertTrue(outputManager.getOutput(mainOutputTag).isEmpty()); @@ -317,6 +319,7 @@ public void testSideInputNotification() throws Exception { when(mockSideInputReader.get(eq(view), any(BoundedWindow.class))).thenReturn("data"); runner.startBundle(); + runner.finishKey(); runner.finishBundle(); assertThat(outputManager.getOutput(mainOutputTag), contains(createDatum("e:data", 0))); @@ -373,6 +376,7 @@ public void testMultipleSideInputs() throws Exception { runner.startBundle(); runner.processElement(createDatum("e2", 2)); + runner.finishKey(); runner.finishBundle(); assertThat( diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java index 99ce3dc69889..cf514b4ca499 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java @@ -284,6 +284,9 @@ public void finishBundle() { } } + @Override + public void finishKey() {} + @Override public DoFn getFn() { throw new UnsupportedOperationException(); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java index 28dbf44cb8fe..543042098b63 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java @@ -104,6 +104,9 @@ public void finishBundle() { } } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { delegate.onWindowExpiration(window, timestamp, key); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java index c8cd7eb5f262..5dbfe5b096a7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java @@ -103,6 +103,9 @@ public void finishBundle() { } } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { delegate.onWindowExpiration(window, timestamp, key); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java index 2dc428d5a6b2..6a5e03e30ab8 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java @@ -259,6 +259,9 @@ public void onTimer( @Override public void finishBundle() {} + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} From 5085977a4e6ef5e78f0630fd619be660ab541bcd Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 12 May 2026 05:20:10 +0000 Subject: [PATCH 9/9] run test --- ...beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index 50d17c108f2e..e1b083e439cc 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 2, + "modification": 3, }