Skip to content

Commit 6bcf84f

Browse files
committed
Add Python processing-time timers and clock injection
1 parent aba5b0d commit 6bcf84f

6 files changed

Lines changed: 95 additions & 60 deletions

File tree

sdks/python/apache_beam/runners/direct/clock.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,36 +15,35 @@
1515
# limitations under the License.
1616
#
1717

18-
"""Clock implementations for real time processing and testing."""
18+
"""Clock implementations for real time processing and testing.
1919
20+
For internal use only. No backwards compatibility guarantees.
21+
"""
2022
from __future__ import absolute_import
2123

2224
import time
2325

2426

2527
class Clock(object):
26-
"""For internal use only; no backwards-compatibility guarantees."""
28+
def current_time(self):
29+
raise NotImplementedError()
2730

28-
def time(self):
29-
"""Returns the number of milliseconds since epoch."""
30-
return int(time.time() * 1000)
31+
def advance_time(self):
32+
raise NotImplementedError()
3133

3234

33-
class MockClock(Clock):
34-
"""For internal use only; no backwards-compatibility guarantees.
35+
class RealClock(object):
36+
def current_time(self):
37+
return time.time()
3538

36-
Mock clock implementation for testing."""
3739

38-
def __init__(self, now_in_ms):
39-
self._now_in_ms = now_in_ms
40+
class TestClock(object):
41+
"""Clock used for testing; time only moves when advance_time() is called."""
42+
def __init__(self, current=0):
43+
self._current = current
4044

41-
def time(self):
42-
return self._now_in_ms
45+
def current_time(self):
46+
return self._current
4347

44-
def set_time(self, value_in_ms):
45-
assert value_in_ms >= self._now_in_ms
46-
self._now_in_ms = value_in_ms
47-
48-
def advance(self, duration_in_ms):
49-
assert duration_in_ms >= 0
50-
self._now_in_ms += duration_in_ms
48+
def advance_time(self, advance_by):
49+
self._current += advance_by

sdks/python/apache_beam/runners/direct/direct_runner.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
from apache_beam.options.value_provider import RuntimeValueProvider
3737
from apache_beam.pvalue import PCollection
3838
from apache_beam.runners.direct.bundle_factory import BundleFactory
39+
from apache_beam.runners.direct.clock import RealClock
40+
from apache_beam.runners.direct.clock import TestClock
3941
from apache_beam.runners.runner import PipelineResult
4042
from apache_beam.runners.runner import PipelineRunner
4143
from apache_beam.runners.runner import PipelineState
@@ -90,15 +92,14 @@ class DirectRunner(PipelineRunner):
9092
"""Executes a single pipeline on the local machine."""
9193

9294
# A list of PTransformOverride objects to be applied before running a pipeline
93-
# using DirectRunner.
94-
# Currently this only works for overrides where the input and output types do
95-
# not change.
96-
# For internal SDK use only. This should not be updated by Beam pipeline
97-
# authors.
95+
# using DirectRunner. Currently, this only works for overrides where the input
96+
# and output types do not change.
97+
# For internal use only; no backwards-compatibility guarantees.
9898
_PTRANSFORM_OVERRIDES = []
9999

100100
def __init__(self):
101101
self._cache = None
102+
self._use_test_clock = False # use RealClock() in production
102103

103104
def apply_CombinePerKey(self, transform, pcoll):
104105
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
@@ -111,6 +112,10 @@ def apply_CombinePerKey(self, transform, pcoll):
111112
except NotImplementedError:
112113
return transform.expand(pcoll)
113114

115+
def apply_TestStream(self, transform, pcoll):
116+
self._use_test_clock = True # use TestClock() for testing
117+
return transform.expand(pcoll)
118+
114119
def apply__GroupByKeyOnly(self, transform, pcoll):
115120
if (transform.__class__ == _GroupByKeyOnly and
116121
pcoll.pipeline._options.view_as(StandardOptions).streaming):
@@ -204,14 +209,16 @@ def run(self, pipeline):
204209
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
205210
pipeline.visit(self.consumer_tracking_visitor)
206211

212+
clock = TestClock() if self._use_test_clock else RealClock()
207213
evaluation_context = EvaluationContext(
208214
pipeline._options,
209215
BundleFactory(stacked=pipeline._options.view_as(DirectOptions)
210216
.direct_runner_use_stacked_bundle),
211217
self.consumer_tracking_visitor.root_transforms,
212218
self.consumer_tracking_visitor.value_to_consumers,
213219
self.consumer_tracking_visitor.step_names,
214-
self.consumer_tracking_visitor.views)
220+
self.consumer_tracking_visitor.views,
221+
clock)
215222

