Skip to content

Commit ebc9065

Browse files
committed
[py] Connector status API.
- Add Python bindings for querying input/output connector statuses. - Add a test that validates all connector fields and specifically tests input/out metric reporting. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent 59f0944 commit ebc9065

File tree

7 files changed

+1099
-453
lines changed

7 files changed

+1099
-453
lines changed

python/feldera/_helpers.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import pandas as pd
44
from decimal import Decimal
5+
from datetime import datetime
56
from typing import Mapping, Any
67

78

@@ -121,3 +122,43 @@ def chunk_dataframe(df, chunk_size=1000):
121122

122123
for i in range(0, len(df), chunk_size):
123124
yield df.iloc[i : i + chunk_size]
125+
126+
127+
def parse_datetime(value: str, field: str) -> datetime:
128+
"""Parse RFC3339-like datetime strings returned by the API."""
129+
try:
130+
return datetime.fromisoformat(value.replace("Z", "+00:00"))
131+
except ValueError as exc:
132+
raise ValueError(f"invalid datetime for '{field}': {value!r}") from exc
133+
134+
135+
def expect_mapping(d: Mapping[str, Any], key: str) -> Mapping[str, Any]:
136+
"""Return required mapping field or raise ValueError."""
137+
value = d.get(key)
138+
if not isinstance(value, Mapping):
139+
raise ValueError(f"missing or invalid required object field '{key}'")
140+
return value
141+
142+
143+
def expect_str(d: Mapping[str, Any], key: str) -> str:
144+
"""Return required string field or raise ValueError."""
145+
value = d.get(key)
146+
if not isinstance(value, str):
147+
raise ValueError(f"missing or invalid required string field '{key}'")
148+
return value
149+
150+
151+
def expect_int(d: Mapping[str, Any], key: str) -> int:
152+
"""Return required integer field or raise ValueError."""
153+
value = d.get(key)
154+
if not isinstance(value, int):
155+
raise ValueError(f"missing or invalid required integer field '{key}'")
156+
return value
157+
158+
159+
def expect_bool(d: Mapping[str, Any], key: str) -> bool:
160+
"""Return required boolean field or raise ValueError."""
161+
value = d.get(key)
162+
if not isinstance(value, bool):
163+
raise ValueError(f"missing or invalid required boolean field '{key}'")
164+
return value

