Skip to content

Commit 59f0944

Browse files
committed
[adapters] Report recent connector errors in the API.
Partially fixes #1108. Today, connector errors are only reported in the log where they can be difficult to find or get lost completely if the log is noisy. This commit exposes connector errors via the REST API: - We store up to a fixed number (100) most recent errors of each kind (transport/encoder/parser) and for each tag (we reuse the same tags that were recently introduced for log throttling). - Old errors are evicted in FIFO order - The error list is returned via /tables/table_name/connectors/connector_name/stats endpoint, but not via global /stats to avoid bloating the output of that endpoint. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
1 parent a87a15e commit 59f0944

File tree

7 files changed

+288
-69
lines changed

7 files changed

+288
-69
lines changed

crates/adapters/src/controller.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5701,7 +5701,7 @@ impl ControllerInner {
57015701
let encoder = if let Some(mut endpoint) = endpoint {
57025702
endpoint
57035703
.connect(Box::new(
5704-
move |fatal: bool, e: AnyError, error_tag: Option<&'static str>| {
5704+
move |fatal: bool, e: AnyError, error_tag: Option<&str>| {
57055705
if let Some(controller) = self_weak.upgrade() {
57065706
controller.output_transport_error(
57075707
endpoint_id,
@@ -6028,15 +6028,15 @@ impl ControllerInner {
60286028
endpoint_name: &str,
60296029
) -> Result<ExternalInputEndpointStatus, ControllerError> {
60306030
let endpoint_id = self.input_endpoint_id_by_name(endpoint_name)?;
6031-
Ok(self.status.input_status()[&endpoint_id].to_api_type())
6031+
Ok(self.status.input_status()[&endpoint_id].to_api_type(true))
60326032
}
60336033

60346034
fn output_endpoint_status(
60356035
&self,
60366036
endpoint_name: &str,
60376037
) -> Result<ExternalOutputEndpointStatus, ControllerError> {
60386038
let endpoint_id = self.output_endpoint_id_by_name(endpoint_name)?;
6039-
Ok(self.status.output_status()[&endpoint_id].to_api_type())
6039+
Ok(self.status.output_status()[&endpoint_id].to_api_type(true))
60406040
}
60416041

60426042
fn send_command(&self, command: Command) {
@@ -6059,10 +6059,10 @@ impl ControllerInner {
60596059
endpoint_name: &str,
60606060
fatal: bool,
60616061
error: AnyError,
6062-
tag: Option<&'static str>,
6062+
tag: Option<&str>,
60636063
) {
60646064
self.status
6065-
.input_transport_error(endpoint_id, fatal, &error);
6065+
.input_transport_error(endpoint_id, fatal, tag, &error);
60666066
let tag = tag.map(|tag| format!("{endpoint_name}-{tag}"));
60676067
self.error(
60686068
Arc::new(ControllerError::input_transport_error(
@@ -6075,7 +6075,8 @@ impl ControllerInner {
60756075
}
60766076

60776077
pub fn parse_error(&self, endpoint_id: EndpointId, endpoint_name: &str, error: ParseError) {
6078-
self.status.parse_error(endpoint_id);
6078+
let tag = error.get_error_tag();
6079+
self.status.parse_error(endpoint_id, tag.as_deref(), &error);
60796080

60806081
let tag = error
60816082
.get_error_tag()
@@ -6092,9 +6093,9 @@ impl ControllerInner {
60926093
endpoint_id: EndpointId,
60936094
endpoint_name: &str,
60946095
error: AnyError,
6095-
tag: Option<&'static str>,
6096+
tag: Option<&str>,
60966097
) {
6097-
self.status.encode_error(endpoint_id);
6098+
self.status.encode_error(endpoint_id, tag, &error);
60986099
let tag = tag.map(|tag| format!("{endpoint_name}-{tag}"));
60996100
self.error(
61006101
Arc::new(ControllerError::encode_error(endpoint_name, error)),
@@ -6111,10 +6112,10 @@ impl ControllerInner {
61116112
endpoint_name: &str,
61126113
fatal: bool,
61136114
error: AnyError,
6114-
tag: Option<&'static str>,
6115+
tag: Option<&str>,
61156116
) {
61166117
self.status
6117-
.output_transport_error(endpoint_id, fatal, &error);
6118+
.output_transport_error(endpoint_id, fatal, tag, &error);
61186119
let tag = tag.map(|tag| format!("{endpoint_name}-{tag}"));
61196120
self.error(
61206121
Arc::new(ControllerError::output_transport_error(
@@ -6682,7 +6683,7 @@ impl InputConsumer for InputProbe {
66826683
self.transaction_in_progress.store(false, Ordering::Release);
66836684
}
66846685

6685-
fn error(&self, fatal: bool, error: AnyError, tag: Option<&'static str>) {
6686+
fn error(&self, fatal: bool, error: AnyError, tag: Option<&str>) {
66866687
self.controller.input_transport_error(
66876688
self.endpoint_id,
66886689
&self.endpoint_name,

0 commit comments

Comments
 (0)