-
Notifications
You must be signed in to change notification settings - Fork 113
Expand file tree
/
Copy pathtest_now.py
More file actions
164 lines (136 loc) · 5.23 KB
/
test_now.py
File metadata and controls
164 lines (136 loc) · 5.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import time
import unittest
from feldera.enums import PipelineStatus
from feldera.pipeline import Pipeline
from feldera.runtime_config import Resources
from feldera.testutils import (
ViewSpec,
build_pipeline,
log,
validate_outputs,
unique_pipeline_name,
)
# Scenario under test: add the value of now() to the rows of a table inside a
# transaction.

# Number of records the datagen connector of table `t` is limited to; the test
# waits until this many input records have been ingested.
INPUT_RECORDS = 2000000

# Clock resolution in microseconds (3 seconds) that the test writes into the
# runtime config before restarting the pipeline.
# After restart, the implicit `now` clock connector uses this resolution (see
# `now_endpoint_config` in the adapters crate). Stats only expose `stream` for
# the endpoint; the effective resolution is the pipeline runtime setting.
CLOCK_RESOLUTION_USECS_AFTER_RESTART = 3_000_000
def _clock_input_endpoint(pipeline: Pipeline):
    """Return stats for the built-in real-time clock input (`endpoint_name` ``now``).

    The pipeline statistics are fetched once, so the endpoint names listed in
    the failure message come from the same snapshot that was searched.

    :param pipeline: Pipeline whose input endpoint statistics are inspected.
    :return: The entry of ``pipeline.stats().inputs`` whose ``endpoint_name``
        is ``"now"``.
    :raises AssertionError: If no input endpoint is named ``"now"``.
    """
    # Materialize the inputs so they can be scanned and, on failure, listed
    # without issuing a second stats request.
    inputs = list(pipeline.stats().inputs)
    for inp in inputs:
        if inp.endpoint_name == "now":
            return inp
    names = [i.endpoint_name for i in inputs]
    raise AssertionError(f"clock input endpoint 'now' not found; have {names!r}")
def _wait_clock_record_ticks(
    pipeline: Pipeline,
    count: int,
    *,
    timeout_s: float,
    poll_interval_s: float = 0.25,
) -> None:
    """Wait until the clock connector's ingested record count increases by ``count``.

    The counter is always polled at least once, and once more after the final
    sleep, so a tick that arrives right before the deadline is not missed.

    :param pipeline: Pipeline whose ``now`` input endpoint is polled.
    :param count: Required increase of ``metrics.total_records`` relative to
        the value observed when this function is called.
    :param timeout_s: Maximum time to wait, in seconds.
    :param poll_interval_s: Pause between two polls, in seconds.
    :raises AssertionError: If the initial ``total_records`` value is ``None``.
    :raises TimeoutError: If the counter did not increase by ``count`` within
        ``timeout_s`` seconds.
    """
    baseline = _clock_input_endpoint(pipeline).metrics.total_records
    if baseline is None:
        # Raised explicitly (rather than via `assert`) so the precondition is
        # still enforced when Python runs with -O.
        raise AssertionError("clock input endpoint reports no total_records")

    target = baseline + count
    deadline = time.monotonic() + timeout_s
    while True:
        cur = _clock_input_endpoint(pipeline).metrics.total_records
        if cur is not None and cur >= target:
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(poll_interval_s, remaining))

    raise TimeoutError(
        f"clock total_records did not increase by {count} within {timeout_s}s "
        f"(baseline was {baseline})"
    )
class TestNow(unittest.TestCase):
    """Integration test for views that depend on ``now()``."""

    def test_now(self):
        """Join a datagen-fed table with ``now()`` and validate the result.

        Steps:

        1. Ingest ``INPUT_RECORDS`` rows inside a transaction, pause the
           pipeline, commit, and validate every view except ``v_now``.
        2. Stop the pipeline, set ``clock_resolution_usecs`` in the runtime
           config, restart, and check the runtime config and the ``now``
           input endpoint.
        3. Wait for two more clock records, validate again, and check that
           ``v_now`` holds exactly one row.

        The pipeline is force-stopped even when a step fails, so a failing
        run does not leave it running.
        """
        tables = {
            "t": f"""
create table t(
id bigint not null primary key,
company_id bigint,
name string
) with (
'materialized' = 'true',
'connectors' = '[{{
"name": "datagen",
"transport": {{
"name": "datagen",
"config": {{
"plan": [{{
"limit": {INPUT_RECORDS},
"fields": {{
"name": {{ "strategy": "sentence" }}
}}
}}]
}}
}}
}}]');
"""
        }

        views = [
            ViewSpec(
                # Have a deterministic now() for testing against datafusion.
                "v_now",
                """
select
now() as now_ts
""",
            ),
            ViewSpec(
                "v",
                """
select
t.*,
v_now.now_ts as now_ts
from t, v_now
""",
            ),
        ]

        pipeline = build_pipeline(
            unique_pipeline_name("now-test"),
            tables,
            views,
            # This test uses >2GB of storage in the ad hoc query.
            resources=Resources(storage_mb_max=8192),
        )

        try:
            pipeline.start()

            start_time = time.monotonic()
            pipeline.start_transaction()

            while True:
                # Read both counters from a single stats snapshot so the two
                # conditions are evaluated against consistent values.
                metrics = pipeline.stats().global_metrics
                if (
                    metrics.total_input_records >= INPUT_RECORDS
                    and metrics.buffered_input_records <= 0
                ):
                    break
                log(f"Waiting for {INPUT_RECORDS} records to be ingested...")
                time.sleep(1)

            elapsed = time.monotonic() - start_time
            log(f"Data ingested in {elapsed}")

            # Freeze the value of now().
            pipeline.pause()
            pipeline.wait_for_idle()

            start_time = time.monotonic()
            pipeline.commit_transaction(
                transaction_id=None, wait=True, timeout_s=600
            )
            elapsed = time.monotonic() - start_time
            log(f"Commit took {elapsed}")

            # Don't validate v_now which depends on the real-time clock.
            validated_views = [view for view in views if view.name != "v_now"]
            validate_outputs(pipeline, tables, validated_views)

            # Change the clock resolution, restart, confirm runtime and clock
            # endpoint, then wait for two more clock ticks before validating
            # again.
            pipeline.stop(force=False)
            runtime_cfg = pipeline.runtime_config()
            runtime_cfg.clock_resolution_usecs = CLOCK_RESOLUTION_USECS_AFTER_RESTART
            pipeline.set_runtime_config(runtime_cfg)

            pipeline.start()
            pipeline.wait_for_status(PipelineStatus.RUNNING, timeout=300)

            self.assertEqual(
                pipeline.runtime_config().clock_resolution_usecs,
                CLOCK_RESOLUTION_USECS_AFTER_RESTART,
            )

            clock_status = _clock_input_endpoint(pipeline)
            self.assertIsNotNone(clock_status.config)
            self.assertEqual(clock_status.config.get("stream"), "now")

            # Two ticks at 3s resolution need at least ~6s; allow slack for
            # scheduling.
            _wait_clock_record_ticks(pipeline, 2, timeout_s=30.0)

            validate_outputs(pipeline, tables, validated_views)

            row = next(pipeline.query("select count(*) as cnt from v_now"))
            self.assertEqual(row["cnt"], 1)
        finally:
            # Always stop the pipeline, including when an assertion above
            # failed.
            pipeline.stop(force=True)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()