Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion crates/adapters/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4358,7 +4358,18 @@ impl ControllerInit {
// transient HTTP and adhoc connectors, so that we can use them to
// replay journaled inputs.
inputs: if !modified {
checkpoint_config.inputs
checkpoint_config
.inputs
.into_iter()
.filter(|(_, config)| {
// The clock input connector will be automatically recreated and initialized
// with the clock resolution from the pipeline config.
!matches!(
config.connector_config.transport,
TransportConfig::ClockInput(_)
)
})
.collect()
} else {
config.inputs
},
Expand Down
57 changes: 54 additions & 3 deletions python/tests/workloads/test_now.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import time
import unittest

from feldera.enums import PipelineStatus
from feldera.pipeline import Pipeline
from feldera.runtime_config import Resources
from feldera.testutils import (
ViewSpec,
Expand All @@ -13,6 +16,38 @@

INPUT_RECORDS = 2000000

# After restart, the implicit `now` clock connector uses this resolution (see
# `now_endpoint_config` in the adapters crate). Stats only expose `stream` for
# the endpoint; the effective resolution is the pipeline runtime setting.
CLOCK_RESOLUTION_USECS_AFTER_RESTART = 3_000_000


def _clock_input_endpoint(pipeline: Pipeline):
"""Return stats for the built-in real-time clock input (`endpoint_name` ``now``)."""
for inp in pipeline.stats().inputs:
if inp.endpoint_name == "now":
return inp
names = [i.endpoint_name for i in pipeline.stats().inputs]
raise AssertionError(f"clock input endpoint 'now' not found; have {names!r}")


def _wait_clock_record_ticks(
pipeline: Pipeline, count: int, *, timeout_s: float
) -> None:
"""Wait until the clock connector's ingested record count increases by ``count``."""
baseline = _clock_input_endpoint(pipeline).metrics.total_records
assert baseline is not None
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
cur = _clock_input_endpoint(pipeline).metrics.total_records
if cur is not None and cur >= baseline + count:
return
time.sleep(0.25)
raise TimeoutError(
f"clock total_records did not increase by {count} within {timeout_s}s "
f"(baseline was {baseline})"
)


class TestNow(unittest.TestCase):
def test_now(self):
Expand Down Expand Up @@ -98,9 +133,25 @@ def test_now(self):
views = [view for view in views if view.name != "v_now"]
validate_outputs(pipeline, tables, views)

# Process more clock ticks
pipeline.resume()
time.sleep(5)
# Tighten clock resolution, restart, confirm runtime and clock endpoint,
# then wait for two more clock ticks before validating again.
pipeline.stop(force=False)
runtime_cfg = pipeline.runtime_config()
runtime_cfg.clock_resolution_usecs = CLOCK_RESOLUTION_USECS_AFTER_RESTART
pipeline.set_runtime_config(runtime_cfg)
pipeline.start()
pipeline.wait_for_status(PipelineStatus.RUNNING, timeout=300)

assert (
pipeline.runtime_config().clock_resolution_usecs
== CLOCK_RESOLUTION_USECS_AFTER_RESTART
)
clock_status = _clock_input_endpoint(pipeline)
assert clock_status.config is not None
assert clock_status.config.get("stream") == "now"

# Two ticks at 3s resolution need at least ~6s; allow slack for scheduling.
_wait_clock_record_ticks(pipeline, 2, timeout_s=30.0)

validate_outputs(pipeline, tables, views)

Expand Down
Loading