216223
evaluation_context.use_pvalue_cache(self._cache)
217224

sdks/python/apache_beam/runners/direct/evaluation_context.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import collections
2323
import threading
2424

25-
from apache_beam.runners.direct.clock import Clock
2625
from apache_beam.runners.direct.direct_metrics import DirectMetrics
2726
from apache_beam.runners.direct.executor import TransformExecutor
2827
from apache_beam.runners.direct.watermark_manager import WatermarkManager
@@ -138,7 +137,7 @@ class EvaluationContext(object):
138137
"""
139138

140139
def __init__(self, pipeline_options, bundle_factory, root_transforms,
141-
value_to_consumers, step_names, views):
140+
value_to_consumers, step_names, views, clock):
142141
self.pipeline_options = pipeline_options
143142
self._bundle_factory = bundle_factory
144143
self._root_transforms = root_transforms
@@ -151,7 +150,7 @@ def __init__(self, pipeline_options, bundle_factory, root_transforms,
151150
self._transform_keyed_states = self._initialize_keyed_states(
152151
root_transforms, value_to_consumers)
153152
self._watermark_manager = WatermarkManager(
154-
Clock(), root_transforms, value_to_consumers,
153+
clock, root_transforms, value_to_consumers,
155154
self._transform_keyed_states)
156155
self._side_inputs_container = _SideInputsContainer(views)
157156
self._pending_unblocked_tasks = []
@@ -286,8 +285,8 @@ def create_empty_committed_bundle(self, output_pcollection):
286285
return self._bundle_factory.create_empty_committed_bundle(
287286
output_pcollection)
288287

289-
def extract_fired_timers(self):
290-
return self._watermark_manager.extract_fired_timers()
288+
def extract_all_timers(self):
289+
return self._watermark_manager.extract_all_timers()
291290

292291
def is_done(self, transform=None):
293292
"""Checks completion of a step or the pipeline.

sdks/python/apache_beam/runners/direct/executor.py

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -545,22 +545,19 @@ def call(self):
545545
self._executor.executor_service.submit(self)
546546

