Skip to content

Commit 3bce09e

Browse files
committed
[adapters] Report total_processed_steps for output connectors in API.
This has been available inside the pipeline for a while, but we forgot to expose it through the API. This commit fixes that. It also fixes up and enables the corresponding Python integration test.

Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 47bbd2c commit 3bce09e

File tree

5 files changed

+154
-29
lines changed

5 files changed

+154
-29
lines changed

crates/adapters/src/controller/stats.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,6 +1310,10 @@ impl ControllerStatus {
13101310
.metrics
13111311
.total_processed_input_records
13121312
.load(Ordering::Relaxed),
1313+
total_processed_steps: output
1314+
.metrics
1315+
.total_processed_steps
1316+
.load(Ordering::Relaxed),
13131317
memory: output.metrics.memory.load(Ordering::Relaxed),
13141318
},
13151319
fatal_error: output.fatal_error.lock().unwrap().clone(),
@@ -2201,11 +2205,18 @@ pub struct OutputEndpointMetrics {
22012205
/// This metric tracks the end-to-end progress of the pipeline: the output
22022206
/// of this endpoint is equal to the output of the circuit after
22032207
/// processing `total_processed_input_records` records.
2208+
///
2209+
/// In a multihost pipeline, this count reflects only the input records
2210+
/// processed on the same host as the output endpoint, which is not usually
2211+
/// meaningful.
22042212
pub total_processed_input_records: AtomicU64,
22052213

22062214
/// The number of steps whose input records have been processed by the
22072215
/// endpoint.
22082216
///
2217+
/// This is meaningful in a multihost pipeline because steps are
2218+
/// synchronized across all of the hosts.
2219+
///
22092220
/// # Interpretation
22102221
///
22112222
/// This is a count, not a step number. If `total_processed_steps` is 0, no
@@ -2272,6 +2283,7 @@ impl OutputEndpointMetrics {
22722283
total_processed_input_records: self
22732284
.total_processed_input_records
22742285
.load(Ordering::Relaxed),
2286+
total_processed_steps: self.total_processed_steps.load(Ordering::Relaxed),
22752287
memory: self.memory.load(Ordering::Relaxed),
22762288
}
22772289
}

crates/feldera-types/src/adapter_stats.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,30 @@ pub struct ExternalOutputEndpointMetrics {
206206
/// Number of transport errors.
207207
pub num_transport_errors: u64,
208208
/// The number of input records processed by the circuit.
209+
///
210+
/// This metric tracks the end-to-end progress of the pipeline: the output
211+
/// of this endpoint is equal to the output of the circuit after
212+
/// processing `total_processed_input_records` records.
213+
///
214+
/// In a multihost pipeline, this count reflects only the input records
215+
/// processed on the same host as the output endpoint, which is not usually
216+
/// meaningful.
209217
pub total_processed_input_records: u64,
218+
/// The number of steps whose input records have been processed by the
219+
/// endpoint.
220+
///
221+
/// This is meaningful in a multihost pipeline because steps are
222+
/// synchronized across all of the hosts.
223+
///
224+
/// # Interpretation
225+
///
226+
/// This is a count, not a step number. If `total_processed_steps` is 0, no
227+
/// steps have been processed to completion. If `total_processed_steps >
228+
/// 0`, then the last step whose input records have been processed to
229+
/// completion is `total_processed_steps - 1`. A record that was ingested in
230+
/// step `n` is fully processed when `total_processed_steps > n`.
231+
#[schema(value_type = u64)]
232+
pub total_processed_steps: Step,
210233
/// Extra memory in use beyond that used for queuing records.
211234
pub memory: u64,
212235
}

