Skip to content

Commit abe3aa7

Browse files
author
Kyle Weaver
authored
Merge pull request #15603 from zhoufek/fat
[BEAM-9487] Various Trigger.may_lose_data fixes
2 parents ffde2a6 + 9343f37 commit abe3aa7

3 files changed

Lines changed: 101 additions & 103 deletions

File tree

sdks/python/apache_beam/transforms/ptransform_test.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
from apache_beam.testing.util import SortLists
5151
from apache_beam.testing.util import assert_that
5252
from apache_beam.testing.util import equal_to
53-
from apache_beam.testing.util import is_empty
5453
from apache_beam.transforms import WindowInto
5554
from apache_beam.transforms import trigger
5655
from apache_beam.transforms import window
@@ -507,12 +506,10 @@ def test_group_by_key_allow_unsafe_triggers(self):
507506
| beam.Create([(1, 1), (1, 2), (1, 3), (1, 4)])
508507
| WindowInto(
509508
window.GlobalWindows(),
510-
trigger=trigger.AfterCount(5),
509+
trigger=trigger.AfterCount(4),
511510
accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
512511
| beam.GroupByKey())
513-
# We need five, but it only has four - Displays how this option is
514-
# dangerous.
515-
assert_that(pcoll, is_empty())
512+
assert_that(pcoll, equal_to([(1, [1, 2, 3, 4])]))
516513

517514
def test_group_by_key_reiteration(self):
518515
class MyDoFn(beam.DoFn):

sdks/python/apache_beam/transforms/trigger.py

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@
3030
from abc import abstractmethod
3131
from enum import Flag
3232
from enum import auto
33-
from functools import reduce
3433
from itertools import zip_longest
35-
from operator import or_
3634

3735
from apache_beam.coders import coder_impl
3836
from apache_beam.coders import observable
@@ -161,12 +159,34 @@ def with_prefix(self, prefix):
161159

162160

163161
class DataLossReason(Flag):
164-
"""Enum defining potential reasons that a trigger may cause data loss."""
162+
"""Enum defining potential reasons that a trigger may cause data loss.
163+
164+
These flags should only cover when the trigger is the cause, though windowing
165+
can be taken into account. For instance, AfterWatermark may not flag itself
166+
as finishing if the windowing doesn't allow lateness.
167+
"""
168+
169+
# Trigger will never be the source of data loss.
165170
NO_POTENTIAL_LOSS = 0
171+
172+
# Trigger may finish. In this case, data that comes in after the trigger may
173+
# be lost. Example: AfterCount(1) will stop firing after the first element.
166174
MAY_FINISH = auto()
175+
176+
# Deprecated: Beam will emit buffered data at GC time. Any other behavior
177+
# should be treated as a bug with the runner used.
167178
CONDITION_NOT_GUARANTEED = auto()
168179

169180

