Skip to content

Commit e09e6ef

Browse files
committed
[python] Add methods and types for retrieving pipeline statistics.
Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 69d5f74 commit e09e6ef

File tree

2 files changed

+162
-14
lines changed

2 files changed

+162
-14
lines changed

python/feldera/pipeline.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from feldera._helpers import ensure_dataframe_has_columns, chunk_dataframe
1818
from feldera.rest.sql_table import SQLTable
1919
from feldera.rest.sql_view import SQLView
20+
from feldera.stats import PipelineStatistics
2021

2122

2223
class Pipeline:
@@ -69,6 +70,11 @@ def status(self) -> PipelineStatus:
6970
else:
7071
raise err
7172

73+
def stats(self) -> PipelineStatistics:
    """
    Retrieves the pipeline's metrics and performance counters.

    Asks the client for this pipeline's statistics (looked up by
    ``self.name``) and parses the reply into a ``PipelineStatistics``.

    :return: The parsed statistics.
    """

    raw_stats = self.client.get_pipeline_stats(self.name)
    return PipelineStatistics.from_dict(raw_stats)
77+
7278
def input_pandas(self, table_name: str, df: pandas.DataFrame, force: bool = False):
7379
"""
7480
Push all rows in a pandas DataFrame to the pipeline.
@@ -309,17 +315,12 @@ def wait_for_completion(
309315
}s, timeout: {timeout_s}s"
310316
)
311317

312-
metrics: dict = self.client.get_pipeline_stats(self.name).get(
313-
"global_metrics"
314-
)
315-
pipeline_complete: bool = metrics.get("pipeline_complete")
316-
318+
pipeline_complete: bool = self.stats().global_metrics.pipeline_complete
317319
if pipeline_complete is None:
318320
raise RuntimeError(
319321
"received unknown metrics from the pipeline, pipeline_complete is None"
320322
)
321-
322-
if pipeline_complete:
323+
elif pipeline_complete:
323324
break
324325

325326
time.sleep(1)
@@ -436,16 +437,14 @@ def wait_for_idle(
436437
now_s = time.monotonic()
437438

438439
# Metrics retrieval
439-
metrics: dict = self.client.get_pipeline_stats(self.name).get(
440-
"global_metrics"
441-
)
442-
total_input_records: int | None = metrics.get("total_input_records")
443-
total_processed_records: int | None = metrics.get("total_processed_records")
444-
if total_input_records is None:
440+
metrics = self.stats().global_metrics
441+
total_input_records = metrics.total_input_records
442+
total_processed_records = metrics.total_processed_records
443+
if metrics.total_input_records is None:
445444
raise RuntimeError(
446445
"total_input_records is missing from the pipeline metrics"
447446
)
448-
if total_processed_records is None:
447+
if metrics.total_processed_records is None:
449448
raise RuntimeError(
450449
"total_processed_records is missing from the pipeline metrics"
451450
)

python/feldera/stats.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
from typing import Mapping, Any, Optional, List
2+
from feldera.enums import PipelineStatus
3+
from datetime import datetime
4+
import uuid
5+
6+
7+
class PipelineStatistics:
    """
    Represents statistics reported by a pipeline's "/stats" endpoint.

    A freshly constructed instance holds empty global metrics, no suspend
    error and no endpoints; use ``from_dict`` to build one from a reply.
    """

    def __init__(self):
        """
        Initializes as an empty set of statistics.
        """

        self.global_metrics: GlobalPipelineMetrics = GlobalPipelineMetrics()
        self.suspend_error: Optional[Any] = None
        # One status object per endpoint; from_dict() stores lists here, so
        # the empty defaults are lists as well.
        self.inputs: List[InputEndpointStatus] = []
        self.outputs: List[OutputEndpointStatus] = []

    @classmethod
    def from_dict(cls, d: Mapping[str, Any]) -> "PipelineStatistics":
        """
        Builds statistics from the decoded "/stats" reply.

        :param d: The reply as a mapping. It must contain the keys
            "global_metrics", "inputs" and "outputs"; "suspend_error" is
            read if present.
        :return: A new instance populated from ``d``.
        :raises KeyError: If a required key is missing from ``d``.
        """

        pipeline = cls()
        pipeline.global_metrics = GlobalPipelineMetrics.from_dict(d["global_metrics"])
        # Treated as optional: a reply without the key leaves it as None.
        pipeline.suspend_error = d.get("suspend_error")
        pipeline.inputs = [
            InputEndpointStatus.from_dict(entry) for entry in d["inputs"]
        ]
        # Output statuses belong in `outputs`; they must not overwrite `inputs`.
        pipeline.outputs = [
            OutputEndpointStatus.from_dict(entry) for entry in d["outputs"]
        ]
        return pipeline
33+
34+
35+
class GlobalPipelineMetrics:
    """Represents the "global_metrics" object within the pipeline's
    "/stats" endpoint reply.

    Every attribute is ``None`` until it is filled in by ``from_dict``.
    """

    def __init__(self):
        """
        Initializes as an empty set of metrics.
        """
        self.state: Optional[PipelineStatus] = None
        self.bootstrap_in_progress: Optional[bool] = None
        self.rss_bytes: Optional[int] = None
        self.cpu_msecs: Optional[int] = None
        self.start_time: Optional[datetime] = None
        self.incarnation_uuid: Optional[uuid.UUID] = None
        self.storage_bytes: Optional[int] = None
        self.storage_mb_secs: Optional[int] = None
        self.runtime_elapsed_msecs: Optional[int] = None
        self.buffered_input_records: Optional[int] = None
        self.total_input_records: Optional[int] = None
        self.total_processed_records: Optional[int] = None
        self.total_completed_records: Optional[int] = None
        self.pipeline_complete: Optional[bool] = None

    @classmethod
    def from_dict(cls, d: Mapping[str, Any]) -> "GlobalPipelineMetrics":
        """
        Builds metrics from the decoded "global_metrics" object.

        :param d: The metrics as a mapping. It must contain the keys
            "state", "incarnation_uuid" and "start_time".
        :return: A new instance populated from ``d``.
        :raises KeyError: If one of the required keys is missing.
        """
        metrics = cls()
        # Copy every key verbatim first (keys not declared in __init__ become
        # attributes too); the three fields below are then replaced with
        # parsed values.
        metrics.__dict__.update(d)
        metrics.state = PipelineStatus.from_str(d["state"])
        metrics.incarnation_uuid = uuid.UUID(d["incarnation_uuid"])
        # NOTE(review): fromtimestamp() without a tz argument yields a naive
        # datetime in local time; presumably start_time is seconds since the
        # epoch -- confirm against the server.
        metrics.start_time = datetime.fromtimestamp(d["start_time"])
        return metrics
67+
68+
69+
class InputEndpointStatus:
    """Status of a single input endpoint: one element of the "inputs"
    array in the pipeline's "/stats" endpoint reply.
    """

    def __init__(self):
        """Creates a status with every field unset."""
        self.endpoint_name: Optional[str] = None
        self.config: Optional[Mapping] = None
        self.metrics: Optional[InputEndpointMetrics] = None
        self.fatal_error: Optional[str] = None
        self.paused: Optional[bool] = None
        self.barrier: Optional[bool] = None

    @classmethod
    def from_dict(cls, d: Mapping[str, Any]):
        """Builds a status from one decoded element of the "inputs" array.

        All keys of ``d`` are copied onto the new object as attributes;
        "metrics" (required) is then replaced by a parsed
        ``InputEndpointMetrics``.
        """
        instance = cls()
        vars(instance).update(d)
        instance.metrics = InputEndpointMetrics.from_dict(d["metrics"])
        return instance
89+
90+
91+
class InputEndpointMetrics:
    """Counters for one input endpoint: the "metrics" member of an input
    endpoint status in the pipeline's "/stats" endpoint reply.
    """

    def __init__(self):
        """Creates a set of metrics with every field unset."""
        self.total_bytes: Optional[int] = None
        self.total_records: Optional[int] = None
        self.buffered_records: Optional[int] = None
        self.num_transport_errors: Optional[int] = None
        self.num_parse_errors: Optional[int] = None
        self.end_of_input: Optional[bool] = None

    @classmethod
    def from_dict(cls, d: Mapping[str, Any]):
        """Builds metrics from the decoded "metrics" object.

        Every key of ``d`` is copied onto the new object as an attribute;
        fields absent from ``d`` stay ``None``.
        """
        instance = cls()
        vars(instance).update(d)
        return instance
109+
110+
111+
class OutputEndpointStatus:
    """Status of a single output endpoint: one element of the "outputs"
    array in the pipeline's "/stats" endpoint reply.
    """

    def __init__(self):
        """Creates a status with every field unset."""
        self.endpoint_name: Optional[str] = None
        self.config: Optional[Mapping] = None
        self.metrics: Optional[OutputEndpointMetrics] = None
        self.fatal_error: Optional[str] = None

    @classmethod
    def from_dict(cls, d: Mapping[str, Any]):
        """Builds a status from one decoded element of the "outputs" array.

        All keys of ``d`` are copied onto the new object as attributes;
        "metrics" (required) is then replaced by a parsed
        ``OutputEndpointMetrics``.
        """
        instance = cls()
        vars(instance).update(d)
        instance.metrics = OutputEndpointMetrics.from_dict(d["metrics"])
        return instance
129+
130+
131+
class OutputEndpointMetrics:
    """Counters for one output endpoint: the "metrics" member of an output
    endpoint status in the pipeline's "/stats" endpoint reply.
    """

    def __init__(self):
        """Creates a set of metrics with every field unset."""
        self.transmitted_records: Optional[int] = None
        self.transmitted_bytes: Optional[int] = None
        self.queued_records: Optional[int] = None
        self.queued_batches: Optional[int] = None
        self.num_encode_errors: Optional[int] = None
        self.num_transport_errors: Optional[int] = None
        self.total_processed_input_records: Optional[int] = None

    @classmethod
    def from_dict(cls, d: Mapping[str, Any]):
        """Builds metrics from the decoded "metrics" object.

        Every key of ``d`` is copied onto the new object as an attribute;
        fields absent from ``d`` stay ``None``.
        """
        instance = cls()
        vars(instance).update(d)
        return instance

0 commit comments

Comments
 (0)