python/feldera/connector_stats.py

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
from datetime import datetime, timezone
2+
from typing import Any, Mapping, Optional
3+
4+
from feldera._helpers import (
5+
expect_bool,
6+
expect_int,
7+
expect_mapping,
8+
expect_str,
9+
parse_datetime,
10+
)
11+
12+
13+
class ConnectorError:
14+
"""Represents a connector error item reported by connector status endpoints."""
15+
16+
def __init__(self):
17+
self.timestamp: datetime = datetime.fromtimestamp(0)
18+
self.index: int = 0
19+
self.tag: Optional[str] = None
20+
self.message: str = ""
21+
22+
@classmethod
23+
def from_dict(cls, d: Mapping[str, Any]):
24+
error = cls()
25+
error.timestamp = parse_datetime(
26+
expect_str(d, "timestamp"), "timestamp"
27+
)
28+
error.index = expect_int(d, "index")
29+
tag = d.get("tag")
30+
if tag is not None and not isinstance(tag, str):
31+
raise ValueError("invalid optional field 'tag': expected string or null")
32+
error.tag = tag
33+
error.message = expect_str(d, "message")
34+
return error
35+
36+
37+
class ShortEndpointConfig:
38+
"""Endpoint configuration subset returned by connector status endpoints."""
39+
40+
def __init__(self):
41+
self.stream: str = ""
42+
43+
@classmethod
44+
def from_dict(cls, d: Mapping[str, Any]):
45+
config = cls()
46+
config.stream = expect_str(d, "stream")
47+
return config
48+
49+
50+
class CompletedWatermark:
51+
"""Latest completed watermark reported by input connector status."""
52+
53+
def __init__(self):
54+
self.metadata: Any = None
55+
self.ingested_at: datetime = datetime.fromtimestamp(0)
56+
self.processed_at: datetime = datetime.fromtimestamp(0)
57+
self.completed_at: datetime = datetime.fromtimestamp(0)
58+
59+
@classmethod
60+
def from_dict(cls, d: Mapping[str, Any]):
61+
watermark = cls()
62+
if "metadata" not in d:
63+
raise ValueError("missing required field 'metadata'")
64+
watermark.metadata = d.get("metadata")
65+
watermark.ingested_at = parse_datetime(
66+
expect_str(d, "ingested_at"), "ingested_at"
67+
)
68+
watermark.processed_at = parse_datetime(
69+
expect_str(d, "processed_at"), "processed_at"
70+
)
71+
watermark.completed_at = parse_datetime(
72+
expect_str(d, "completed_at"), "completed_at"
73+
)
74+
return watermark
75+
76+
77+
class InputConnectorMetrics:
78+
"""Performance metrics returned by an input connector status endpoint."""
79+
80+
def __init__(self):
81+
self.total_bytes: int = 0
82+
self.total_records: int = 0
83+
self.buffered_records: int = 0
84+
self.buffered_bytes: int = 0
85+
self.num_transport_errors: int = 0
86+
self.num_parse_errors: int = 0
87+
self.end_of_input: bool = False
88+
89+
@classmethod
90+
def from_dict(cls, d: Mapping[str, Any]):
91+
metrics = cls()
92+
metrics.total_bytes = expect_int(d, "total_bytes")
93+
metrics.total_records = expect_int(d, "total_records")
94+
metrics.buffered_records = expect_int(d, "buffered_records")
95+
metrics.buffered_bytes = expect_int(d, "buffered_bytes")
96+
metrics.num_transport_errors = expect_int(d, "num_transport_errors")
97+
metrics.num_parse_errors = expect_int(d, "num_parse_errors")
98+
metrics.end_of_input = expect_bool(d, "end_of_input")
99+
return metrics
100+
101+
102+
class OutputConnectorMetrics:
103+
"""Performance metrics returned by an output connector status endpoint."""
104+
105+
def __init__(self):
106+
self.transmitted_records: int = 0
107+
self.transmitted_bytes: int = 0
108+
self.queued_records: int = 0
109+
self.queued_batches: int = 0
110+
self.buffered_records: int = 0
111+
self.buffered_batches: int = 0
112+
self.num_encode_errors: int = 0
113+
self.num_transport_errors: int = 0
114+
self.total_processed_input_records: int = 0
115+
self.total_processed_steps: int = 0
116+
self.memory: int = 0
117+
118+
@classmethod
119+
def from_dict(cls, d: Mapping[str, Any]):
120+
metrics = cls()
121+
metrics.transmitted_records = expect_int(d, "transmitted_records")
122+
metrics.transmitted_bytes = expect_int(d, "transmitted_bytes")
123+
metrics.queued_records = expect_int(d, "queued_records")
124+
metrics.queued_batches = expect_int(d, "queued_batches")
125+
metrics.buffered_records = expect_int(d, "buffered_records")
126+
metrics.buffered_batches = expect_int(d, "buffered_batches")
127+
metrics.num_encode_errors = expect_int(d, "num_encode_errors")
128+
metrics.num_transport_errors = expect_int(d, "num_transport_errors")
129+
metrics.total_processed_input_records = expect_int(
130+
d, "total_processed_input_records"
131+
)
132+
metrics.total_processed_steps = expect_int(d, "total_processed_steps")
133+
metrics.memory = expect_int(d, "memory")
134+
return metrics
135+
136+
137+
class InputConnectorStatus:
138+
"""Mirrors Rust `ExternalInputEndpointStatus`."""
139+
140+
def __init__(self):
141+
self.endpoint_name: str = ""
142+
self.config: ShortEndpointConfig = ShortEndpointConfig()
143+
self.metrics: InputConnectorMetrics = InputConnectorMetrics()
144+
self.fatal_error: Optional[str] = None
145+
self.parse_errors: Optional[list[ConnectorError]] = None
146+
self.transport_errors: Optional[list[ConnectorError]] = None
147+
self.paused: bool = False
148+
self.barrier: bool = False
149+
self.completed_frontier: Optional[CompletedWatermark] = None
150+
151+
@classmethod
152+
def from_dict(cls, d: Mapping[str, Any]):
153+
status = cls()
154+
status.endpoint_name = expect_str(d, "endpoint_name")
155+
status.config = ShortEndpointConfig.from_dict(expect_mapping(d, "config"))
156+
status.metrics = InputConnectorMetrics.from_dict(expect_mapping(d, "metrics"))
157+
fatal_error = d.get("fatal_error")
158+
if fatal_error is not None and not isinstance(fatal_error, str):
159+
raise ValueError(
160+
"invalid optional field 'fatal_error': expected string or null"
161+
)
162+
status.fatal_error = fatal_error
163+
status.paused = expect_bool(d, "paused")
164+
status.barrier = expect_bool(d, "barrier")
165+
parse_errors = d.get("parse_errors")
166+
status.parse_errors = (
167+
[ConnectorError.from_dict(error) for error in parse_errors]
168+
if isinstance(parse_errors, list)
169+
else None
170+
)
171+
transport_errors = d.get("transport_errors")
172+
status.transport_errors = (
173+
[ConnectorError.from_dict(error) for error in transport_errors]
174+
if isinstance(transport_errors, list)
175+
else None
176+
)
177+
completed_frontier = d.get("completed_frontier")
178+
status.completed_frontier = (
179+
CompletedWatermark.from_dict(completed_frontier)
180+
if isinstance(completed_frontier, dict)
181+
else None
182+
)
183+
return status
184+
185+
186+
class OutputConnectorStatus:
187+
"""Mirrors Rust `ExternalOutputEndpointStatus`."""
188+
189+
def __init__(self):
190+
self.endpoint_name: str = ""
191+
self.config: ShortEndpointConfig = ShortEndpointConfig()
192+
self.metrics: OutputConnectorMetrics = OutputConnectorMetrics()
193+
self.fatal_error: Optional[str] = None
194+
self.encode_errors: Optional[list[ConnectorError]] = None
195+
self.transport_errors: Optional[list[ConnectorError]] = None
196+
197+
@classmethod
198+
def from_dict(cls, d: Mapping[str, Any]):
199+
status = cls()
200+
status.endpoint_name = expect_str(d, "endpoint_name")
201+
status.config = ShortEndpointConfig.from_dict(expect_mapping(d, "config"))
202+
status.metrics = OutputConnectorMetrics.from_dict(expect_mapping(d, "metrics"))
203+
fatal_error = d.get("fatal_error")
204+
if fatal_error is not None and not isinstance(fatal_error, str):
205+
raise ValueError(
206+
"invalid optional field 'fatal_error': expected string or null"
207+
)
208+
status.fatal_error = fatal_error
209+
encode_errors = d.get("encode_errors")
210+
status.encode_errors = (
211+
[ConnectorError.from_dict(error) for error in encode_errors]
212+
if isinstance(encode_errors, list)
213+
else None
214+
)
215+
transport_errors = d.get("transport_errors")
216+
status.transport_errors = (
217+
[ConnectorError.from_dict(error) for error in transport_errors]
218+
if isinstance(transport_errors, list)
219+
else None
220+
)
221+
return status
222+
223+
224+
# Backward-compatible alias for a common typo.
225+
OutputConnectrorStatus = OutputConnectorStatus

