Skip to content

Commit ff659df

Browse files
committed
[api] Strongly typed connector status endpoints.
Per-table/view status endpoints now return strongly typed results of typr `InputConnectorStatus`/`OutpuConnectorStatus`. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent ebc9065 commit ff659df

File tree

8 files changed

+187
-57
lines changed

8 files changed

+187
-57
lines changed

crates/fda/src/main.rs

Lines changed: 59 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1935,7 +1935,9 @@ async fn connector(
19351935
.iter()
19361936
.any(|(name, _c)| *name == full_connector_name);
19371937
if !relation_is_table && !relation_is_view {
1938-
eprintln!("No connector named {connector} found in pipeline {pipeline_name}");
1938+
eprintln!(
1939+
"No connector named {connector} found for relation {relation} in pipeline {pipeline_name}"
1940+
);
19391941
std::process::exit(1);
19401942
}
19411943

@@ -1986,40 +1988,64 @@ async fn connector(
19861988
.unwrap();
19871989
println!("Table {relation} connector {connector} paused successfully.");
19881990
}
1991+
ConnectorAction::Stats if relation_is_table => {
1992+
let response = client
1993+
.get_pipeline_input_connector_status()
1994+
.pipeline_name(pipeline_name)
1995+
.table_name(relation)
1996+
.connector_name(connector)
1997+
.send()
1998+
.await
1999+
.map_err(handle_errors_fatal(
2000+
client.baseurl().clone(),
2001+
"Failed to get table connector stats",
2002+
1,
2003+
))
2004+
.unwrap();
2005+
2006+
match format {
2007+
OutputFormat::Text => {
2008+
let stats_value = serde_json::to_value(response.as_ref())
2009+
.expect("Failed to serialize input connector stats");
2010+
let table = json_to_table(&stats_value)
2011+
.collapse()
2012+
.into_pool_table()
2013+
.to_string();
2014+
println!("{}", table);
2015+
}
2016+
OutputFormat::Json => {
2017+
println!(
2018+
"{}",
2019+
serde_json::to_string_pretty(response.as_ref())
2020+
.expect("Failed to serialize input connector stats")
2021+
);
2022+
}
2023+
_ => {
2024+
eprintln!("Unsupported output format: {}", format);
2025+
std::process::exit(1);
2026+
}
2027+
}
2028+
}
19892029
ConnectorAction::Stats => {
1990-
let response = if relation_is_table {
1991-
client
1992-
.get_pipeline_input_connector_status()
1993-
.pipeline_name(pipeline_name)
1994-
.table_name(relation)
1995-
.connector_name(connector)
1996-
.send()
1997-
.await
1998-
.map_err(handle_errors_fatal(
1999-
client.baseurl().clone(),
2000-
"Failed to get table connector stats",
2001-
1,
2002-
))
2003-
.unwrap()
2004-
} else {
2005-
client
2006-
.get_pipeline_output_connector_status()
2007-
.pipeline_name(pipeline_name)
2008-
.view_name(relation)
2009-
.connector_name(connector)
2010-
.send()
2011-
.await
2012-
.map_err(handle_errors_fatal(
2013-
client.baseurl().clone(),
2014-
"Failed to get view connector stats",
2015-
1,
2016-
))
2017-
.unwrap()
2018-
};
2030+
let response = client
2031+
.get_pipeline_output_connector_status()
2032+
.pipeline_name(pipeline_name)
2033+
.view_name(relation)
2034+
.connector_name(connector)
2035+
.send()
2036+
.await
2037+
.map_err(handle_errors_fatal(
2038+
client.baseurl().clone(),
2039+
"Failed to get view connector stats",
2040+
1,
2041+
))
2042+
.unwrap();
20192043

20202044
match format {
20212045
OutputFormat::Text => {
2022-
let table = json_to_table(&serde_json::Value::Object(response.into_inner()))
2046+
let stats_value = serde_json::to_value(response.as_ref())
2047+
.expect("Failed to serialize output connector stats");
2048+
let table = json_to_table(&stats_value)
20232049
.collapse()
20242050
.into_pool_table()
20252051
.to_string();
@@ -2028,8 +2054,8 @@ async fn connector(
20282054
OutputFormat::Json => {
20292055
println!(
20302056
"{}",
2031-
serde_json::to_string_pretty(&response.into_inner())
2032-
.expect("Failed to serialize pipeline stats")
2057+
serde_json::to_string_pretty(response.as_ref())
2058+
.expect("Failed to serialize output connector stats")
20332059
);
20342060
}
20352061
_ => {

crates/feldera-types/src/adapter_stats.rs

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use bytemuck::NoUninit;
2-
use chrono::{DateTime, Utc};
2+
use chrono::{DateTime, SecondsFormat, Utc};
33
use serde::{Deserialize, Serialize};
44
use serde_json::Value as JsonValue;
55
use std::collections::BTreeMap;
@@ -130,25 +130,26 @@ pub struct ExternalTransactionInitiators {
130130

131131
/// A watermark that has been fully processed by the pipeline.
132132
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
133-
#[schema(as = CompletedWatermark)]
134-
pub struct ExternalCompletedWatermark {
133+
pub struct CompletedWatermark {
135134
/// Metadata that describes the position in the input stream (e.g., Kafka partition/offset pairs).
136135
#[schema(value_type = Object)]
137136
pub metadata: JsonValue,
138137
/// Timestamp when the data was ingested from the wire.
139-
pub ingested_at: String,
138+
#[serde(serialize_with = "serialize_timestamp_micros")]
139+
pub ingested_at: DateTime<Utc>,
140140
/// Timestamp when the data was processed by the circuit.
141-
pub processed_at: String,
141+
#[serde(serialize_with = "serialize_timestamp_micros")]
142+
pub processed_at: DateTime<Utc>,
142143
/// Timestamp when all outputs produced from this input have been pushed to all output endpoints.
143-
pub completed_at: String,
144+
#[serde(serialize_with = "serialize_timestamp_micros")]
145+
pub completed_at: DateTime<Utc>,
144146
}
145147

146148
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
147149
pub struct ConnectorError {
148-
/// Timestamp when the error occurred in microseconds since the epoch.
149-
#[serde(with = "chrono::serde::ts_microseconds")]
150-
#[schema(value_type = u64)]
151-
pub timestamp_micros: DateTime<Utc>,
150+
/// Timestamp when the error occurred, serialized as RFC3339 with microseconds.
151+
#[serde(serialize_with = "serialize_timestamp_micros")]
152+
pub timestamp: DateTime<Utc>,
152153

153154
/// Sequence number of the error.
154155
///
@@ -211,7 +212,7 @@ pub struct ExternalInputEndpointStatus {
211212
pub barrier: bool,
212213
/// The latest completed watermark.
213214
#[schema(value_type = Option<CompletedWatermark>)]
214-
pub completed_frontier: Option<ExternalCompletedWatermark>,
215+
pub completed_frontier: Option<CompletedWatermark>,
215216
}
216217

217218
/// Performance metrics for an output endpoint.
@@ -402,3 +403,34 @@ pub struct ExternalControllerStatus {
402403
#[schema(value_type = Vec<OutputEndpointStatus>)]
403404
pub outputs: Vec<ExternalOutputEndpointStatus>,
404405
}
406+
407+
fn serialize_timestamp_micros<S>(
408+
timestamp: &DateTime<Utc>,
409+
serializer: S,
410+
) -> Result<S::Ok, S::Error>
411+
where
412+
S: serde::Serializer,
413+
{
414+
serializer.serialize_str(&timestamp.to_rfc3339_opts(SecondsFormat::Micros, true))
415+
}
416+
417+
#[cfg(test)]
418+
mod tests {
419+
use super::ConnectorError;
420+
use chrono::{DateTime, Utc};
421+
422+
#[test]
423+
fn connector_error_timestamp_serializes_with_microsecond_precision() {
424+
let error = ConnectorError {
425+
timestamp: DateTime::parse_from_rfc3339("2026-03-08T05:26:42.442438448Z")
426+
.unwrap()
427+
.with_timezone(&Utc),
428+
index: 1,
429+
tag: None,
430+
message: "boom".to_string(),
431+
};
432+
433+
let json = serde_json::to_string(&error).unwrap();
434+
assert!(json.contains(r#""timestamp":"2026-03-08T05:26:42.442438Z""#));
435+
}
436+
}

crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ pub(crate) async fn post_pipeline_input_connector_action(
327327
(status = OK
328328
, description = "Input connector status retrieved successfully"
329329
, content_type = "application/json"
330-
, body = Object),
330+
, body = InputEndpointStatus),
331331
(status = NOT_FOUND
332332
, body = ErrorResponse
333333
, description = "Pipeline, table and/or input connector with that name does not exist"
@@ -401,7 +401,7 @@ pub(crate) async fn get_pipeline_input_connector_status(
401401
(status = OK
402402
, description = "Output connector status retrieved successfully"
403403
, content_type = "application/json"
404-
, body = Object),
404+
, body = OutputEndpointStatus),
405405
(status = NOT_FOUND
406406
, body = ErrorResponse
407407
, description = "Pipeline, view and/or output connector with that name does not exist"

crates/pipeline-manager/src/api/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ It contains the following fields:
443443
feldera_types::adapter_stats::ExternalInputEndpointMetrics,
444444
feldera_types::adapter_stats::ExternalOutputEndpointStatus,
445445
feldera_types::adapter_stats::ExternalOutputEndpointMetrics,
446-
feldera_types::adapter_stats::ExternalCompletedWatermark,
446+
feldera_types::adapter_stats::CompletedWatermark,
447447
feldera_types::adapter_stats::ExternalTransactionInitiators,
448448
feldera_types::adapter_stats::ExternalTransactionPhase,
449449
feldera_types::adapter_stats::ExternalConnectorTransactionPhase,

docs.feldera.com/docs/changelog.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@ import TabItem from '@theme/TabItem';
1515

1616
## Unreleased
1717

18-
Add connector error list to input/output connector stats.
18+
Added connector error list to input/output connector stats.
1919
[Input](https://docs.feldera.com/api/get-input-status) and
2020
[output](https://docs.feldera.com/api/get-output-status)
2121
status endpoints now list up to 100 most recent transport, parser, and
2222
encoder errors of each type.
2323

24+
In addition, the openapi spec for both endpoints now specifies strongly typed return values
25+
of type `InputConnectorStatus` and `OutputConnectorStatus` respectively.
26+
2427
## v0.252.0
2528

2629
### Python API removed `ignore_deployment_error`

js-packages/web-console/src/lib/services/manager/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ export type {
116116
Configuration,
117117
ConnectOptions,
118118
ConnectorConfig,
119+
ConnectorError,
119120
ConnectorStats,
120121
ConnectorTransactionPhase,
121122
ConsumerConfig,

js-packages/web-console/src/lib/services/manager/types.gen.ts

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,31 @@ export type ConnectorConfig = OutputBufferConfig & {
638638
transport: TransportConfig
639639
}
640640

641+
export type ConnectorError = {
642+
/**
643+
* Sequence number of the error.
644+
*
645+
* The client can use this field to detect gaps in the error list reported
646+
* by the pipeline. When the connector reports a large number of errors, the
647+
* pipeline will only preserve and report the most recent errors of each kind.
648+
*/
649+
index: number
650+
/**
651+
* Error message.
652+
*/
653+
message: string
654+
/**
655+
* Optional tag for the error.
656+
*
657+
* The tag is used to group errors by their type.
658+
*/
659+
tag?: string | null
660+
/**
661+
* Timestamp when the error occurred, serialized as RFC3339 with microseconds.
662+
*/
663+
timestamp: string
664+
}
665+
641666
/**
642667
* Aggregated connector error statistics.
643668
*
@@ -1788,10 +1813,18 @@ export type InputEndpointStatus = {
17881813
*/
17891814
fatal_error?: string | null
17901815
metrics: InputEndpointMetrics
1816+
/**
1817+
* Recent parse errors on this endpoint.
1818+
*/
1819+
parse_errors?: Array<ConnectorError> | null
17911820
/**
17921821
* Endpoint has been paused by the user.
17931822
*/
17941823
paused: boolean
1824+
/**
1825+
* Recent transport errors on this endpoint.
1826+
*/
1827+
transport_errors?: Array<ConnectorError> | null
17951828
}
17961829

17971830
/**
@@ -2555,6 +2588,10 @@ export type OutputEndpointMetrics = {
25552588
*/
25562589
export type OutputEndpointStatus = {
25572590
config: ShortEndpointConfig
2591+
/**
2592+
* Recent encoding errors on this endpoint.
2593+
*/
2594+
encode_errors?: Array<ConnectorError> | null
25582595
/**
25592596
* Endpoint name.
25602597
*/
@@ -2564,6 +2601,10 @@ export type OutputEndpointStatus = {
25642601
*/
25652602
fatal_error?: string | null
25662603
metrics: OutputEndpointMetrics
2604+
/**
2605+
* Recent transport errors on this endpoint.
2606+
*/
2607+
transport_errors?: Array<ConnectorError> | null
25672608
}
25682609

25692610
/**
@@ -2657,6 +2698,17 @@ export type PipelineConfig = {
26572698
dev_tweaks?: {
26582699
[key: string]: unknown
26592700
}
2701+
/**
2702+
* Environment variables for the pipeline process.
2703+
*
2704+
* These are key-value pairs injected into the pipeline process environment.
2705+
* Some variable names are reserved by the platform and cannot be overridden
2706+
* (for example `RUST_LOG`, and variables in the `FELDERA_`,
2707+
* `KUBERNETES_`, and `TOKIO_` namespaces).
2708+
*/
2709+
env?: {
2710+
[key: string]: string
2711+
}
26602712
fault_tolerance?: FtConfig
26612713
/**
26622714
* Number of DBSP hosts.
@@ -3183,6 +3235,12 @@ export type PostgresWriterConfig = {
31833235
* The table to write the output to.
31843236
*/
31853237
table: string
3238+
/**
3239+
* The number of threads to use during encoding.
3240+
*
3241+
* Default: 1
3242+
*/
3243+
threads?: number
31863244
/**
31873245
* Postgres URI.
31883246
* See: <https://docs.rs/tokio-postgres/0.7.12/tokio_postgres/config/struct.Config.html>
@@ -3742,6 +3800,17 @@ export type RuntimeConfig = {
37423800
dev_tweaks?: {
37433801
[key: string]: unknown
37443802
}
3803+
/**
3804+
* Environment variables for the pipeline process.
3805+
*
3806+
* These are key-value pairs injected into the pipeline process environment.
3807+
* Some variable names are reserved by the platform and cannot be overridden
3808+
* (for example `RUST_LOG`, and variables in the `FELDERA_`,
3809+
* `KUBERNETES_`, and `TOKIO_` namespaces).
3810+
*/
3811+
env?: {
3812+
[key: string]: string
3813+
}
37453814
fault_tolerance?: FtConfig
37463815
/**
37473816
* Number of DBSP hosts.
@@ -6309,9 +6378,7 @@ export type GetPipelineInputConnectorStatusResponses = {
63096378
/**
63106379
* Input connector status retrieved successfully
63116380
*/
6312-
200: {
6313-
[key: string]: unknown
6314-
}
6381+
200: InputEndpointStatus
63156382
}
63166383

63176384
export type GetPipelineInputConnectorStatusResponse =
@@ -6527,9 +6594,7 @@ export type GetPipelineOutputConnectorStatusResponses = {
65276594
/**
65286595
* Output connector status retrieved successfully
65296596
*/
6530-
200: {
6531-
[key: string]: unknown
6532-
}
6597+
200: OutputEndpointStatus
65336598
}
65346599

65356600
export type GetPipelineOutputConnectorStatusResponse =

0 commit comments

Comments
 (0)