Skip to content

Commit d4242c3

Browse files
committed
Fix build
1 parent 08a7f7d commit d4242c3

7 files changed

Lines changed: 22 additions & 0 deletions

File tree

runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ public void finishBundle() {
105105
container.updateMetrics(stepName);
106106
}
107107

108+
@Override
109+
public void finishKey() {}
110+
108111
@Override
109112
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
110113
delegate.onWindowExpiration(window, timestamp, key);

runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,9 @@ public void finishBundle() {
10641064
}
10651065
}
10661066

1067+
@Override
1068+
public void finishKey() {}
1069+
10671070
@Override
10681071
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}
10691072

runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,9 @@ public void finishBundle() {
255255
Optional.ofNullable(finishBundleCallback).ifPresent(Runnable::run);
256256
}
257257

258+
@Override
259+
public void finishKey() {}
260+
258261
@Override
259262
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {}
260263

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunnerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ public void testSideInputNotReady() throws Exception {
150150

151151
runner.startBundle();
152152
runner.processElement(createDatum("e", 0));
153+
runner.finishKey();
153154
runner.finishBundle();
154155

155156
assertTrue(outputManager.getOutput(mainOutputTag).isEmpty());
@@ -214,6 +215,7 @@ public void testMultipleWindowsNotReady() throws Exception {
214215

215216
runner.startBundle();
216217
runner.processElement(elem);
218+
runner.finishKey();
217219
runner.finishBundle();
218220

219221
assertTrue(outputManager.getOutput(mainOutputTag).isEmpty());
@@ -317,6 +319,7 @@ public void testSideInputNotification() throws Exception {
317319
when(mockSideInputReader.get(eq(view), any(BoundedWindow.class))).thenReturn("data");
318320

319321
runner.startBundle();
322+
runner.finishKey();
320323
runner.finishBundle();
321324

322325
assertThat(outputManager.getOutput(mainOutputTag), contains(createDatum("e:data", 0)));
@@ -373,6 +376,7 @@ public void testMultipleSideInputs() throws Exception {
373376

374377
runner.startBundle();
375378
runner.processElement(createDatum("e2", 2));
379+
runner.finishKey();
376380
runner.finishBundle();
377381

378382
assertThat(

runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,9 @@ public void finishBundle() {
284284
}
285285
}
286286

287+
@Override
288+
public void finishKey() {}
289+
287290
@Override
288291
public DoFn<InT, T> getFn() {
289292
throw new UnsupportedOperationException();

runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ public void finishBundle() {
104104
}
105105
}
106106

107+
@Override
108+
public void finishKey() {}
109+
107110
@Override
108111
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
109112
delegate.onWindowExpiration(window, timestamp, key);

runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ public void finishBundle() {
103103
}
104104
}
105105

106+
@Override
107+
public void finishKey() {}
108+
106109
@Override
107110
public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key) {
108111
delegate.onWindowExpiration(window, timestamp, key);

0 commit comments

Comments
 (0)