3030from abc import abstractmethod
3131from enum import Flag
3232from enum import auto
33- from functools import reduce
3433from itertools import zip_longest
35- from operator import or_
3634
3735from apache_beam .coders import coder_impl
3836from apache_beam .coders import observable
@@ -161,12 +159,34 @@ def with_prefix(self, prefix):
161159
162160
163161class DataLossReason (Flag ):
164- """Enum defining potential reasons that a trigger may cause data loss."""
162+ """Enum defining potential reasons that a trigger may cause data loss.
163+
164+ These flags should only cover cases where the trigger is the cause, though
165+ windowing can be taken into account. For instance, AfterWatermark may not
166+ flag itself as finishing if the windowing doesn't allow lateness.
167+ """
168+
169+ # Trigger will never be the source of data loss.
165170 NO_POTENTIAL_LOSS = 0
171+
172+ # Trigger may finish. In this case, data that comes in after the trigger may
173+ # be lost. Example: AfterCount(1) will stop firing after the first element.
166174 MAY_FINISH = auto ()
175+
176+ # Deprecated: Beam will emit buffered data at GC time. Any other behavior
177+ # should be treated as a bug with the runner used.
167178 CONDITION_NOT_GUARANTEED = auto ()
168179
169180
181+ # Convenience functions for checking if a flag is included. Each is equivalent
182+ # to `(reason & flag) == flag`.
183+
184+
185+ def _IncludesMayFinish (reason ):
186+ # type: (DataLossReason) -> bool
187+ return reason & DataLossReason .MAY_FINISH == DataLossReason .MAY_FINISH
188+
189+
170190# pylint: disable=unused-argument
171191# TODO(robertwb): Provisional API, Java likely to change as well.
172192class TriggerFn (metaclass = ABCMeta ):
@@ -260,12 +280,6 @@ def may_lose_data(self, unused_windowing):
260280 scenario is only accounted for if the windowing strategy allows
261281 late data. Otherwise, the trigger is not responsible for the data
262282 loss.
263- * The trigger condition may not be met. For instance,
264- Repeatedly(AfterCount(N)) may not fire due to N not being met. This
265- is only accounted for if the condition itself led to data loss.
266- Repeatedly(AfterCount(1)) is safe, since it would only not fire if
267- there is no data to lose, but Repeatedly(AfterCount(2)) can cause
268- data loss if there is only one record.
269283
270284 Note that this only returns the potential for loss. It does not mean that
271285 there will be data loss. It also only accounts for loss related to the
@@ -278,9 +292,7 @@ def may_lose_data(self, unused_windowing):
278292 Returns:
279293 The DataLossReason. If there is no potential loss,
280294 DataLossReason.NO_POTENTIAL_LOSS is returned. Otherwise, all the
281- potential reasons are returned as a single value. For instance, if
282- data loss can result from finishing or not having the condition met,
283- the result will be DataLossReason.MAY_FINISH|CONDITION_NOT_GUARANTEED.
295+ potential reasons are returned as a single value.
284296 """
285297 # For backwards compatibility's sake, we're assuming the trigger is safe.
286298 return DataLossReason .NO_POTENTIAL_LOSS
@@ -390,6 +402,7 @@ def reset(self, window, context):
390402 pass
391403
392404 def may_lose_data (self , unused_windowing ):
405+ """AfterProcessingTime may finish."""
393406 return DataLossReason .MAY_FINISH
394407
395408 @staticmethod
@@ -444,6 +457,7 @@ def on_fire(self, watermark, window, context):
444457 return False
445458
446459 def may_lose_data (self , unused_windowing ):
460+ """No potential loss, since the trigger always fires."""
447461 return DataLossReason .NO_POTENTIAL_LOSS
448462
449463 @staticmethod
@@ -494,7 +508,7 @@ def may_lose_data(self, unused_windowing):
494508 """No potential data loss.
495509
496510 Though Never doesn't explicitly trigger, it still collects data on
497- windowing closing, so any data loss is due to windowing closing .
511+ windowing closing.
498512 """
499513 return DataLossReason .NO_POTENTIAL_LOSS
500514
@@ -591,13 +605,7 @@ def reset(self, window, context):
591605 self .late .reset (window , NestedContext (context , 'late' ))
592606
593607 def may_lose_data (self , windowing ):
594- """May cause data loss if the windowing allows lateness and either:
595-
596- * The late trigger is not set
597- * The late trigger may cause data loss.
598-
599- The second case is equivalent to Repeatedly(late).may_lose_data(windowing)
600- """
608+ """May cause data loss if lateness is allowed and no late trigger is set."""
601609 if windowing .allowed_lateness == 0 :
602610 return DataLossReason .NO_POTENTIAL_LOSS
603611 if self .late is None :
@@ -674,10 +682,8 @@ def reset(self, window, context):
674682 context .clear_state (self .COUNT_TAG )
675683
676684 def may_lose_data (self , unused_windowing ):
677- reason = DataLossReason .MAY_FINISH
678- if self .count > 1 :
679- reason |= DataLossReason .CONDITION_NOT_GUARANTEED
680- return reason
685+ """AfterCount may finish."""
686+ return DataLossReason .MAY_FINISH
681687
682688 @staticmethod
683689 def from_runner_api (proto , unused_context ):
@@ -787,6 +793,13 @@ def on_fire(self, watermark, window, context):
787793 finished .append (trigger .on_fire (watermark , window , nested_context ))
788794 return self .combine_op (finished )
789795
796+ def may_lose_data (self , windowing ):
797+ may_finish = self .combine_op (
798+ _IncludesMayFinish (t .may_lose_data (windowing )) for t in self .triggers )
799+ return (
800+ DataLossReason .MAY_FINISH
801+ if may_finish else DataLossReason .NO_POTENTIAL_LOSS )
802+
790803 def reset (self , window , context ):
791804 for ix , trigger in enumerate (self .triggers ):
792805 trigger .reset (window , self ._sub_context (context , ix ))
@@ -832,15 +845,6 @@ class AfterAny(_ParallelTriggerFn):
832845 """
833846 combine_op = any
834847
835- def may_lose_data (self , windowing ):
836- reason = DataLossReason .NO_POTENTIAL_LOSS
837- for trigger in self .triggers :
838- t_reason = trigger .may_lose_data (windowing )
839- if t_reason == DataLossReason .NO_POTENTIAL_LOSS :
840- return t_reason
841- reason |= t_reason
842- return reason
843-
844848
845849class AfterAll (_ParallelTriggerFn ):
846850 """Fires when all subtriggers have fired.
@@ -849,9 +853,6 @@ class AfterAll(_ParallelTriggerFn):
849853 """
850854 combine_op = all
851855
852- def may_lose_data (self , windowing ):
853- return reduce (or_ , (t .may_lose_data (windowing ) for t in self .triggers ))
854-
855856
856857class AfterEach (TriggerFn ):
857858
@@ -908,7 +909,12 @@ def reset(self, window, context):
908909 trigger .reset (window , self ._sub_context (context , ix ))
909910
910911 def may_lose_data (self , windowing ):
911- return reduce (or_ , (t .may_lose_data (windowing ) for t in self .triggers ))
912+ """If all sub-triggers may finish, this may finish."""
913+ may_finish = all (
914+ _IncludesMayFinish (t .may_lose_data (windowing )) for t in self .triggers )
915+ return (
916+ DataLossReason .MAY_FINISH
917+ if may_finish else DataLossReason .NO_POTENTIAL_LOSS )
912918
913919 @staticmethod
914920 def _sub_context (context , index ):
0 commit comments