diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json index e9d869cc508c..8144784f5f02 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 1, + "modification": 2, } diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index e623d3373a93..e1b083e439cc 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 1, + "modification": 3, } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java index 500aedecb5d6..0f747a106d2b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -63,6 +63,12 @@ public interface DoFnRunner void onWindowExpiration( BoundedWindow window, Instant timestamp, KeyT key); + /** + * Performs per-key cleanup or processing after all elements and timers for a key have been + * processed. + */ + void finishKey(); + /** * Returns the underlying fn instance. * diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 41052a76f13e..f95de8c5ed15 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -101,6 +101,11 @@ public void finishBundle() { doFnRunner.finishBundle(); } + @Override + public void finishKey() { + doFnRunner.finishKey(); + } + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { doFnRunner.onWindowExpiration(window, timestamp, key); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index d553a7be2d44..b5ba4c158592 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -229,6 +229,9 @@ public void finishBundle() { } } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { invoker.invokeOnWindowExpiration( diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java index 779138834669..b42abcd9c27c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -131,6 +131,11 @@ public void finishBundle() { doFnRunner.finishBundle(); } + @Override + public void finishKey() { + doFnRunner.finishKey(); + } + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { doFnRunner.onWindowExpiration(window, timestamp, key); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java index 1ae937b7a836..7d6bec47f93f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java @@ -379,6 +379,9 @@ public void finishBundle() { finished = true; } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java index 56e077253ae6..c70d5fb622c1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java @@ -105,6 +105,9 @@ public void finishBundle() { container.updateMetrics(stepName); } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { delegate.onWindowExpiration(window, timestamp, key); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index 4ebb359fceae..32dbcdcdb7da 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -1064,6 +1064,9 @@ public void finishBundle() { } } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java index 73b20238ef05..5388cb5febd7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java @@ -255,6 +255,9 @@ public void finishBundle() { Optional.ofNullable(finishBundleCallback).ifPresent(Runnable::run); } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index 5c4975ffab01..5a23468f404c 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -523,6 +523,9 @@ public void finishBundle() { wrappedRunner.finishBundle(); } + @Override + public void finishKey() {} + @Override public void onWindowExpiration( BoundedWindow window, Instant timestamp, KeyT key) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java index 83cbc3aa62c7..6319997f5546 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/AssignWindowsParDoFnFactory.java @@ -119,6 +119,9 @@ public void processTimers() throws Exception { // Nothing. } + @Override + public void finishKey() throws Exception {} + @Override public void finishBundle() throws Exception { receiver = null; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java index 8e2b325b580a..3bf1ac30837b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeUngroupingParDoFn.java @@ -73,6 +73,9 @@ public void processTimers() throws Exception { // The timers for the underlying ParDoFn are processed at the end of each element } + @Override + public void finishKey() throws Exception {} + @Override public void finishBundle() throws Exception { underlyingParDoFn.finishBundle(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java index bd991560c186..c3aec27ab14f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CreateIsmShardKeyAndSortKeyDoFnFactory.java @@ -114,6 +114,9 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception {} + @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java index ec1fcd6c8432..a95349b3d076 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java @@ -128,6 +128,11 @@ public void finishBundle() { simpleRunner.finishBundle(); } + @Override + public void finishKey() { + simpleRunner.finishKey(); + } + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { simpleRunner.onWindowExpiration(window, timestamp, key); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java index d55181559322..9a15a3a02004 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ForwardingParDoFn.java @@ -47,6 +47,11 @@ public void processTimers() throws Exception { delegate.processTimers(); } + @Override + public void finishKey() throws Exception { + delegate.finishKey(); + } + @Override public void finishBundle() throws Exception { delegate.finishBundle(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java index 4845bb0c98e4..61913ecf325b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java @@ -102,6 +102,9 @@ private void invokeProcessElement(WindowedValue elem) { @Override public void finishBundle() {} + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java index 882dd497e3f5..663fe00a7628 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java @@ -142,6 +142,13 @@ public void processTimers() throws Exception { // it here to build a KeyedWorkItem } + @Override + public void finishKey() throws Exception { + if (fnRunner != null) { + fnRunner.finishKey(); + } + } + @Override public void finishBundle() throws Exception { checkState(fnRunner != null); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java index 6951e3a95b20..e744c1de0cea 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PairWithConstantKeyDoFnFactory.java @@ -98,6 +98,9 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception {} + @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java index 399258d7dbb9..75e830e713ab 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java @@ -317,6 +317,9 @@ public void processElement(Object elem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception {} + @Override public void finishBundle() throws Exception { groupingTable.flush(receiver); @@ -377,10 +380,14 @@ public void processElement(Object elem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception { + sideInputFetcher.persist(); + } + @Override public void finishBundle() throws Exception { groupingTable.flush(receiver); - sideInputFetcher.persist(); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java index b60cb84415ff..0c80909a0f3b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java @@ -117,20 +117,12 @@ public NativeReader create( @Override public NativeReaderIterator> iterator() throws IOException { - return new PubsubReaderIterator(context.getWorkItem()); + return new PubsubReaderIterator(); } class PubsubReaderIterator extends WindmillReaderIteratorBase { - protected PubsubReaderIterator(Windmill.WorkItem work) { - super(work, skipUndecodableElements); - } - - @Override - public boolean advance() throws IOException { - if (context.workIsFailed()) { - return false; - } - return super.advance(); + protected PubsubReaderIterator() { + super(context, skipUndecodableElements); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java index 746c09404f6e..a681cb78fe8f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReifyTimestampAndWindowsParDoFnFactory.java @@ -86,6 +86,9 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception {} + @Override public void finishBundle() throws Exception { this.receiver = null; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java index 434d46c20a5b..1fca7c4b9ec3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java @@ -363,6 +363,13 @@ public void processTimers() throws Exception { processTimers(TimerType.SYSTEM, stepContext, windowCoder); } + @Override + public void finishKey() throws Exception { + if (fnRunner != null) { + fnRunner.finishKey(); + } + } + private void processUserTimer(TimerData timer) throws Exception { if (fnSignature.timerDeclarations().containsKey(timer.getTimerId()) || fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java index 0b9ccd1f37c6..72c3f84ced83 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java @@ -152,6 +152,11 @@ public void onTimer( @Override public void finishBundle() { simpleDoFnRunner.finishBundle(); + } + + @Override + public void finishKey() { + simpleDoFnRunner.finishKey(); sideInputFetcher.persist(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index f75d452b211b..1ff2c1bc4a1c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -56,6 +56,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; +import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest; @@ -157,6 +158,9 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext activeReader; + private @Nullable WorkExecutor workExecutor; + private boolean finishKeyCalled = false; + public StreamingModeExecutionContext( CounterFactory counterFactory, String computationId, @@ -240,9 +244,12 @@ public void start( Work work, WindmillStateReader stateReader, SideInputStateFetcher sideInputStateFetcher, - Windmill.WorkItemCommitRequest.Builder outputBuilder) { + Windmill.WorkItemCommitRequest.Builder outputBuilder, + WorkExecutor workExecutor) { this.key = key; this.work = work; + this.workExecutor = workExecutor; + this.finishKeyCalled = false; this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey()); this.sideInputStateFetcher = sideInputStateFetcher; StreamingGlobalConfig config = globalConfigHandle.getConfig(); @@ -270,6 +277,17 @@ public void start( } } + public void finishKey() { + checkState(!finishKeyCalled, "finishKey was already called"); + checkNotNull(workExecutor, "workExecutor must be set before calling finishKey()"); + try { + workExecutor.finishKey(); + } catch (Exception e) { + throw new RuntimeException(e); + } + this.finishKeyCalled = true; + } + /** * Ensure that the processing time is greater than any fired processing time timers. Otherwise, a * trigger could ignore the timer and orphan the window. @@ -451,6 +469,7 @@ public void invalidateCache() { } public Map> flushState() { + checkState(finishKeyCalled, "finishKey must be called before flushState"); Map> callbacks = new HashMap<>(); for (StepContext stepContext : getAllStepContexts()) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java index 61730b0c8d88..6afc0dd9b1b6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterParDoFn.java @@ -73,6 +73,9 @@ public void processElement(Object element) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception {} + @Override public void finishBundle() throws Exception {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java index 3b7891c5378d..9f88b6a0988a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java @@ -94,6 +94,11 @@ public void onTimer( @Override public void finishBundle() { simpleDoFnRunner.finishBundle(); + } + + @Override + public void finishKey() { + simpleDoFnRunner.finishKey(); sideInputFetcher.persist(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java index f9e2d6de2461..18901fbc4073 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ToIsmRecordForMultimapDoFnFactory.java @@ -149,6 +149,9 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception {} + @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java index 0b883c048462..5c41809384ff 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java @@ -111,20 +111,12 @@ public NativeReader create( @Override public NativeReaderIterator> iterator() throws IOException { - return new UngroupedWindmillReaderIterator(context.getWorkItem()); + return new UngroupedWindmillReaderIterator(); } class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase { - UngroupedWindmillReaderIterator(Windmill.WorkItem work) { - super(work, skipUndecodableElements); - } - - @Override - public boolean advance() throws IOException { - if (context.workIsFailed()) { - return false; - } - return super.advance(); + UngroupedWindmillReaderIterator() { + super(context, skipUndecodableElements); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java index 3ddb3c2003db..9b469adc3a28 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ValuesDoFnFactory.java @@ -80,6 +80,9 @@ public void processElement(Object untypedElem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception {} + @Override public void finishBundle() {} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java index 7e6508a4788c..075a1a8a4250 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java @@ -34,6 +34,7 @@ */ public abstract class WindmillReaderIteratorBase extends NativeReader.NativeReaderIterator> { + private final StreamingModeExecutionContext context; private final Windmill.WorkItem work; private int bundleIndex = 0; private int messageIndex = -1; @@ -42,9 +43,10 @@ public abstract class WindmillReaderIteratorBase private static final Logger LOG = LoggerFactory.getLogger(WindmillReaderIteratorBase.class); protected WindmillReaderIteratorBase( - Windmill.WorkItem work, ValueProvider skipUndecodableElements) { + StreamingModeExecutionContext context, ValueProvider skipUndecodableElements) { + this.context = context; this.skipUndecodableElements = skipUndecodableElements; - this.work = work; + this.work = context.getWorkItem(); } @Override @@ -54,9 +56,14 @@ public boolean start() throws IOException { @Override public boolean advance() throws IOException { + if (context.workIsFailed()) { + throw new WorkItemCancelledException(context.getWorkItem().getShardingKey()); + } + while (true) { if (bundleIndex >= work.getMessageBundlesCount()) { current = null; + context.finishKey(); return false; } Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java index 173b254f6395..488684769bd9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java @@ -156,6 +156,7 @@ public NativeReaderIterator>> iterator() throw return new NativeReaderIterator>>() { @Override public boolean start() throws IOException { + context.finishKey(); return false; } @@ -182,6 +183,7 @@ public boolean start() throws IOException { @Override public boolean advance() throws IOException { current = null; + context.finishKey(); return false; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java index 31528e96e07f..a1321d57ebb6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java @@ -89,6 +89,9 @@ public void execute() throws Exception { LOG.debug("Source operation execution complete"); } + @Override + public void finishKey() throws Exception {} + @Override public SourceOperationResponse getResponse() { return response; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java index c00edffeaf95..29d5fb3561a1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java @@ -824,6 +824,7 @@ public boolean start() throws IOException { } try { if (!reader.start()) { + context.finishKey(); return false; } } catch (Exception e) { @@ -841,10 +842,13 @@ public boolean advance() throws IOException { // that there are regular checkpoints and that state does not become too large. BackOff backoff = backoffFactory.backoff(); while (true) { + if (context.workIsFailed()) { + throw new WorkItemCancelledException(context.getWorkItem().getShardingKey()); + } if (elemsRead >= maxElems || Instant.now().isAfter(endTime) - || context.isSinkFullHintSet() - || context.workIsFailed()) { + || context.isSinkFullHintSet()) { + context.finishKey(); return false; } try { @@ -857,6 +861,7 @@ public boolean advance() throws IOException { } long nextBackoff = backoff.nextBackOffMillis(); if (nextBackoff == BackOff.STOP) { + context.finishKey(); return false; } Uninterruptibles.sleepUninterruptibly(nextBackoff, TimeUnit.MILLISECONDS); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java index 8dc681fc640c..b4f3a22a7f52 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java @@ -74,7 +74,7 @@ public final void executeWork( SideInputStateFetcher sideInputStateFetcher, Windmill.WorkItemCommitRequest.Builder outputBuilder) throws Exception { - context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder); + context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder, workExecutor()); workExecutor().execute(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java index 4847e9f2ea9c..af1b2b9c48bd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/FlattenOperation.java @@ -43,6 +43,9 @@ public void process(Object elem) throws Exception { } } + @Override + public void finishKey() throws Exception {} + @Override public boolean supportsRestart() { return true; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java index 58b95f286d55..3c33e1904069 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java @@ -112,6 +112,13 @@ public void execute() throws Exception { // TODO: support for success / failure ports? } + @Override + public void finishKey() throws Exception { + for (Operation op : operations) { + op.finishKey(); + } + } + @Override public NativeReader.Progress getWorkerProgress() throws Exception { return getReadOperation().getProgress(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java index b7b4e255cfa5..b630da33cfad 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/Operation.java @@ -137,6 +137,9 @@ public void finish() throws Exception { } } + /** Called when all elements for a specific key have been processed. */ + public abstract void finishKey() throws Exception; + /** Aborts this Operation's execution. */ public void abort() throws Exception { synchronized (initializationStateLock) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java index 84dbbd627b08..eb3393cd4602 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoFn.java @@ -30,6 +30,8 @@ public interface ParDoFn { void processTimers() throws Exception; + void finishKey() throws Exception; + void finishBundle() throws Exception; void abort() throws Exception; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java index 27b6e9d1fb35..d64eba5dc98f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperation.java @@ -46,12 +46,18 @@ public void process(Object elem) throws Exception { } @Override - public void finish() throws Exception { + // Batch mode does not use this method and instead relies on BatchModeUngroupingParDoFn + // to process timers per key. + public void finishKey() throws Exception { try (Closeable scope = context.enterProcessTimers()) { checkStarted(); fn.processTimers(); + fn.finishKey(); } + } + @Override + public void finish() throws Exception { try (Closeable scope = context.enterFinish()) { fn.finishBundle(); super.finish(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java index d6b020483d4c..fabc8d6af25b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java @@ -271,6 +271,9 @@ public void finish() throws Exception { } } + @Override + public void finishKey() throws Exception {} + @Override public void abort() throws Exception { if (readerIterator != null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java index 7a9fcfdf0694..455dd8278c97 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/SimplePartialGroupByKeyParDoFn.java @@ -39,6 +39,9 @@ public void processElement(Object elem) throws Exception { @Override public void processTimers() {} + @Override + public void finishKey() throws Exception {} + @Override public void finishBundle() throws Exception { groupingTable.flush(receiver); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java index b7170c80ced9..1083fdbb9c42 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkExecutor.java @@ -34,6 +34,9 @@ public interface WorkExecutor extends AutoCloseable { /** Executes the task. */ public abstract void execute() throws Exception; + /** Called when all elements for a specific key have been processed. */ + void finishKey() throws Exception; + /** * Returns the worker's current progress. * diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java index 673140d58d89..d28e7f3e5d3d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WriteOperation.java @@ -105,6 +105,9 @@ public void finish() throws Exception { } } + @Override + public void finishKey() throws Exception {} + @Override public void abort() throws Exception { if (writer == null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java index c519efd4172c..259b699dc383 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java @@ -104,6 +104,9 @@ public void abort() throws Exception { aborted = true; super.abort(); } + + @Override + public void finishKey() throws Exception {} } // A mock ReadOperation fed to a MapTaskExecutor in test. @@ -217,6 +220,9 @@ public void finishBundle() {} @Override public void abort() {} + + @Override + public void finishKey() throws Exception {} } /** Verify counts for the per-element-output-time counter are correct. */ @@ -312,6 +318,9 @@ public void start() throws Exception { Metrics.counter("TestMetric", "MetricCounter").inc(1L); } } + + @Override + public void finishKey() throws Exception {} }, new Operation(new OutputReceiver[] {}, context2) { @Override @@ -321,6 +330,9 @@ public void start() throws Exception { Metrics.counter("TestMetric", "MetricCounter").inc(2L); } } + + @Override + public void finishKey() throws Exception {} }, new Operation(new OutputReceiver[] {}, context3) { @Override @@ -330,6 +342,9 @@ public void start() throws Exception { Metrics.counter("TestMetric", "MetricCounter").inc(3L); } } + + @Override + public void finishKey() throws Exception {} }); try (IntrinsicMapTaskExecutor executor = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index d8a1d1b90d47..ff82a1ab5c4c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -3657,8 +3657,8 @@ public void testActiveWorkFailure() throws Exception { server.waitForAndGetCommitsWithTimeout(1, Duration.standardSeconds(5)); assertEquals(1, commits.size()); - assertEquals(0, BlockingFn.teardownCounter.get()); - assertEquals(1, BlockingFn.setupCounter.get()); + assertEquals(1, BlockingFn.teardownCounter.get()); + assertEquals(2, BlockingFn.setupCounter.get()); worker.stop(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 4bfa6efc8880..850c0988ac8a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -62,6 +62,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; +import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; @@ -99,6 +100,7 @@ public class StreamingModeExecutionContextTest { @Rule public transient Timeout globalTimeout = Timeout.seconds(600); @Mock private SideInputStateFetcher sideInputStateFetcher; @Mock private WindmillStateReader stateReader; + @Mock private WorkExecutor workExecutor; private static final String COMPUTATION_ID = "computationId"; @@ -152,7 +154,7 @@ COMPUTATION_ID, new FakeGetDataClient(), ignored -> {}, mock(HeartbeatSender.cla } @Test - public void testTimerInternalsSetTimer() { + public void testTimerInternalsSetTimer() throws Exception { Windmill.WorkItemCommitRequest.Builder outputBuilder = Windmill.WorkItemCommitRequest.newBuilder(); NameContext nameContext = NameContextsForTests.nameContextForTest(); @@ -168,7 +170,8 @@ public void testTimerInternalsSetTimer() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, - outputBuilder); + outputBuilder, + workExecutor); TimerInternals timerInternals = stepContext.timerInternals(); @@ -179,6 +182,7 @@ public void testTimerInternalsSetTimer() { new Instant(5000), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL)); + executionContext.finishKey(); executionContext.flushState(); Windmill.Timer timer = outputBuilder.buildPartial().getOutputTimers(0); @@ -218,7 +222,8 @@ public void testTimerInternalsProcessingTimeSkew() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, - outputBuilder); + outputBuilder, + workExecutor); TimerInternals timerInternals = stepContext.timerInternals(); assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime())); } @@ -427,7 +432,8 @@ public void testStateTagEncodingBasedOnConfig() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, - outputBuilder); + outputBuilder, + workExecutor); assertEquals(expectedEncoding, executionContext.getWindmillTagEncoding().getClass()); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java index d18bc512723e..45bbc831a2ae 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java @@ -150,6 +150,7 @@ public void testSideInputNotReady() throws Exception { runner.startBundle(); runner.processElement(createDatum("e", 0)); + runner.finishKey(); runner.finishBundle(); assertTrue(outputManager.getOutput(mainOutputTag).isEmpty()); @@ -214,6 +215,7 @@ public void testMultipleWindowsNotReady() throws Exception { runner.startBundle(); runner.processElement(elem); + runner.finishKey(); runner.finishBundle(); assertTrue(outputManager.getOutput(mainOutputTag).isEmpty()); @@ -317,6 +319,7 @@ public void testSideInputNotification() throws Exception { when(mockSideInputReader.get(eq(view), any(BoundedWindow.class))).thenReturn("data"); runner.startBundle(); + runner.finishKey(); runner.finishBundle(); assertThat(outputManager.getOutput(mainOutputTag), contains(createDatum("e:data", 0))); @@ -373,6 +376,7 @@ public void testMultipleSideInputs() throws Exception { runner.startBundle(); runner.processElement(createDatum("e2", 2)); + runner.finishKey(); runner.finishBundle(); assertThat( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java index 61e2f4250d06..539c38eeb1da 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java @@ -19,6 +19,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -40,8 +45,8 @@ public class WindmillReaderIteratorBaseTest { private static class TestWindmillReaderIterator extends WindmillReaderIteratorBase { protected TestWindmillReaderIterator( - Windmill.WorkItem work, ValueProvider skipUndecodableElements) { - super(work, skipUndecodableElements); + StreamingModeExecutionContext context, ValueProvider skipUndecodableElements) { + super(context, skipUndecodableElements); } @Override @@ -81,6 +86,51 @@ public void testSkipErrors() throws IOException { testForMessageBundleCounts(true, 0, 0, 1, 3, 0, 1, 0, 0, 0, 0); } + @Test + public void testWorkItemCancelledException() throws IOException { + StreamingModeExecutionContext mockContext = mock(StreamingModeExecutionContext.class); + when(mockContext.workIsFailed()).thenReturn(true); + Windmill.WorkItem workItem = + Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(0L).build(); + when(mockContext.getWorkItem()).thenReturn(workItem); + + try (TestWindmillReaderIterator iter = + new TestWindmillReaderIterator(mockContext, ValueProvider.StaticValueProvider.of(false))) { + iter.start(); + fail("Expected WorkItemCancelledException"); + } catch (WorkItemCancelledException e) { + // Expected + } + } + + @Test + public void testFinishKeyCalled() throws Exception { + StreamingModeExecutionContext mockContext = mock(StreamingModeExecutionContext.class); + when(mockContext.workIsFailed()).thenReturn(false); + Windmill.WorkItem workItem = + Windmill.WorkItem.newBuilder() + .setKey(ByteString.EMPTY) + .setWorkToken(0L) + .addMessageBundles( + Windmill.InputMessageBundle.newBuilder() + .setSourceComputationId("foo") + .addMessages( + Windmill.Message.newBuilder() + .setTimestamp(0) + .setData(ByteString.EMPTY) + .build()) + .build()) + .build(); + when(mockContext.getWorkItem()).thenReturn(workItem); + + try (TestWindmillReaderIterator iter = + new TestWindmillReaderIterator(mockContext, ValueProvider.StaticValueProvider.of(false))) { + assertTrue(iter.start()); + assertFalse(iter.advance()); // This should trigger finishKey + verify(mockContext).finishKey(); + } + } + private void testForMessageBundleCounts(int... messageBundleCounts) throws IOException { testForMessageBundleCounts(false, messageBundleCounts); } @@ -111,9 +161,13 @@ private void testForMessageBundleCounts(boolean skipErrors, int... messageBundle .setWorkToken(0L) .addAllMessageBundles(bundles) .build(); + + StreamingModeExecutionContext mockContext = mock(StreamingModeExecutionContext.class); + when(mockContext.getWorkItem()).thenReturn(workItem); + try (TestWindmillReaderIterator iter = new TestWindmillReaderIterator( - workItem, ValueProvider.StaticValueProvider.of(skipErrors))) { + mockContext, ValueProvider.StaticValueProvider.of(skipErrors))) { List actual = ReaderTestUtils.windowedValuesToValues( ReaderUtils.readRemainingFromIterator(iter, false)); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index ce8ad32f71aa..d5cf2948d928 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -97,6 +97,7 @@ import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.NativeReaderIterator; +import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; @@ -643,7 +644,8 @@ public void testReadUnboundedReader() throws Exception { Watermarks.builder().setInputDataWatermark(new Instant(0)).build()), mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), - Windmill.WorkItemCommitRequest.newBuilder()); + Windmill.WorkItemCommitRequest.newBuilder(), + mock(WorkExecutor.class)); @SuppressWarnings({"unchecked", "rawtypes"}) NativeReader>>> reader = @@ -1023,7 +1025,8 @@ public void testFailedWorkItemsAbort() throws Exception { dummyWork, mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), - Windmill.WorkItemCommitRequest.newBuilder()); + Windmill.WorkItemCommitRequest.newBuilder(), + mock(WorkExecutor.class)); @SuppressWarnings({"unchecked", "rawtypes"}) NativeReader>>> reader = @@ -1038,14 +1041,19 @@ public void testFailedWorkItemsAbort() throws Exception { NativeReaderIterator>>> readerIterator = reader.iterator(); int numReads = 0; - while ((numReads == 0) ? readerIterator.start() : readerIterator.advance()) { - WindowedValue>> value = readerIterator.getCurrent(); - assertEquals(KV.of(0, numReads), value.getValue().getValue()); - numReads++; - // Fail the work item after reading two elements. - if (numReads == 2) { - dummyWork.setFailed(); + try { + while ((numReads == 0) ? readerIterator.start() : readerIterator.advance()) { + WindowedValue>> value = readerIterator.getCurrent(); + assertEquals(KV.of(0, numReads), value.getValue().getValue()); + numReads++; + // Fail the work item after reading two elements. + if (numReads == 2) { + dummyWork.setFailed(); + } } + fail("Expected WorkItemCancelledException"); + } catch (WorkItemCancelledException e) { + // Expected } assertThat(numReads, equalTo(2)); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java index 2c35f4bf99db..d5e3b9c87139 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutorTestUtils.java @@ -59,6 +59,9 @@ private static OutputReceiver[] createOutputReceivers(int numOutputs, CounterSet } return receivers; } + + @Override + public void finishKey() throws Exception {} } /** A {@code Reader} that yields a specified set of values. */ diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java index 188466a50572..649b41483706 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java @@ -100,6 +100,9 @@ public void abort() throws Exception { aborted = true; super.abort(); } + + @Override + public void finishKey() throws Exception {} } // A mock ReadOperation fed to a MapTaskExecutor in test. @@ -213,6 +216,9 @@ public void finishBundle() {} @Override public void abort() {} + + @Override + public void finishKey() throws Exception {} } /** Verify counts for the per-element-output-time counter are correct. */ @@ -309,6 +315,9 @@ public void start() throws Exception { Metrics.counter("TestMetric", "MetricCounter").inc(1L); } } + + @Override + public void finishKey() throws Exception {} }, new Operation(new OutputReceiver[] {}, context2) { @Override @@ -318,6 +327,9 @@ public void start() throws Exception { Metrics.counter("TestMetric", "MetricCounter").inc(2L); } } + + @Override + public void finishKey() throws Exception {} }, new Operation(new OutputReceiver[] {}, context3) { @Override @@ -327,6 +339,9 @@ public void start() throws Exception { Metrics.counter("TestMetric", "MetricCounter").inc(3L); } } + + @Override + public void finishKey() throws Exception {} }); assertEquals(TimeUnit.MINUTES.toMillis(10), stateTracker.getNextBundleLullDurationReportMs()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java index ba327f92cc44..9819f6e957a1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ParDoOperationTest.java @@ -85,6 +85,9 @@ public void finishBundle() throws Exception { public void abort() throws Exception { outputReceiver.process("a-aborted"); } + + @Override + public void finishKey() throws Exception {} } @Test @@ -104,6 +107,7 @@ public void testRunParDoOperation() throws Exception { parDoOperation.process(""); parDoOperation.process("bob"); + parDoOperation.finishKey(); parDoOperation.finish(); parDoOperation.abort(); @@ -147,6 +151,7 @@ public void testParDoOperationContext() throws Exception { operation.start(); operation.process("hello"); + operation.finishKey(); operation.finish(); InOrder inOrder = diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java index 99ce3dc69889..cf514b4ca499 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java @@ -284,6 +284,9 @@ public void finishBundle() { } } + @Override + public void finishKey() {} + @Override public DoFn getFn() { throw new UnsupportedOperationException(); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java index 28dbf44cb8fe..543042098b63 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java @@ -104,6 +104,9 @@ public void finishBundle() { } } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { delegate.onWindowExpiration(window, timestamp, key); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java index c8cd7eb5f262..5dbfe5b096a7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java @@ -103,6 +103,9 @@ public void finishBundle() { } } + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) { delegate.onWindowExpiration(window, timestamp, key); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java index 2dc428d5a6b2..6a5e03e30ab8 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java @@ -259,6 +259,9 @@ public void onTimer( @Override public void finishBundle() {} + @Override + public void finishKey() {} + @Override public void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}