|
1 | 1 | use bytemuck::NoUninit; |
2 | | -use chrono::{DateTime, Utc}; |
| 2 | +use chrono::{DateTime, SecondsFormat, Utc}; |
3 | 3 | use serde::{Deserialize, Serialize}; |
4 | 4 | use serde_json::Value as JsonValue; |
5 | 5 | use std::collections::BTreeMap; |
@@ -130,25 +130,26 @@ pub struct ExternalTransactionInitiators { |
130 | 130 |
|
131 | 131 | /// A watermark that has been fully processed by the pipeline. |
132 | 132 | #[derive(Clone, Debug, Deserialize, Serialize, ToSchema)] |
133 | | -#[schema(as = CompletedWatermark)] |
134 | | -pub struct ExternalCompletedWatermark { |
| 133 | +pub struct CompletedWatermark { |
135 | 134 | /// Metadata that describes the position in the input stream (e.g., Kafka partition/offset pairs). |
136 | 135 | #[schema(value_type = Object)] |
137 | 136 | pub metadata: JsonValue, |
138 | 137 | /// Timestamp when the data was ingested from the wire. |
139 | | - pub ingested_at: String, |
| 138 | + #[serde(serialize_with = "serialize_timestamp_micros")] |
| 139 | + pub ingested_at: DateTime<Utc>, |
140 | 140 | /// Timestamp when the data was processed by the circuit. |
141 | | - pub processed_at: String, |
| 141 | + #[serde(serialize_with = "serialize_timestamp_micros")] |
| 142 | + pub processed_at: DateTime<Utc>, |
142 | 143 | /// Timestamp when all outputs produced from this input have been pushed to all output endpoints. |
143 | | - pub completed_at: String, |
| 144 | + #[serde(serialize_with = "serialize_timestamp_micros")] |
| 145 | + pub completed_at: DateTime<Utc>, |
144 | 146 | } |
145 | 147 |
|
146 | 148 | #[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)] |
147 | 149 | pub struct ConnectorError { |
148 | | - /// Timestamp when the error occurred in microseconds since the epoch. |
149 | | - #[serde(with = "chrono::serde::ts_microseconds")] |
150 | | - #[schema(value_type = u64)] |
151 | | - pub timestamp_micros: DateTime<Utc>, |
| 150 | + /// Timestamp when the error occurred, serialized as RFC3339 with microseconds. |
| 151 | + #[serde(serialize_with = "serialize_timestamp_micros")] |
| 152 | + pub timestamp: DateTime<Utc>, |
152 | 153 |
|
153 | 154 | /// Sequence number of the error. |
154 | 155 | /// |
@@ -211,7 +212,7 @@ pub struct ExternalInputEndpointStatus { |
211 | 212 | pub barrier: bool, |
212 | 213 | /// The latest completed watermark. |
213 | 214 | #[schema(value_type = Option<CompletedWatermark>)] |
214 | | - pub completed_frontier: Option<ExternalCompletedWatermark>, |
| 215 | + pub completed_frontier: Option<CompletedWatermark>, |
215 | 216 | } |
216 | 217 |
|
217 | 218 | /// Performance metrics for an output endpoint. |
@@ -402,3 +403,34 @@ pub struct ExternalControllerStatus { |
402 | 403 | #[schema(value_type = Vec<OutputEndpointStatus>)] |
403 | 404 | pub outputs: Vec<ExternalOutputEndpointStatus>, |
404 | 405 | } |
| 406 | + |
| 407 | +fn serialize_timestamp_micros<S>( |
| 408 | + timestamp: &DateTime<Utc>, |
| 409 | + serializer: S, |
| 410 | +) -> Result<S::Ok, S::Error> |
| 411 | +where |
| 412 | + S: serde::Serializer, |
| 413 | +{ |
| 414 | + serializer.serialize_str(×tamp.to_rfc3339_opts(SecondsFormat::Micros, true)) |
| 415 | +} |
| 416 | + |
| 417 | +#[cfg(test)] |
| 418 | +mod tests { |
| 419 | + use super::ConnectorError; |
| 420 | + use chrono::{DateTime, Utc}; |
| 421 | + |
| 422 | + #[test] |
| 423 | + fn connector_error_timestamp_serializes_with_microsecond_precision() { |
| 424 | + let error = ConnectorError { |
| 425 | + timestamp: DateTime::parse_from_rfc3339("2026-03-08T05:26:42.442438448Z") |
| 426 | + .unwrap() |
| 427 | + .with_timezone(&Utc), |
| 428 | + index: 1, |
| 429 | + tag: None, |
| 430 | + message: "boom".to_string(), |
| 431 | + }; |
| 432 | + |
| 433 | + let json = serde_json::to_string(&error).unwrap(); |
| 434 | + assert!(json.contains(r#""timestamp":"2026-03-08T05:26:42.442438Z""#)); |
| 435 | + } |
| 436 | +} |
0 commit comments