|
| 1 | +from datetime import datetime, timezone |
| 2 | +from typing import Any, Mapping, Optional |
| 3 | + |
| 4 | +from feldera._helpers import ( |
| 5 | + expect_bool, |
| 6 | + expect_int, |
| 7 | + expect_mapping, |
| 8 | + expect_str, |
| 9 | + parse_datetime, |
| 10 | +) |
| 11 | + |
| 12 | + |
| 13 | +class ConnectorError: |
| 14 | + """Represents a connector error item reported by connector status endpoints.""" |
| 15 | + |
| 16 | + def __init__(self): |
| 17 | + self.timestamp: datetime = datetime.fromtimestamp(0) |
| 18 | + self.index: int = 0 |
| 19 | + self.tag: Optional[str] = None |
| 20 | + self.message: str = "" |
| 21 | + |
| 22 | + @classmethod |
| 23 | + def from_dict(cls, d: Mapping[str, Any]): |
| 24 | + error = cls() |
| 25 | + error.timestamp = parse_datetime( |
| 26 | + expect_str(d, "timestamp"), "timestamp" |
| 27 | + ) |
| 28 | + error.index = expect_int(d, "index") |
| 29 | + tag = d.get("tag") |
| 30 | + if tag is not None and not isinstance(tag, str): |
| 31 | + raise ValueError("invalid optional field 'tag': expected string or null") |
| 32 | + error.tag = tag |
| 33 | + error.message = expect_str(d, "message") |
| 34 | + return error |
| 35 | + |
| 36 | + |
| 37 | +class ShortEndpointConfig: |
| 38 | + """Endpoint configuration subset returned by connector status endpoints.""" |
| 39 | + |
| 40 | + def __init__(self): |
| 41 | + self.stream: str = "" |
| 42 | + |
| 43 | + @classmethod |
| 44 | + def from_dict(cls, d: Mapping[str, Any]): |
| 45 | + config = cls() |
| 46 | + config.stream = expect_str(d, "stream") |
| 47 | + return config |
| 48 | + |
| 49 | + |
| 50 | +class CompletedWatermark: |
| 51 | + """Latest completed watermark reported by input connector status.""" |
| 52 | + |
| 53 | + def __init__(self): |
| 54 | + self.metadata: Any = None |
| 55 | + self.ingested_at: datetime = datetime.fromtimestamp(0) |
| 56 | + self.processed_at: datetime = datetime.fromtimestamp(0) |
| 57 | + self.completed_at: datetime = datetime.fromtimestamp(0) |
| 58 | + |
| 59 | + @classmethod |
| 60 | + def from_dict(cls, d: Mapping[str, Any]): |
| 61 | + watermark = cls() |
| 62 | + if "metadata" not in d: |
| 63 | + raise ValueError("missing required field 'metadata'") |
| 64 | + watermark.metadata = d.get("metadata") |
| 65 | + watermark.ingested_at = parse_datetime( |
| 66 | + expect_str(d, "ingested_at"), "ingested_at" |
| 67 | + ) |
| 68 | + watermark.processed_at = parse_datetime( |
| 69 | + expect_str(d, "processed_at"), "processed_at" |
| 70 | + ) |
| 71 | + watermark.completed_at = parse_datetime( |
| 72 | + expect_str(d, "completed_at"), "completed_at" |
| 73 | + ) |
| 74 | + return watermark |
| 75 | + |
| 76 | + |
| 77 | +class InputConnectorMetrics: |
| 78 | + """Performance metrics returned by an input connector status endpoint.""" |
| 79 | + |
| 80 | + def __init__(self): |
| 81 | + self.total_bytes: int = 0 |
| 82 | + self.total_records: int = 0 |
| 83 | + self.buffered_records: int = 0 |
| 84 | + self.buffered_bytes: int = 0 |
| 85 | + self.num_transport_errors: int = 0 |
| 86 | + self.num_parse_errors: int = 0 |
| 87 | + self.end_of_input: bool = False |
| 88 | + |
| 89 | + @classmethod |
| 90 | + def from_dict(cls, d: Mapping[str, Any]): |
| 91 | + metrics = cls() |
| 92 | + metrics.total_bytes = expect_int(d, "total_bytes") |
| 93 | + metrics.total_records = expect_int(d, "total_records") |
| 94 | + metrics.buffered_records = expect_int(d, "buffered_records") |
| 95 | + metrics.buffered_bytes = expect_int(d, "buffered_bytes") |
| 96 | + metrics.num_transport_errors = expect_int(d, "num_transport_errors") |
| 97 | + metrics.num_parse_errors = expect_int(d, "num_parse_errors") |
| 98 | + metrics.end_of_input = expect_bool(d, "end_of_input") |
| 99 | + return metrics |
| 100 | + |
| 101 | + |
| 102 | +class OutputConnectorMetrics: |
| 103 | + """Performance metrics returned by an output connector status endpoint.""" |
| 104 | + |
| 105 | + def __init__(self): |
| 106 | + self.transmitted_records: int = 0 |
| 107 | + self.transmitted_bytes: int = 0 |
| 108 | + self.queued_records: int = 0 |
| 109 | + self.queued_batches: int = 0 |
| 110 | + self.buffered_records: int = 0 |
| 111 | + self.buffered_batches: int = 0 |
| 112 | + self.num_encode_errors: int = 0 |
| 113 | + self.num_transport_errors: int = 0 |
| 114 | + self.total_processed_input_records: int = 0 |
| 115 | + self.total_processed_steps: int = 0 |
| 116 | + self.memory: int = 0 |
| 117 | + |
| 118 | + @classmethod |
| 119 | + def from_dict(cls, d: Mapping[str, Any]): |
| 120 | + metrics = cls() |
| 121 | + metrics.transmitted_records = expect_int(d, "transmitted_records") |
| 122 | + metrics.transmitted_bytes = expect_int(d, "transmitted_bytes") |
| 123 | + metrics.queued_records = expect_int(d, "queued_records") |
| 124 | + metrics.queued_batches = expect_int(d, "queued_batches") |
| 125 | + metrics.buffered_records = expect_int(d, "buffered_records") |
| 126 | + metrics.buffered_batches = expect_int(d, "buffered_batches") |
| 127 | + metrics.num_encode_errors = expect_int(d, "num_encode_errors") |
| 128 | + metrics.num_transport_errors = expect_int(d, "num_transport_errors") |
| 129 | + metrics.total_processed_input_records = expect_int( |
| 130 | + d, "total_processed_input_records" |
| 131 | + ) |
| 132 | + metrics.total_processed_steps = expect_int(d, "total_processed_steps") |
| 133 | + metrics.memory = expect_int(d, "memory") |
| 134 | + return metrics |
| 135 | + |
| 136 | + |
| 137 | +class InputConnectorStatus: |
| 138 | + """Mirrors Rust `ExternalInputEndpointStatus`.""" |
| 139 | + |
| 140 | + def __init__(self): |
| 141 | + self.endpoint_name: str = "" |
| 142 | + self.config: ShortEndpointConfig = ShortEndpointConfig() |
| 143 | + self.metrics: InputConnectorMetrics = InputConnectorMetrics() |
| 144 | + self.fatal_error: Optional[str] = None |
| 145 | + self.parse_errors: Optional[list[ConnectorError]] = None |
| 146 | + self.transport_errors: Optional[list[ConnectorError]] = None |
| 147 | + self.paused: bool = False |
| 148 | + self.barrier: bool = False |
| 149 | + self.completed_frontier: Optional[CompletedWatermark] = None |
| 150 | + |
| 151 | + @classmethod |
| 152 | + def from_dict(cls, d: Mapping[str, Any]): |
| 153 | + status = cls() |
| 154 | + status.endpoint_name = expect_str(d, "endpoint_name") |
| 155 | + status.config = ShortEndpointConfig.from_dict(expect_mapping(d, "config")) |
| 156 | + status.metrics = InputConnectorMetrics.from_dict(expect_mapping(d, "metrics")) |
| 157 | + fatal_error = d.get("fatal_error") |
| 158 | + if fatal_error is not None and not isinstance(fatal_error, str): |
| 159 | + raise ValueError( |
| 160 | + "invalid optional field 'fatal_error': expected string or null" |
| 161 | + ) |
| 162 | + status.fatal_error = fatal_error |
| 163 | + status.paused = expect_bool(d, "paused") |
| 164 | + status.barrier = expect_bool(d, "barrier") |
| 165 | + parse_errors = d.get("parse_errors") |
| 166 | + status.parse_errors = ( |
| 167 | + [ConnectorError.from_dict(error) for error in parse_errors] |
| 168 | + if isinstance(parse_errors, list) |
| 169 | + else None |
| 170 | + ) |
| 171 | + transport_errors = d.get("transport_errors") |
| 172 | + status.transport_errors = ( |
| 173 | + [ConnectorError.from_dict(error) for error in transport_errors] |
| 174 | + if isinstance(transport_errors, list) |
| 175 | + else None |
| 176 | + ) |
| 177 | + completed_frontier = d.get("completed_frontier") |
| 178 | + status.completed_frontier = ( |
| 179 | + CompletedWatermark.from_dict(completed_frontier) |
| 180 | + if isinstance(completed_frontier, dict) |
| 181 | + else None |
| 182 | + ) |
| 183 | + return status |
| 184 | + |
| 185 | + |
| 186 | +class OutputConnectorStatus: |
| 187 | + """Mirrors Rust `ExternalOutputEndpointStatus`.""" |
| 188 | + |
| 189 | + def __init__(self): |
| 190 | + self.endpoint_name: str = "" |
| 191 | + self.config: ShortEndpointConfig = ShortEndpointConfig() |
| 192 | + self.metrics: OutputConnectorMetrics = OutputConnectorMetrics() |
| 193 | + self.fatal_error: Optional[str] = None |
| 194 | + self.encode_errors: Optional[list[ConnectorError]] = None |
| 195 | + self.transport_errors: Optional[list[ConnectorError]] = None |
| 196 | + |
| 197 | + @classmethod |
| 198 | + def from_dict(cls, d: Mapping[str, Any]): |
| 199 | + status = cls() |
| 200 | + status.endpoint_name = expect_str(d, "endpoint_name") |
| 201 | + status.config = ShortEndpointConfig.from_dict(expect_mapping(d, "config")) |
| 202 | + status.metrics = OutputConnectorMetrics.from_dict(expect_mapping(d, "metrics")) |
| 203 | + fatal_error = d.get("fatal_error") |
| 204 | + if fatal_error is not None and not isinstance(fatal_error, str): |
| 205 | + raise ValueError( |
| 206 | + "invalid optional field 'fatal_error': expected string or null" |
| 207 | + ) |
| 208 | + status.fatal_error = fatal_error |
| 209 | + encode_errors = d.get("encode_errors") |
| 210 | + status.encode_errors = ( |
| 211 | + [ConnectorError.from_dict(error) for error in encode_errors] |
| 212 | + if isinstance(encode_errors, list) |
| 213 | + else None |
| 214 | + ) |
| 215 | + transport_errors = d.get("transport_errors") |
| 216 | + status.transport_errors = ( |
| 217 | + [ConnectorError.from_dict(error) for error in transport_errors] |
| 218 | + if isinstance(transport_errors, list) |
| 219 | + else None |
| 220 | + ) |
| 221 | + return status |
| 222 | + |
| 223 | + |
| 224 | +# Backward-compatible alias for a common typo. |
| 225 | +OutputConnectrorStatus = OutputConnectorStatus |
0 commit comments