Skip to content

Commit 402fe05

Browse files
Igor Smolyarigorscs
authored andcommitted
[logging] remove duplicate pipeline id/name
- Remove duplicate fields pipeline id/name in plain text logs - Set pipeline_id and pipeline_name as structured fields - Align manager stdout log prefix with webconsole prefix [manager]
1 parent 2f1f04e commit 402fe05

File tree

15 files changed

+320
-184
lines changed

15 files changed

+320
-184
lines changed

crates/feldera-observability/src/json_logging.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use chrono::Utc;
22
use colored::{ColoredString, Colorize};
33
use serde_json::{Map, Value};
44
use tracing::Subscriber;
5-
use tracing_subscriber::fmt::format::{Format, Writer};
5+
use tracing_subscriber::fmt::format::{self, Format, Writer};
66
use tracing_subscriber::fmt::{FormatEvent, FormatFields};
77
use tracing_subscriber::layer::SubscriberExt;
88
use tracing_subscriber::registry::LookupSpan;
@@ -305,7 +305,11 @@ fn init_logging(
305305
.try_init()
306306
} else {
307307
tracing_subscriber::registry()
308-
.with(tracing_subscriber::fmt::layer().event_format(FormatWithPrefix::new(prefix)))
308+
.with(
309+
tracing_subscriber::fmt::layer()
310+
.event_format(FormatWithPrefix::new(prefix))
311+
.fmt_fields(plain_text_fields()),
312+
)
309313
.with(env_filter)
310314
.with(sentry::integrations::tracing::layer())
311315
.try_init()
@@ -352,3 +356,25 @@ fn value_to_string(value: Value) -> Option<String> {
352356
other => Some(other.to_string()),
353357
}
354358
}
359+
360+
/// In plain text, render the message plus selected structured fields.
361+
fn plain_text_fields() -> impl for<'writer> tracing_subscriber::fmt::FormatFields<'writer> + Clone {
362+
format::debug_fn(|writer, field, value| {
363+
match field.name() {
364+
"message" => {
365+
write!(writer, " {value:?}")?;
366+
}
367+
"pipeline" | "pipeline_name" | "pipeline-name" => {
368+
write!(writer, " pipeline-name={value:?}")?;
369+
}
370+
"pipeline_id" | "pipeline-id" => {
371+
write!(writer, " pipeline-id={value:?}")?;
372+
}
373+
"tenant" => {
374+
write!(writer, " tenant={value:?}")?;
375+
}
376+
_ => {}
377+
}
378+
Ok(())
379+
})
380+
}

crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,10 @@ pub(crate) async fn post_pipeline_input_connector_action(
302302
// Log only if the response indicates success
303303
if response.status() == StatusCode::OK {
304304
info!(
305-
"Connector action: {verb} pipeline '{pipeline_name}' on table '{table_name}' on connector '{connector_name}' (tenant: {})",
306-
*tenant_id
305+
pipeline = %pipeline_name,
306+
pipeline_id = "N/A",
307+
tenant = %tenant_id.0,
308+
"Connector action: {verb} on table '{table_name}' on connector '{connector_name}'"
307309
);
308310
}
309311
Ok(response)
@@ -1334,8 +1336,10 @@ pub(crate) async fn post_pipeline_pause(
13341336

13351337
if response.status() == StatusCode::ACCEPTED {
13361338
info!(
1337-
"Accepted action: pausing pipeline {pipeline_name:?} (tenant: {})", // {pipeline_id}
1338-
*tenant_id
1339+
pipeline = %pipeline_name,
1340+
pipeline_id = "N/A",
1341+
tenant = %tenant_id.0,
1342+
"Accepted action: pausing pipeline"
13391343
);
13401344
}
13411345
Ok(response)
@@ -1396,8 +1400,10 @@ pub(crate) async fn post_pipeline_resume(
13961400

13971401
if response.status() == StatusCode::ACCEPTED {
13981402
info!(
1399-
"Accepted action: resuming pipeline {pipeline_name:?} (tenant: {})", // {pipeline_id}
1400-
*tenant_id
1403+
pipeline = %pipeline_name,
1404+
pipeline_id = "N/A",
1405+
tenant = %tenant_id.0,
1406+
"Accepted action: resuming pipeline"
14011407
);
14021408
}
14031409
Ok(response)
@@ -1482,8 +1488,10 @@ pub(crate) async fn post_pipeline_activate(
14821488

14831489
if response.status() == StatusCode::ACCEPTED {
14841490
info!(
1485-
"Accepted action: activating pipeline {pipeline_name:?} (tenant: {})", // {pipeline_id}
1486-
*tenant_id
1491+
pipeline = %pipeline_name,
1492+
pipeline_id = "N/A",
1493+
tenant = %tenant_id.0,
1494+
"Accepted action: activating pipeline"
14871495
);
14881496
}
14891497
Ok(response)
@@ -1557,8 +1565,10 @@ pub(crate) async fn post_pipeline_approve(
15571565

15581566
if response.status() == StatusCode::ACCEPTED {
15591567
info!(
1560-
"Accepted action: approved pipeline bootstrapping (tenant: {})", // {pipeline_id}
1561-
*tenant_id
1568+
pipeline = %pipeline_name,
1569+
pipeline_id = "N/A",
1570+
tenant = %tenant_id.0,
1571+
"Accepted action: approved pipeline bootstrapping"
15621572
);
15631573
}
15641574
Ok(response)

crates/pipeline-manager/src/api/endpoints/pipeline_management.rs

Lines changed: 48 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -804,8 +804,9 @@ async fn fetch_connector_error_stats(
804804
// Check status code - quietly ignore 404 (endpoint not available on older pipelines)
805805
if response.status() == actix_web::http::StatusCode::NOT_FOUND {
806806
debug!(
807-
"Pipeline '{}' does not support /stats/errors endpoint (404), skipping error stats",
808-
pipeline_name
807+
pipeline = %pipeline_name,
808+
pipeline_id = "N/A",
809+
"Pipeline does not support /stats/errors endpoint (404), skipping error stats"
809810
);
810811
return None;
811812
}
@@ -817,8 +818,9 @@ async fn fetch_connector_error_stats(
817818
Ok(response) => response,
818819
Err(e) => {
819820
error!(
820-
"Failed to deserialize pipeline stats response for '{}': {}",
821-
pipeline_name, e
821+
pipeline = %pipeline_name,
822+
pipeline_id = "N/A",
823+
"Failed to deserialize pipeline stats response: {e}"
822824
);
823825
return None;
824826
}
@@ -954,7 +956,6 @@ pub(crate) async fn post_pipeline(
954956
body: web::Json<PostPutPipelineInternal>,
955957
) -> Result<HttpResponse, ManagerError> {
956958
let pipeline_descr: PipelineDescr = body.into_inner().into();
957-
let name = pipeline_descr.name.clone();
958959
let pipeline = state
959960
.db
960961
.lock()
@@ -971,9 +972,8 @@ pub(crate) async fn post_pipeline(
971972
info!(
972973
pipeline = %returned_pipeline.name,
973974
pipeline_id = %returned_pipeline.id,
974-
"Created pipeline {name:?} ({}) (tenant: {})",
975-
returned_pipeline.id,
976-
*tenant_id
975+
tenant = %tenant_id.0,
976+
"Created pipeline"
977977
);
978978
Ok(HttpResponse::Created()
979979
.insert_header(CacheControl(vec![CacheDirective::NoCache]))
@@ -1044,9 +1044,8 @@ pub(crate) async fn put_pipeline(
10441044
info!(
10451045
pipeline = %returned_pipeline.name,
10461046
pipeline_id = %returned_pipeline.id,
1047-
"Created pipeline {pipeline_name:?} ({}) (tenant: {})",
1048-
returned_pipeline.id,
1049-
*tenant_id
1047+
tenant = %tenant_id.0,
1048+
"Created pipeline"
10501049
);
10511050
Ok(HttpResponse::Created()
10521051
.insert_header(CacheControl(vec![CacheDirective::NoCache]))
@@ -1055,10 +1054,9 @@ pub(crate) async fn put_pipeline(
10551054
info!(
10561055
pipeline = %returned_pipeline.name,
10571056
pipeline_id = %returned_pipeline.id,
1058-
"Fully updated pipeline {pipeline_name:?} ({}) to version {} (tenant: {})",
1059-
returned_pipeline.id,
1060-
returned_pipeline.version,
1061-
*tenant_id
1057+
tenant = %tenant_id.0,
1058+
version = %returned_pipeline.version,
1059+
"Fully updated pipeline"
10621060
);
10631061
Ok(HttpResponse::Ok()
10641062
.insert_header(CacheControl(vec![CacheDirective::NoCache]))
@@ -1131,8 +1129,12 @@ pub(crate) async fn patch_pipeline(
11311129
let returned_pipeline = PipelineInfoInternal::new(pipeline);
11321130

11331131
info!(
1134-
"Partially updated pipeline {pipeline_name:?} ({}) to version {} (tenant: {})",
1135-
returned_pipeline.id, returned_pipeline.version, *tenant_id
1132+
pipeline = %returned_pipeline.name,
1133+
pipeline_id = %returned_pipeline.id,
1134+
tenant = %tenant_id.0,
1135+
version = %returned_pipeline.version,
1136+
"Partially updated pipeline to version {}",
1137+
returned_pipeline.version
11361138
);
11371139
Ok(HttpResponse::Ok()
11381140
.insert_header(CacheControl(vec![CacheDirective::NoCache]))
@@ -1216,8 +1218,12 @@ pub(crate) async fn post_update_runtime(
12161218
let returned_pipeline = PipelineInfoInternal::new(pipeline);
12171219

12181220
info!(
1219-
"Updated pipeline {pipeline_name:?} ({}) platform_version to {} (tenant: {})",
1220-
returned_pipeline.id, returned_pipeline.platform_version, *tenant_id
1221+
pipeline = %returned_pipeline.name,
1222+
pipeline_id = %returned_pipeline.id,
1223+
tenant = %tenant_id.0,
1224+
platform_version = %returned_pipeline.platform_version,
1225+
"Updated pipeline platform_version to {}",
1226+
returned_pipeline.platform_version
12211227
);
12221228
Ok(HttpResponse::Ok()
12231229
.insert_header(CacheControl(vec![CacheDirective::NoCache]))
@@ -1265,9 +1271,8 @@ pub(crate) async fn delete_pipeline(
12651271
info!(
12661272
pipeline = %pipeline_name,
12671273
pipeline_id = %pipeline_id,
1268-
"Deleted pipeline {pipeline_name:?} ({}) (tenant: {})",
1269-
pipeline_id,
1270-
*tenant_id
1274+
tenant = %tenant_id.0,
1275+
"Deleted pipeline"
12711276
);
12721277
Ok(HttpResponse::Ok().finish())
12731278
}
@@ -1368,8 +1373,11 @@ pub(crate) async fn post_pipeline_start(
13681373
};
13691374

13701375
info!(
1371-
"Accepted action: going to start pipeline {pipeline_name:?} ({pipeline_id}) as {} (tenant: {})",
1372-
initial, *tenant_id
1376+
pipeline_id = %pipeline_id,
1377+
pipeline = %pipeline_name,
1378+
tenant = %tenant_id.0,
1379+
"Accepted action: going to start pipeline as {}",
1380+
initial
13731381
);
13741382
Ok(HttpResponse::Accepted().json(json!("Pipeline is starting")))
13751383
}
@@ -1455,8 +1463,10 @@ pub(crate) async fn post_pipeline_stop(
14551463
.set_deployment_resources_desired_status_stopped(*tenant_id, &pipeline_name)
14561464
.await?;
14571465
info!(
1458-
"Accepted action: going to forcefully stop pipeline {pipeline_name:?} ({pipeline_id}) (tenant: {})",
1459-
*tenant_id
1466+
pipeline_id = %pipeline_id,
1467+
pipeline = %pipeline_name,
1468+
tenant = %tenant_id.0,
1469+
"Accepted action: going to forcefully stop pipeline"
14601470
);
14611471
Ok(HttpResponse::Accepted().json(json!("Pipeline is forcefully stopping")))
14621472
} else {
@@ -1479,8 +1489,10 @@ pub(crate) async fn post_pipeline_stop(
14791489
.await?;
14801490
if was_set {
14811491
info!(
1482-
"Accepted action: going to forcefully stop pipeline {pipeline_name:?} ({pipeline_id}) (tenant: {}) because it is not provisioned",
1483-
*tenant_id
1492+
pipeline_id = %pipeline_id,
1493+
pipeline = %pipeline_name,
1494+
tenant = %tenant_id.0,
1495+
"Accepted action: going to forcefully stop pipeline because it is not provisioned"
14841496
);
14851497
Ok(HttpResponse::Accepted().json(json!("Pipeline is forcefully stopping")))
14861498
} else {
@@ -1507,8 +1519,10 @@ pub(crate) async fn post_pipeline_stop(
15071519
.is_ok_and(|v| v.status() == actix_web::http::StatusCode::ACCEPTED)
15081520
{
15091521
info!(
1510-
"Accepted action: going to non-forcefully stop pipeline {pipeline_name:?} ({pipeline_id}) (tenant: {})",
1511-
*tenant_id
1522+
pipeline = %pipeline_name,
1523+
pipeline_id = %pipeline_id,
1524+
tenant = %tenant_id.0,
1525+
"Accepted action: going to non-forcefully stop pipeline"
15121526
);
15131527
}
15141528
response
@@ -1566,8 +1580,10 @@ pub(crate) async fn post_pipeline_clear(
15661580
.await?;
15671581

15681582
info!(
1569-
"Accepted storage action: going to clear storage of pipeline {pipeline_name:?} ({pipeline_id}) (tenant: {})",
1570-
*tenant_id
1583+
pipeline_id = %pipeline_id,
1584+
pipeline = %pipeline_name,
1585+
tenant = %tenant_id.0,
1586+
"Accepted storage action: going to clear storage of pipeline"
15711587
);
15721588
Ok(HttpResponse::Accepted().json(json!("Pipeline storage is being cleared")))
15731589
}

crates/pipeline-manager/src/api/support_data_collector.rs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,8 +1037,9 @@ impl SupportDataCollector {
10371037

10381038
self.schedule.insert(next_collection, entry);
10391039
debug!(
1040-
"Added support data collection for pipeline {} to schedule.",
1041-
pipeline_id
1040+
pipeline_id = %pipeline_id,
1041+
pipeline = "N/A",
1042+
"Added support data collection to schedule"
10421043
);
10431044
}
10441045
}
@@ -1080,8 +1081,9 @@ impl SupportDataCollector {
10801081
}
10811082
Err(e) => {
10821083
error!(
1083-
"Failed to collect support data for pipeline {}: {}",
1084-
entry.pipeline_id, e
1084+
pipeline_id = %entry.pipeline_id,
1085+
pipeline = "N/A",
1086+
"Failed to collect support data: {e}"
10851087
);
10861088

10871089
// Try again in a minute.
@@ -1117,13 +1119,18 @@ impl SupportDataCollector {
11171119
Ok(pipeline) => pipeline,
11181120
Err(DBError::UnknownPipeline { .. }) => {
11191121
debug!(
1120-
"Removing {} from support data collection schedule (pipeline deleted)",
1121-
entry.pipeline_id
1122+
pipeline_id = %entry.pipeline_id,
1123+
pipeline = "N/A",
1124+
"Removing from support data collection schedule (pipeline deleted)"
11221125
);
11231126
return Ok(PostCollectionAction::Remove);
11241127
}
11251128
Err(e) => {
1126-
error!("Failed to get pipeline {}: {}", entry.pipeline_id, e);
1129+
error!(
1130+
pipeline_id = %entry.pipeline_id,
1131+
pipeline = "N/A",
1132+
"Failed to get pipeline: {e}"
1133+
);
11271134
return Err(e.into());
11281135
}
11291136
};
@@ -1135,8 +1142,10 @@ impl SupportDataCollector {
11351142
);
11361143
if combined_status != CombinedStatus::Running {
11371144
debug!(
1138-
"Removing {} from support data collection schedule (status change to: {:?})",
1139-
entry.pipeline_id, combined_status
1145+
pipeline_id = %pipeline.id,
1146+
pipeline = %pipeline.name,
1147+
"Removing from support data collection schedule (status change to: {:?})",
1148+
combined_status
11401149
);
11411150
return Ok(PostCollectionAction::Remove);
11421151
}
@@ -1149,14 +1158,16 @@ impl SupportDataCollector {
11491158
.await?;
11501159
if let Err(err) = self.cleanup_old_collections(entry.pipeline_id).await {
11511160
error!(
1152-
"Failed to cleanup old support data collections for pipeline {}: {}",
1153-
entry.pipeline_id, err
1161+
pipeline_id = %pipeline.id,
1162+
pipeline = %pipeline.name,
1163+
"Failed to cleanup old support data collections: {err}"
11541164
);
11551165
}
11561166

11571167
debug!(
1158-
"Collected support data for pipeline {}, reschedule",
1159-
entry.pipeline_id
1168+
pipeline_id = %pipeline.id,
1169+
pipeline = %pipeline.name,
1170+
"Collected support data, reschedule"
11601171
);
11611172

11621173
Ok(PostCollectionAction::Reschedule)
@@ -1210,8 +1221,10 @@ impl SupportDataCollector {
12101221
txn.commit().await?;
12111222
if r != 0 {
12121223
debug!(
1213-
"Cleaned up {} old support data collection(s) for pipeline {}",
1214-
r, pipeline_id
1224+
pipeline_id = %pipeline_id,
1225+
pipeline = "N/A",
1226+
"Cleaned up {} old support data collection(s)",
1227+
r
12151228
);
12161229
}
12171230
Ok(())

0 commit comments

Comments
 (0)