181+
# Convenience functions for checking if a flag is included. Each is equivalent
182+
# to `reason & flag == flag`
183+
184+
185+
def _IncludesMayFinish(reason):
186+
# type: (DataLossReason) -> bool
187+
return reason & DataLossReason.MAY_FINISH == DataLossReason.MAY_FINISH
188+
189+
170190
# pylint: disable=unused-argument
171191
# TODO(robertwb): Provisional API, Java likely to change as well.
172192
class TriggerFn(metaclass=ABCMeta):
@@ -260,12 +280,6 @@ def may_lose_data(self, unused_windowing):
260280
scenario is only accounted for if the windowing strategy allows
261281
late data. Otherwise, the trigger is not responsible for the data
262282
loss.
263-
* The trigger condition may not be met. For instance,
264-
Repeatedly(AfterCount(N)) may not fire due to N not being met. This
265-
is only accounted for if the condition itself led to data loss.
266-
Repeatedly(AfterCount(1)) is safe, since it would only not fire if
267-
there is no data to lose, but Repeatedly(AfterCount(2)) can cause
268-
data loss if there is only one record.
269283
270284
Note that this only returns the potential for loss. It does not mean that
271285
there will be data loss. It also only accounts for loss related to the
@@ -278,9 +292,7 @@ def may_lose_data(self, unused_windowing):
278292
Returns:
279293
The DataLossReason. If there is no potential loss,
280294
DataLossReason.NO_POTENTIAL_LOSS is returned. Otherwise, all the
281-
potential reasons are returned as a single value. For instance, if
282-
data loss can result from finishing or not having the condition met,
283-
the result will be DataLossReason.MAY_FINISH|CONDITION_NOT_GUARANTEED.
295+
potential reasons are returned as a single value.
284296
"""
285297
# For backwards compatibility's sake, we're assuming the trigger is safe.
286298
return DataLossReason.NO_POTENTIAL_LOSS
@@ -390,6 +402,7 @@ def reset(self, window, context):
390402
pass
391403

392404
def may_lose_data(self, unused_windowing):
405+
"""AfterProcessingTime may finish."""
393406
return DataLossReason.MAY_FINISH
394407

395408
@staticmethod
@@ -444,6 +457,7 @@ def on_fire(self, watermark, window, context):
444457
return False
445458

446459
def may_lose_data(self, unused_windowing):
460+
"""No potential loss, since the trigger always fires."""
447461
return DataLossReason.NO_POTENTIAL_LOSS
448462

449463
@staticmethod
@@ -494,7 +508,7 @@ def may_lose_data(self, unused_windowing):
494508
"""No potential data loss.
495509
496510
Though Never doesn't explicitly trigger, it still collects data on
497-
windowing closing, so any data loss is due to windowing closing.
511+
window closing.
498512
"""
499513
return DataLossReason.NO_POTENTIAL_LOSS
500514

@@ -591,13 +605,7 @@ def reset(self, window, context):
591605
self.late.reset(window, NestedContext(context, 'late'))
592606

593607
def may_lose_data(self, windowing):
594-
"""May cause data loss if the windowing allows lateness and either:
595-
596-
* The late trigger is not set
597-
* The late trigger may cause data loss.
598-
599-
The second case is equivalent to Repeatedly(late).may_lose_data(windowing)
600-
"""
608+
"""May cause data loss if lateness allowed and no late trigger set."""
601609
if windowing.allowed_lateness == 0:
602610
return DataLossReason.NO_POTENTIAL_LOSS
603611
if self.late is None:
@@ -674,10 +682,8 @@ def reset(self, window, context):
674682
context.clear_state(self.COUNT_TAG)
675683

676684
def may_lose_data(self, unused_windowing):
677-
reason = DataLossReason.MAY_FINISH
678-
if self.count > 1:
679-
reason |= DataLossReason.CONDITION_NOT_GUARANTEED
680-
return reason
685+
"""AfterCount may finish."""
686+
return DataLossReason.MAY_FINISH
681687

682688
@staticmethod
683689
def from_runner_api(proto, unused_context):
@@ -787,6 +793,13 @@ def on_fire(self, watermark, window, context):
787793
finished.append(trigger.on_fire(watermark, window, nested_context))
788794
return self.combine_op(finished)
789795

796+
def may_lose_data(self, windowing):
797+
may_finish = self.combine_op(
798+
_IncludesMayFinish(t.may_lose_data(windowing)) for t in self.triggers)
799+
return (
800+
DataLossReason.MAY_FINISH
801+
if may_finish else DataLossReason.NO_POTENTIAL_LOSS)
802+
790803
def reset(self, window, context):
791804
for ix, trigger in enumerate(self.triggers):
792805
trigger.reset(window, self._sub_context(context, ix))
@@ -832,15 +845,6 @@ class AfterAny(_ParallelTriggerFn):
832845
"""
833846
combine_op = any
834847

835-
def may_lose_data(self, windowing):
836-
reason = DataLossReason.NO_POTENTIAL_LOSS
837-
for trigger in self.triggers:
838-
t_reason = trigger.may_lose_data(windowing)
839-
if t_reason == DataLossReason.NO_POTENTIAL_LOSS:
840-
return t_reason
841-
reason |= t_reason
842-
return reason
843-
844848

845849
class AfterAll(_ParallelTriggerFn):
846850
"""Fires when all subtriggers have fired.
@@ -849,9 +853,6 @@ class AfterAll(_ParallelTriggerFn):
849853
"""
850854
combine_op = all
851855

852-
def may_lose_data(self, windowing):
853-
return reduce(or_, (t.may_lose_data(windowing) for t in self.triggers))
854-
855856

856857
class AfterEach(TriggerFn):
857858

@@ -908,7 +909,12 @@ def reset(self, window, context):
908909
trigger.reset(window, self._sub_context(context, ix))
909910