python/feldera/pipeline.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from feldera.rest.sql_table import SQLTable
3434
from feldera.rest.sql_view import SQLView
3535
from feldera.runtime_config import RuntimeConfig
36+
from feldera.connector_stats import InputConnectorStatus, OutputConnectorStatus
3637
from feldera.stats import PipelineStatistics
3738
from feldera.types import CheckpointMetadata
3839

@@ -244,6 +245,26 @@ def resume_connector(self, table_name: str, connector_name: str):
244245

245246
self.client.resume_connector(self.name, table_name, connector_name)
246247

248+
def input_connector_stats(
249+
self, table_name: str, connector_name: str
250+
) -> InputConnectorStatus:
251+
"""
252+
Get the status of the specified input connector.
253+
"""
254+
return InputConnectorStatus.from_dict(
255+
self.client.input_connector_stats(self.name, table_name, connector_name)
256+
)
257+
258+
def output_connector_stats(
259+
self, view_name: str, connector_name: str
260+
) -> OutputConnectorStatus:
261+
"""
262+
Get the status of the specified output connector.
263+
"""
264+
return OutputConnectorStatus.from_dict(
265+
self.client.output_connector_stats(self.name, view_name, connector_name)
266+
)
267+
247268
def listen(self, view_name: str) -> OutputHandler:
248269
"""
249270
Follow the change stream (i.e., the output) of the provided view.

python/feldera/rest/feldera_client.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,6 +1232,26 @@ def query_as_json(
12321232
if chunk:
12331233
yield json.loads(chunk, parse_float=Decimal)
12341234

1235+
def input_connector_stats(
1236+
self, pipeline_name: str, table_name: str, connector_name: str
1237+
) -> dict:
1238+
"""
1239+
Get the status of the specified input connector.
1240+
"""
1241+
return self.http.get(
1242+
path=f"/pipelines/{pipeline_name}/tables/{table_name}/connectors/{connector_name}/stats"
1243+
)
1244+
1245+
def output_connector_stats(
1246+
self, pipeline_name: str, view_name: str, connector_name: str
1247+
) -> dict:
1248+
"""
1249+
Get the status of the specified output connector.
1250+
"""
1251+
return self.http.get(
1252+
path=f"/pipelines/{pipeline_name}/views/{view_name}/connectors/{connector_name}/stats"
1253+
)
1254+
12351255
def pause_connector(self, pipeline_name, table_name, connector_name):
12361256
"""
12371257
Pause the specified input connector.

python/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ dev = [
4343
"sphinx-rtd-theme==2.0.0",
4444
"sphinx==7.3.7",
4545
"simplejson==3.20.1",
46+
"confluent-kafka>=2.2.0"
4647
]
4748

4849
[tool.pytest.ini_options]

0 commit comments

Comments
 (0)