openapi.json

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9398,6 +9398,7 @@
93989398
"num_encode_errors",
93999399
"num_transport_errors",
94009400
"total_processed_input_records",
9401+
"total_processed_steps",
94019402
"memory"
94029403
],
94039404
"properties": {
@@ -9446,7 +9447,13 @@
94469447
"total_processed_input_records": {
94479448
"type": "integer",
94489449
"format": "int64",
9449-
"description": "The number of input records processed by the circuit.",
9450+
"description": "The number of input records processed by the circuit.\n\nThis metric tracks the end-to-end progress of the pipeline: the output\nof this endpoint is equal to the output of the circuit after\nprocessing `total_processed_input_records` records.\n\nIn a multihost pipeline, this count reflects only the input records\nprocessed on the same host as the output endpoint, which is not usually\nmeaningful.",
9451+
"minimum": 0
9452+
},
9453+
"total_processed_steps": {
9454+
"type": "integer",
9455+
"format": "int64",
9456+
"description": "The number of steps whose input records have been processed by the\nendpoint.\n\nThis is meaningful in a multihost pipeline because steps are\nsynchronized across all of the hosts.\n\n# Interpretation\n\nThis is a count, not a step number. If `total_processed_steps` is 0, no\nsteps have been processed to completion. If `total_processed_steps >\n0`, then the last step whose input records have been processed to\ncompletion is `total_processed_steps - 1`. A record that was ingested in\nstep `n` is fully processed when `total_processed_steps > n`.",
94509457
"minimum": 0
94519458
},
94529459
"transmitted_bytes": {

python/feldera/stats.py

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,30 @@ def __init__(self):
4343
"""
4444
self.state: Optional[PipelineStatus] = None
4545
self.bootstrap_in_progress: Optional[bool] = None
46+
self.transaction_status: Optional[TransactionStatus] = None
47+
self.transaction_id: Optional[int] = None
48+
self.commit_progress: Optional[CommitProgressSummary] = None
49+
self.transaction_initiators: Optional[TransactionInitiators] = None
4650
self.rss_bytes: Optional[int] = None
4751
self.cpu_msecs: Optional[int] = None
52+
self.uptime_msecs: Optional[float] = None
4853
self.start_time: Optional[datetime] = None
4954
self.incarnation_uuid: Optional[uuid.UUID] = None
55+
self.initial_start_time: Optional[datetime] = None
5056
self.storage_bytes: Optional[int] = None
5157
self.storage_mb_secs: Optional[int] = None
5258
self.runtime_elapsed_msecs: Optional[int] = None
5359
self.buffered_input_records: Optional[int] = None
5460
self.total_input_records: Optional[int] = None
61+
self.buffered_input_bytes: Optional[int] = None
62+
self.total_input_bytes: Optional[int] = None
5563
self.total_processed_records: Optional[int] = None
64+
self.total_processed_bytes: Optional[int] = None
5665
self.total_completed_records: Optional[int] = None
66+
self.output_stall_msecs: Optional[int] = None
67+
self.total_initiated_steps: Optional[int] = None
68+
self.total_completed_steps: Optional[int] = None
5769
self.pipeline_complete: Optional[bool] = None
58-
self.transaction_status: Optional[TransactionStatus] = None
59-
self.transaction_id: Optional[int] = None
6070

6171
@classmethod
6272
def from_dict(cls, d: Mapping[str, Any]):
@@ -65,6 +75,7 @@ def from_dict(cls, d: Mapping[str, Any]):
6575
metrics.state = PipelineStatus.from_str(d["state"])
6676
metrics.incarnation_uuid = uuid.UUID(d["incarnation_uuid"])
6777
metrics.start_time = datetime.fromtimestamp(d["start_time"])
78+
metrics.initial_start_time = datetime.fromtimestamp(d["start_time"])
6879
metrics.transaction_status = TransactionStatus.from_str(d["transaction_status"])
6980
return metrics
7081

@@ -100,6 +111,7 @@ def __init__(self):
100111
self.total_bytes: Optional[int] = None
101112
self.total_records: Optional[int] = None
102113
self.buffered_records: Optional[int] = None
114+
self.buffered_bytes: Optional[int] = None
103115
self.num_transport_errors: Optional[int] = None
104116
self.num_parse_errors: Optional[int] = None
105117
self.end_of_input: Optional[bool] = None
@@ -141,12 +153,69 @@ def __init__(self):
141153
self.transmitted_bytes: Optional[int] = None
142154
self.queued_records: Optional[int] = None
143155
self.queued_batches: Optional[int] = None
156+
self.buffered_records: Optional[int] = None
157+
self.buffered_batches: Optional[int] = None
144158
self.num_encode_errors: Optional[int] = None
145159
self.num_transport_errors: Optional[int] = None
146160
self.total_processed_input_records: Optional[int] = None
161+
self.total_processed_steps: Optional[int] = None
162+
self.memory: Optional[int] = None
147163

148164
@classmethod
149165
def from_dict(cls, d: Mapping[str, Any]):
150166
metrics = cls()
151167
metrics.__dict__.update(d)
152168
return metrics
169+
170+
171+
class CommitProgressSummary:
172+
"""Progress of a transaction commit.
173+
"""
174+
175+
def __init__(self):
176+
"""Initializes an empty status."""
177+
self.completed: Optional[int] = None
178+
self.in_progress: Optional[int] = None
179+
self.remaining: Optional[int] = None
180+
self.in_progress_processed_records: Optional[int] = None
181+
self.in_progress_total_records: Optional[int] = None
182+
183+
@classmethod
184+
def from_dict(cls, d: Mapping[str, Any]):
185+
status = cls()
186+
status.__dict__.update(d)
187+
return status
188+
189+
190+
class TransactionInitiators:
191+
"""Initiators for an ongoing transaction.
192+
"""
193+
194+
def __init__(self):
195+
"""Initializes an empty status."""
196+
self.transaction_id: Optional[int] = None
197+
self.initiated_by_api: Optional[str] = None
198+
self.initiated_by_connectors: Optional[Mapping[str, ConnectorTransactionPhase]] = None
199+
200+
@classmethod
201+
def from_dict(cls, d: Mapping[str, Any]):
202+
status = cls()
203+
status.__dict__.update(d)
204+
status.initiated_by_connectors = ConnectorTransactionPhase.from_dict(d["initiated_by_connectors"])
205+
return status
206+
207+
208+
class ConnectorTransactionPhase:
209+
"""Connector transaction phase with optional label
210+
"""
211+
212+
def __init__(self):
213+
"""Initializes an empty status."""
214+
self.phase: Optional[str] = None
215+
self.label: Optional[str] = None
216+
217+
@classmethod
218+
def from_dict(cls, d: Mapping[str, Any]):
219+
status = cls()
220+
status.__dict__.update(d)
221+
return status

python/tests/platform/test_metrics_logs.py

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
)
1919

2020
from feldera.testutils import FELDERA_TEST_NUM_HOSTS
21+
from feldera.stats import PipelineStatistics
22+
from feldera.enums import PipelineStatus
2123

2224

2325
def _ingest_lines(name: str, table: str, body: str):
@@ -83,7 +85,6 @@ def test_pipeline_metrics(pipeline_name):
8385

8486

8587
@gen_pipeline_name
86-
@single_host_only
8788
def test_pipeline_stats(pipeline_name):
8889
"""
8990
Tests retrieving pipeline statistics via `/stats`.
@@ -102,12 +103,14 @@ def test_pipeline_stats(pipeline_name):
102103
""".strip()
103104

104105
create_pipeline(pipeline_name, sql)
105-
start_pipeline(pipeline_name)
106+
start_pipeline_as_paused(pipeline_name)
106107

107108
# Create output connector on v1 (egress)
108109
r_out = post_no_body(api_url(f"/pipelines/{pipeline_name}/egress/v1"), stream=True)
109110
assert r_out.status_code == HTTPStatus.OK, (r_out.status_code, r_out.text)
110111

112+
resume_pipeline(pipeline_name)
113+
111114
# Wait for datagen completion
112115
time.sleep(3)
113116
deadline = time.time() + 10
@@ -118,37 +121,48 @@ def test_pipeline_stats(pipeline_name):
118121
time.sleep(1)
119122
assert _adhoc_count(pipeline_name, "t1") == 5, "Did not ingest expected 5 rows"
120123

124+
# Wait for all the steps to be completed.
125+
deadline = time.time() + 10
126+
while time.time() < deadline:
127+
r_stats = get(api_url(f"/pipelines/{pipeline_name}/stats"))
128+
assert r_stats.status_code == HTTPStatus.OK, (r_stats.status_code, r_stats.text)
129+
stats = PipelineStatistics.from_dict(r_stats.json())
130+
gm = stats.global_metrics
131+
steps = gm.total_initiated_steps
132+
if steps is not None and steps == gm.total_completed_steps:
133+
break
134+
121135
r_stats = get(api_url(f"/pipelines/{pipeline_name}/stats"))
122136
assert r_stats.status_code == HTTPStatus.OK, (r_stats.status_code, r_stats.text)
123-
stats = r_stats.json()
124-
keys = sorted(stats.keys())
137+
r_stats_json = r_stats.json()
138+
keys = sorted(r_stats_json.keys())
125139
assert keys == ["global_metrics", "inputs", "outputs", "suspend_error"]
140+
stats = PipelineStatistics.from_dict(r_stats_json)
126141

127-
gm = stats["global_metrics"]
128-
assert gm.get("state") == "Running"
129-
assert gm.get("total_input_records") == 5
130-
assert gm.get("total_processed_records") == 5
131-
assert gm.get("pipeline_complete")
132-
assert gm.get("buffered_input_records") == 0
133-
assert gm.get("buffered_input_bytes") == 0
142+
assert gm.state == PipelineStatus.RUNNING
143+
assert gm.total_input_records == 5
144+
assert gm.total_processed_records == 5
145+
assert gm.pipeline_complete
146+
assert gm.buffered_input_records == 0
147+
assert gm.buffered_input_bytes == 0
134148

135-
inputs = stats["inputs"]
136-
assert isinstance(inputs, list) and len(inputs) == 1
149+
inputs = stats.inputs
150+
assert len(inputs) == 1
137151
inp = inputs[0]
138-
assert inp.get("config", {}).get("stream") == "t1"
139-
assert inp.get("metrics", {}).get("buffered_bytes") == 0
140-
assert inp.get("metrics", {}).get("buffered_records") == 0
141-
assert inp.get("metrics", {}).get("end_of_input")
142-
assert inp.get("metrics", {}).get("num_parse_errors") == 0
143-
assert inp.get("metrics", {}).get("num_transport_errors") == 0
144-
assert inp.get("metrics", {}).get("total_bytes") == 40
145-
assert inp.get("metrics", {}).get("total_records") == 5
146-
147-
outputs = stats["outputs"]
148-
assert isinstance(outputs, list) and len(outputs) == 1
152+
assert inp.config["stream"] == "t1"
153+
assert inp.metrics.buffered_bytes == 0
154+
assert inp.metrics.buffered_records == 0
155+
assert inp.metrics.end_of_input
156+
assert inp.metrics.num_parse_errors == 0
157+
assert inp.metrics.num_transport_errors == 0
158+
assert inp.metrics.total_bytes == 40
159+
assert inp.metrics.total_records == 5
160+
161+
outputs = stats.outputs
162+
assert len(outputs) == 1
149163
out = outputs[0]
150-
assert out.get("config", {}).get("stream") == "v1"
151-
assert out.get("metrics", {}).get("total_processed_input_records") == 5
164+
assert out.config["stream"] == "v1"
165+
assert out.metrics.total_processed_steps == steps
152166

153167
# /time_series
154168
r_ts = get(api_url(f"/pipelines/{pipeline_name}/time_series"))

0 commit comments

Comments (0)