910911
def may_lose_data(self, windowing):
911-
return reduce(or_, (t.may_lose_data(windowing) for t in self.triggers))
912+
"""If all sub-triggers may finish, this may finish."""
913+
may_finish = all(
914+
_IncludesMayFinish(t.may_lose_data(windowing)) for t in self.triggers)
915+
return (
916+
DataLossReason.MAY_FINISH
917+
if may_finish else DataLossReason.NO_POTENTIAL_LOSS)
912918

913919
@staticmethod
914920
def _sub_context(context, index):

sdks/python/apache_beam/transforms/trigger_test.py

Lines changed: 56 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -449,8 +449,8 @@ def _test(self, trigger, lateness, expected):
449449
def test_default_trigger(self):
450450
self._test(DefaultTrigger(), 0, DataLossReason.NO_POTENTIAL_LOSS)
451451

452-
def test_after_processing_time(self):
453-
self._test(AfterProcessingTime(), 0, DataLossReason.MAY_FINISH)
452+
def test_after_processing(self):
453+
self._test(AfterProcessingTime(42), 0, DataLossReason.MAY_FINISH)
454454

455455
def test_always(self):
456456
self._test(Always(), 0, DataLossReason.NO_POTENTIAL_LOSS)
@@ -461,7 +461,7 @@ def test_never(self):
461461
def test_after_watermark_no_allowed_lateness(self):
462462
self._test(AfterWatermark(), 0, DataLossReason.NO_POTENTIAL_LOSS)
463463

464-
def test_after_watermark_late_none(self):
464+
def test_after_watermark_no_late_trigger(self):
465465
self._test(AfterWatermark(), 60, DataLossReason.MAY_FINISH)
466466

467467
def test_after_watermark_no_allowed_lateness_safe_late(self):
@@ -470,93 +470,58 @@ def test_after_watermark_no_allowed_lateness_safe_late(self):
470470
0,
471471
DataLossReason.NO_POTENTIAL_LOSS)
472472

473-
def test_after_watermark_safe_late(self):
473+
def test_after_watermark_allowed_lateness_safe_late(self):
474474
self._test(
475475
AfterWatermark(late=DefaultTrigger()),
476476
60,
477477
DataLossReason.NO_POTENTIAL_LOSS)
478478

479-
def test_after_watermark_no_allowed_lateness_may_finish_late(self):
480-
self._test(
481-
AfterWatermark(late=AfterProcessingTime()),
482-
0,
483-
DataLossReason.NO_POTENTIAL_LOSS)
484-
485-
def test_after_watermark_may_finish_late(self):
486-
self._test(
487-
AfterWatermark(late=AfterProcessingTime()),
488-
60,
489-
DataLossReason.NO_POTENTIAL_LOSS)
490-
491-
def test_after_watermark_no_allowed_lateness_condition_late(self):
492-
self._test(
493-
AfterWatermark(late=AfterCount(5)), 0, DataLossReason.NO_POTENTIAL_LOSS)
494-
495-
def test_after_watermark_condition_late(self):
496-
self._test(
497-
AfterWatermark(late=AfterCount(5)),
498-
60,
499-
DataLossReason.NO_POTENTIAL_LOSS)
500-
501-
def test_after_count_one(self):
502-
self._test(AfterCount(1), 0, DataLossReason.MAY_FINISH)
503-
504-
def test_after_count_gt_one(self):
505-
self._test(
506-
AfterCount(2),
507-
0,
508-
DataLossReason.MAY_FINISH | DataLossReason.CONDITION_NOT_GUARANTEED)
479+
def test_after_count(self):
480+
self._test(AfterCount(42), 0, DataLossReason.MAY_FINISH)
509481

510482
def test_repeatedly_safe_underlying(self):
511483
self._test(
512484
Repeatedly(DefaultTrigger()), 0, DataLossReason.NO_POTENTIAL_LOSS)
513485

514-
def test_repeatedly_may_finish_underlying(self):
515-
self._test(Repeatedly(AfterCount(1)), 0, DataLossReason.NO_POTENTIAL_LOSS)
516-
517-
def test_repeatedly_condition_underlying(self):
518-
self._test(Repeatedly(AfterCount(2)), 0, DataLossReason.NO_POTENTIAL_LOSS)
486+
def test_repeatedly_unsafe_underlying(self):
487+
self._test(Repeatedly(AfterCount(42)), 0, DataLossReason.NO_POTENTIAL_LOSS)
519488