547547
def _should_shutdown(self):
548-
"""_should_shutdown checks whether pipeline is completed or not.
548+
"""Checks whether the pipeline is completed and should be shut down.
549549
550-
It will check for successful completion by checking the watermarks of all
551-
transforms. If they all reached the maximum watermark it means that
552-
pipeline successfully reached to completion.
550+
If there is anything in the queue of tasks to do, do not shut down.
553551
554-
If the above is not true, it will check that at least one executor is
555-
making progress. Otherwise pipeline will be declared stalled.
556-
557-
If the pipeline reached to a terminal state as explained above
558-
_should_shutdown will request executor to gracefully shutdown.
552+
Otherwise, check if all the transforms' watermarks are complete.
553+
If they are not, the pipeline is not progressing (stall detected).
554+
Whether the pipeline has stalled or not, the executor should shut
555+
down the pipeline.
559556
560557
Returns:
561-
True if pipeline reached a terminal state and monitor task could finish.
562-
Otherwise monitor task should schedule itself again for future
563-
execution.
558+
True only if the pipeline has reached a terminal state and should
559+
be shut down.
560+
564561
"""
565562
if self._is_executing():
566563
# There are some bundles still in progress.
@@ -585,8 +582,8 @@ def _fire_timers(self):
585582
Returns:
586583
True if timers fired.
587584
"""
588-
transform_fired_timers = (
589-
self._executor.evaluation_context.extract_fired_timers())
585+
transform_fired_timers, _ = (
586+
self._executor.evaluation_context.extract_all_timers())
590587
for applied_ptransform, fired_timers in transform_fired_timers:
591588
# Use an empty committed bundle. just to trigger.
592589
empty_bundle = (
@@ -602,7 +599,17 @@ def _fire_timers(self):
602599
return bool(transform_fired_timers)
603600

604601
def _is_executing(self):
605-
"""Returns True if there is at least one non-blocked TransformExecutor."""
602+
"""Checks whether the job is still executing.
603+
604+
Returns:
605+
True if any real-time timers are still set, or if there is at least
606+
one non-blocked TransformExecutor active."""
607+
608+
watermark_manager = self._executor.evaluation_context._watermark_manager
609+
_, any_unfired_timers = watermark_manager.extract_all_timers()
610+
if any_unfired_timers:
611+
return True
612+
606613
executors = self._executor.transform_executor_services.executors
607614
if not executors:
608615
# Nothing is executing.

sdks/python/apache_beam/runners/direct/watermark_manager.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class WatermarkManager(object):
3939

4040
def __init__(self, clock, root_transforms, value_to_consumers,
4141
transform_keyed_states):
42-
self._clock = clock # processing time clock
42+
self._clock = clock
4343
self._root_transforms = root_transforms
4444
self._value_to_consumers = value_to_consumers
4545
self._transform_keyed_states = transform_keyed_states
@@ -143,13 +143,17 @@ def _refresh_watermarks(self, applied_ptransform):
143143
for consumer in consumers:
144144
self._refresh_watermarks(consumer)
145145

146-
def extract_fired_timers(self):
146+
def extract_all_timers(self):
147+
"""Extracts fired timers; reports if any transform has a real-time timer."""
147148
all_timers = []
149+
has_realtime_timer = False
148150
for applied_ptransform, tw in self._transform_to_watermarks.iteritems():
149-
fired_timers = tw.extract_fired_timers()
151+
fired_timers, had_realtime_timer = tw.extract_transform_timers()
150152
if fired_timers:
151153
all_timers.append((applied_ptransform, fired_timers))
152-
return all_timers
154+
if had_realtime_timer:
155+
has_realtime_timer = True
156+
return all_timers, has_realtime_timer
153157

154158

155159
class _TransformWatermarks(object):
@@ -244,19 +248,22 @@ def refresh(self):
244248

245249
@property
246250
def synchronized_processing_output_time(self):
247-
return self._clock.time()
251+
return self._clock.current_time()
248252

249-
def extract_fired_timers(self):
253+
def extract_transform_timers(self):
254+
"""Extracts fired timers; reports if this transform has a real-time timer."""
250255
with self._lock:
251-
if self._fired_timers:
252-
return False
253-
254256
fired_timers = []
257+
has_realtime_timer = False
255258
for encoded_key, state in self._keyed_states.iteritems():
256-
timers = state.get_timers(watermark=self._input_watermark)
259+
timers, had_realtime_timer = state.get_timers(
260+
watermark=self._input_watermark,
261+
current_time=self._clock.current_time())
262+
if had_realtime_timer:
263+
has_realtime_timer = True
257264
for expired in timers:
258265
window, (name, time_domain, timestamp) = expired
259266
fired_timers.append(
260267
TimerFiring(encoded_key, window, name, time_domain, timestamp))
261268
self._fired_timers.update(fired_timers)
262-
return fired_timers
269+
return fired_timers, has_realtime_timer

sdks/python/apache_beam/transforms/trigger.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,20 +1138,36 @@ def clear_state(self, window, tag):
11381138
if not self.state[window]:
11391139
self.state.pop(window, None)
11401140

1141-
def get_timers(self, clear=False, watermark=MAX_TIMESTAMP):
1141+
def get_timers(self, clear=False, watermark=MAX_TIMESTAMP, current_time=None):
1142+
"""Gets expired timers and reports whether any
1143+
real-time timers are set in this state.
1144+
1145+
Expiration is measured against the watermark for event-time timers,
1146+
and against a wall clock for processing-time timers.
1147+
"""
11421148
expired = []
1149+
has_realtime_timer = False
11431150
for window, timers in list(self.timers.items()):
11441151
for (name, time_domain), timestamp in list(timers.items()):
1145-
if timestamp <= watermark:
1152+
if time_domain == 'REAL_TIME':
1153+
time_marker = current_time
1154+
has_realtime_timer = True
1155+
elif time_domain == 'WATERMARK':
1156+
time_marker = watermark
1157+
else:
1158+
logging.error(
1159+
'TimeDomain error: No timers defined for time domain %s.',
1160+
time_domain)
1161+
if timestamp <= time_marker:
11461162
expired.append((window, (name, time_domain, timestamp)))
11471163
if clear:
11481164
del timers[(name, time_domain)]
11491165
if not timers and clear:
11501166
del self.timers[window]
1151-
return expired
1167+
return expired, has_realtime_timer
11521168

11531169
def get_and_clear_timers(self, watermark=MAX_TIMESTAMP):
1154-
return self.get_timers(clear=True, watermark=watermark)
1170+
return self.get_timers(clear=True, watermark=watermark)[0]
11551171

11561172
def get_earliest_hold(self):
11571173
earliest_hold = MAX_TIMESTAMP

0 commit comments

Comments
 (0)