520-
def test_after_any_some_unsafe(self):
489+
def test_after_any_one_may_finish(self):
521490
self._test(
522-
AfterAny(AfterCount(1), DefaultTrigger()),
523-
0,
524-
DataLossReason.NO_POTENTIAL_LOSS)
525-
526-
def test_after_any_same_reason(self):
527-
self._test(
528-
AfterAny(AfterCount(1), AfterProcessingTime()),
491+
AfterAny(AfterCount(42), DefaultTrigger()),
529492
0,
530493
DataLossReason.MAY_FINISH)
531494

532-
def test_after_any_different_reasons(self):
495+
def test_after_any_all_safe(self):
533496
self._test(
534-
AfterAny(AfterCount(2), AfterProcessingTime()),
497+
AfterAny(Repeatedly(AfterCount(42)), DefaultTrigger()),
535498
0,
536-
DataLossReason.MAY_FINISH | DataLossReason.CONDITION_NOT_GUARANTEED)
537-
538-
def test_after_all_some_unsafe(self):
539-
self._test(
540-
AfterAll(AfterCount(1), DefaultTrigger()), 0, DataLossReason.MAY_FINISH)
499+
DataLossReason.NO_POTENTIAL_LOSS)
541500

542-
def test_after_all_safe(self):
501+
def test_after_all_some_may_finish(self):
543502
self._test(
544-
AfterAll(Repeatedly(AfterCount(1)), DefaultTrigger()),
503+
AfterAll(AfterCount(1), DefaultTrigger()),
545504
0,
546505
DataLossReason.NO_POTENTIAL_LOSS)
547506

548-
def test_after_each_some_unsafe(self):
507+
def test_after_all_all_may_finish(self):
549508
self._test(
550-
AfterEach(AfterCount(1), DefaultTrigger()),
509+
AfterAll(AfterCount(42), AfterProcessingTime(42)),
551510
0,
552511
DataLossReason.MAY_FINISH)
553512

554-
def test_after_each_all_safe(self):
513+
def test_after_each_at_least_one_safe(self):
555514
self._test(
556-
AfterEach(Repeatedly(AfterCount(1)), DefaultTrigger()),
515+
AfterEach(AfterCount(1), DefaultTrigger(), AfterCount(2)),
557516
0,
558517
DataLossReason.NO_POTENTIAL_LOSS)
559518

519+
def test_after_each_all_may_finish(self):
520+
self._test(
521+
AfterEach(AfterCount(1), AfterCount(2), AfterCount(3)),
522+
0,
523+
DataLossReason.MAY_FINISH)
524+
560525

561526
class RunnerApiTest(unittest.TestCase):
562527
def test_trigger_encoding(self):
@@ -606,6 +571,35 @@ def format_result(k, vs):
606571
'B-3': {10, 15, 16},
607572
}.items())))
608573

574+
def test_after_count_streaming(self):
575+
test_options = PipelineOptions(
576+
flags=['--allow_unsafe_triggers', '--streaming'])
577+
with TestPipeline(options=test_options) as p:
578+
# yapf: disable
579+
test_stream = (
580+
TestStream()
581+
.advance_watermark_to(0)
582+
.add_elements([('A', 1), ('A', 2), ('A', 3)])
583+
.add_elements([('A', 4), ('A', 5), ('A', 6)])
584+
.add_elements([('B', 1), ('B', 2), ('B', 3)])
585+
.advance_watermark_to_infinity())
586+
# yapf: enable
587+
588+
results = (
589+
p
590+
| test_stream
591+
| beam.WindowInto(
592+
FixedWindows(10),
593+
trigger=AfterCount(3),
594+
accumulation_mode=AccumulationMode.ACCUMULATING)
595+
| beam.GroupByKey())
596+
597+
assert_that(
598+
results,
599+
equal_to(list({
600+
'A': [1, 2, 3], # 4 - 6 discarded because trigger finished
601+
'B': [1, 2, 3]}.items())))
602+
609603
def test_always(self):
610604
with TestPipeline() as p:
611605

@@ -714,7 +708,8 @@ def test_on_pane_watermark_hold_no_pipeline_stall(self):
714708
test_stream.advance_processing_time(START_TIMESTAMP + 2)
715709
test_stream.advance_watermark_to(START_TIMESTAMP + 2)
716710

717-
with TestPipeline(options=PipelineOptions(['--streaming'])) as p:
711+
with TestPipeline(options=PipelineOptions(
712+
['--streaming', '--allow_unsafe_triggers'])) as p:
718713
# pylint: disable=expression-not-assigned
719714
(
720715
p

0 commit comments

Comments
 (0)