Skip to content

Commit da6e8ce

Browse files
Igor Smolyarigorscs
authored andcommitted
[json-log] add JSON logging docs and fix format
- Align docs with identity fields and examples, including pipeline-specific tagging and clarified schema/notes. - Add pipeline feldera-service tag to pipeline JSON logs - Remove structured tenant_id fields from manager logs to keep payload minimal. - Prepare for cloud kubernetes-runner json support
1 parent da1ccec commit da6e8ce

File tree

7 files changed

+131
-18
lines changed

7 files changed

+131
-18
lines changed

crates/feldera-observability/src/json_logging.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ pub enum ServiceName {
1515
Runner,
1616
CompilerServer,
1717
ControlPlane,
18+
KubernetesRunner,
19+
Pipeline,
1820
}
1921

2022
impl ServiceName {
@@ -24,6 +26,8 @@ impl ServiceName {
2426
ServiceName::Runner => "runner",
2527
ServiceName::CompilerServer => "compiler-server",
2628
ServiceName::ControlPlane => "control-plane",
29+
ServiceName::KubernetesRunner => "kubernetes-runner",
30+
ServiceName::Pipeline => "pipeline",
2731
}
2832
}
2933
}
@@ -86,7 +90,7 @@ where
8690
LogIdentity::Service { service_name } => {
8791
service_name.map(|service| service.as_str().to_string())
8892
}
89-
LogIdentity::Pipeline { .. } => None,
93+
LogIdentity::Pipeline { .. } => Some(ServiceName::Pipeline.as_str().to_string()),
9094
};
9195

9296
// Allow structured fields to override the defaults.
@@ -114,6 +118,12 @@ where
114118
&& feldera_service.as_deref() != Some("compiler-server")
115119
{
116120
feldera_service = Some("compiler-server".to_string());
121+
} else if metadata
122+
.target()
123+
.starts_with("cluster_control_plane::kubernetes_runner")
124+
&& feldera_service.as_deref() != Some("kubernetes-runner")
125+
{
126+
feldera_service = Some("kubernetes-runner".to_string());
117127
}
118128
let mut obj = Map::new();
119129
obj.insert("timestamp".to_string(), Value::String(now_timestamp()));

crates/pipeline-manager/src/api/endpoints/pipeline_management.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -971,7 +971,6 @@ pub(crate) async fn post_pipeline(
971971
info!(
972972
pipeline = %returned_pipeline.name,
973973
pipeline_id = %returned_pipeline.id,
974-
tenant_id = %tenant_id.0,
975974
"Created pipeline {name:?} ({}) (tenant: {})",
976975
returned_pipeline.id,
977976
*tenant_id
@@ -1045,7 +1044,6 @@ pub(crate) async fn put_pipeline(
10451044
info!(
10461045
pipeline = %returned_pipeline.name,
10471046
pipeline_id = %returned_pipeline.id,
1048-
tenant_id = %tenant_id.0,
10491047
"Created pipeline {pipeline_name:?} ({}) (tenant: {})",
10501048
returned_pipeline.id,
10511049
*tenant_id
@@ -1057,7 +1055,6 @@ pub(crate) async fn put_pipeline(
10571055
info!(
10581056
pipeline = %returned_pipeline.name,
10591057
pipeline_id = %returned_pipeline.id,
1060-
tenant_id = %tenant_id.0,
10611058
"Fully updated pipeline {pipeline_name:?} ({}) to version {} (tenant: {})",
10621059
returned_pipeline.id,
10631060
returned_pipeline.version,
@@ -1268,7 +1265,6 @@ pub(crate) async fn delete_pipeline(
12681265
info!(
12691266
pipeline = %pipeline_name,
12701267
pipeline_id = %pipeline_id,
1271-
tenant_id = %tenant_id.0,
12721268
"Deleted pipeline {pipeline_name:?} ({}) (tenant: {})",
12731269
pipeline_id,
12741270
*tenant_id

crates/pipeline-manager/src/compiler/rust_compiler.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -895,7 +895,7 @@ pub async fn perform_rust_compilation(
895895
})?;
896896
let runtime_selector = program_config.runtime_version();
897897
assert!(has_unstable_feature("runtime_version") || runtime_selector.is_platform());
898-
let pipeline_name_for_log = pipeline_name.clone().unwrap_or_default();
898+
let pipeline_name_for_log = pipeline_name.clone().unwrap_or_else(|| "N/A".to_string());
899899
info!(
900900
pipeline_id = %pipeline_id,
901901
pipeline = pipeline_name_for_log.as_str(),
@@ -1550,7 +1550,7 @@ async fn call_compiler(
15501550
// Create pipeline-binaries directory if it does not yet exist
15511551
let pipeline_binaries_dir = workspace_dir.join("pipeline-binaries");
15521552
create_dir_if_not_exists(&pipeline_binaries_dir).await?;
1553-
let pipeline_name_for_log = pipeline_name.unwrap_or_default();
1553+
let pipeline_name_for_log = pipeline_name.unwrap_or_else(|| "N/A".to_string());
15541554

15551555
// Create file where stdout will be written to
15561556
let stdout_file_path = pipeline_main_crate_dir.join("stdout.log");

crates/pipeline-manager/src/compiler/sql_compiler.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -503,10 +503,10 @@ pub(crate) async fn perform_sql_compilation(
503503

504504
let runtime_selector = program_config.runtime_version();
505505
assert!(has_unstable_feature("runtime_version") || runtime_selector.is_platform());
506-
let pipeline_name = pipeline_name.as_deref();
506+
let pipeline_name = pipeline_name.as_deref().unwrap_or("N/A");
507507
info!(
508508
pipeline_id = %pipeline_id,
509-
pipeline = pipeline_name.unwrap_or(""),
509+
pipeline = pipeline_name,
510510
"SQL compilation started: pipeline {} (program version: {}{})",
511511
pipeline_id,
512512
program_version,
@@ -637,7 +637,7 @@ pub(crate) async fn perform_sql_compilation(
637637
Err(e) => {
638638
error!(
639639
pipeline_id = %pipeline_id,
640-
pipeline = pipeline_name.unwrap_or(""),
640+
pipeline = pipeline_name,
641641
"SQL compilation outdated check failed due to database error: {e}"
642642
)
643643
// As preemption check failing is not fatal, compilation will continue
@@ -697,7 +697,7 @@ pub(crate) async fn perform_sql_compilation(
697697
} else {
698698
error!(
699699
pipeline_id = %pipeline_id,
700-
pipeline = pipeline_name.unwrap_or(""),
700+
pipeline = pipeline_name,
701701
"Unable to parse SQL compiler response after successful compilation, warnings were not passed to client: {}",
702702
stderr_str
703703
);

crates/pipeline-manager/src/runner/pipeline_automata.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ impl<T: PipelineExecutor> PipelineAutomaton<T> {
219219
pub async fn run(mut self) {
220220
let pipeline_id = self.pipeline_id;
221221
debug!(
222-
pipeline = self.pipeline_name.as_deref().unwrap_or(""),
222+
pipeline = self.pipeline_name.as_deref().unwrap_or("N/A"),
223223
pipeline_id = %pipeline_id,
224224
"Automaton started: pipeline {pipeline_id}"
225225
);
@@ -252,7 +252,7 @@ impl<T: PipelineExecutor> PipelineAutomaton<T> {
252252
// Pipeline deletions should not lead to errors in the logs.
253253
DBError::UnknownPipeline { pipeline_id } => {
254254
info!(
255-
pipeline = self.pipeline_name.as_deref().unwrap_or(""),
255+
pipeline = self.pipeline_name.as_deref().unwrap_or("N/A"),
256256
pipeline_id = %pipeline_id,
257257
"Automaton ended: pipeline {pipeline_id}"
258258
);
@@ -452,7 +452,7 @@ impl<T: PipelineExecutor> PipelineAutomaton<T> {
452452
// and as such will cause a database error to bubble up if it does not.
453453
info!(
454454
pipeline_id = %self.pipeline_id,
455-
pipeline = self.pipeline_name.as_deref().unwrap_or(""),
455+
pipeline = self.pipeline_name.as_deref().unwrap_or("N/A"),
456456
"Pipeline automaton {}: version initially intended to be started ({}) is outdated by latest ({})",
457457
self.pipeline_id,
458458
outdated_version,
@@ -462,7 +462,7 @@ impl<T: PipelineExecutor> PipelineAutomaton<T> {
462462
{
463463
error!(
464464
pipeline_id = %self.pipeline_id,
465-
pipeline = self.pipeline_name.as_deref().unwrap_or(""),
465+
pipeline = self.pipeline_name.as_deref().unwrap_or("N/A"),
466466
"Outdated pipeline version occurred when transitioning from {} to Provisioning (not Stopped)",
467467
pipeline.deployment_resources_status
468468
);
@@ -1276,7 +1276,7 @@ impl<T: PipelineExecutor> PipelineAutomaton<T> {
12761276
_ => {
12771277
warn!(
12781278
pipeline_id = %pipeline_id,
1279-
pipeline = self.pipeline_name.as_deref().unwrap_or(""),
1279+
pipeline = self.pipeline_name.as_deref().unwrap_or("N/A"),
12801280
"Pipeline {pipeline_id} status is unavailable because the endpoint responded with 503 Service Unavailable:\n{error_response:?}"
12811281
);
12821282
Ok(ExtendedRuntimeStatus {
@@ -1304,7 +1304,7 @@ impl<T: PipelineExecutor> PipelineAutomaton<T> {
13041304
});
13051305
error!(
13061306
pipeline_id = %pipeline_id,
1307-
pipeline = self.pipeline_name.as_deref().unwrap_or(""),
1307+
pipeline = self.pipeline_name.as_deref().unwrap_or("N/A"),
13081308
"Pipeline {pipeline_id} has fatal runtime error and will be stopped. Error:\n{error_response:?}"
13091309
);
13101310
Err(error_response)
@@ -1323,7 +1323,7 @@ impl<T: PipelineExecutor> PipelineAutomaton<T> {
13231323
} else {
13241324
warn!(
13251325
pipeline_id = %pipeline_id,
1326-
pipeline = self.pipeline_name.as_deref().unwrap_or(""),
1326+
pipeline = self.pipeline_name.as_deref().unwrap_or("N/A"),
13271327
"Pipeline {pipeline_id} status is unavailable because the endpoint could not be reached due to: {e}"
13281328
);
13291329
Ok(ExtendedRuntimeStatus {
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
---
2+
title: JSON logging
3+
sidebar_position: 25
4+
---
5+
6+
# Logging (JSON)
7+
8+
This page documents the structured JSON log format emitted when the `FELDERA_LOG_JSON` environment variable is set (for example, `FELDERA_LOG_JSON=1`). The default pretty text logs are unchanged. Logs come from two sources:
9+
10+
- Control plane components (manager, runner, compiler-server, kubernetes-runner, control-plane): these carry `feldera-service` set to the component name.
11+
- Pipelines (the dataflow processes): these carry `feldera-service: "pipeline"` and `pipeline-name`/`pipeline-id`. Control plane logs that are about a specific pipeline also include the pipeline identifiers.
12+
13+
## Identity fields
14+
15+
These identity fields are lifted alongside the standard top-level metadata (`timestamp`, `level`, `target`):
16+
17+
| Field | Meaning |
18+
| ------------------ | -------------------------------------------------------------------------------- |
19+
| `feldera-service` | Source: `manager` \| `runner` \| `compiler-server` \| `kubernetes-runner` \| `control-plane` \| `pipeline` (auto-tagged by module path). This identifies which Feldera component produced the log. |
20+
| `pipeline-name` | Human-friendly pipeline name when available; if it is not immediately available it is set to `N/A`. Present for pipeline events. Control plane events tied to a specific pipeline also include this name. |
21+
| `pipeline-id` | Pipeline UUID when the event relates to a specific pipeline. Present for pipeline events. Control plane events tied to a specific pipeline also include this ID. |
22+
23+
## JSON object members (spec)
24+
25+
Each log entry is a JSON object whose members are:
26+
27+
- `timestamp` (required): string value, UTC with microsecond precision (e.g. `2025-12-06T02:08:14.902292Z`).
28+
- `level` (required): string value, one of `TRACE` \| `DEBUG` \| `INFO` \| `WARN` \| `ERROR`.
29+
- `target` (required): string value, Rust module path of the log source.
30+
- `fields` (required): object value containing the event payload (one of `message` or `line`).
31+
- `feldera-service` (optional): string value, present for control plane events and for pipelines (as `pipeline`).
32+
- `pipeline-name` (optional): string value, present when the event is tied to a pipeline.
33+
- `pipeline-id` (optional): string value, present when the event is tied to a pipeline.
34+
35+
> Practical rule: every log line has `timestamp`, `level`, `target`, and `fields`. Control plane logs add `feldera-service`; pipeline-related logs add `feldera-service: "pipeline"` plus `pipeline-name` and `pipeline-id`.
36+
37+
## Examples
38+
39+
Manager pipeline lifecycle:
40+
41+
```json
42+
{
43+
"timestamp": "2025-12-05T18:54:07.231095Z",
44+
"level": "INFO",
45+
"target": "pipeline_manager::api::endpoints::pipeline_management",
46+
"feldera-service": "manager",
47+
"pipeline-name": "MyPipeline",
48+
"pipeline-id": "019af011-5282-7751-98c2-f61478d0df63",
49+
"fields": {
50+
"message": "Created pipeline \"MyPipeline\" (019af011-5282-7751-98c2-f61478d0df63) (tenant: 00000000-0000-0000-0000-000000000000)"
51+
}
52+
}
53+
```
54+
55+
Runner log stream starting up:
56+
57+
```json
58+
{
59+
"timestamp": "2025-12-06T02:08:14.902292Z",
60+
"level": "INFO",
61+
"target": "pipeline_manager::runner::pipeline_logs",
62+
"feldera-service": "runner",
63+
"pipeline-name": "N/A",
64+
"pipeline-id": "019af16a-ba26-7933-a6e5-65d9d717cb7a",
65+
"fields": { "line": "Fresh start of pipeline logs" }
66+
}
67+
```
68+
69+
Compiler server log:
70+
71+
```json
72+
{
73+
"timestamp": "2025-12-05T18:01:22.996459Z",
74+
"level": "INFO",
75+
"target": "pipeline_manager::compiler::sql_compiler",
76+
"feldera-service": "compiler-server",
77+
"pipeline-name": "MyPipeline",
78+
"pipeline-id": "019aefac-fc78-75b0-9089-0f7496f8ac1f",
79+
"fields": { "message": "SQL compilation started: pipeline 019aefac-fc78-75b0-9089-0f7496f8ac1f (program version: 1)" }
80+
}
81+
```
82+
83+
Pipeline log:
84+
85+
```json
86+
{
87+
"timestamp": "2025-12-09T21:26:17.362514Z",
88+
"level": "INFO",
89+
"target": "dbsp_adapters::server",
90+
"feldera-service": "pipeline",
91+
"pipeline-name": "MyPipeline",
92+
"pipeline-id": "019af16a-ba26-7933-a6e5-65d9d717cb7a",
93+
"fields": { "message": "Pipeline initialization complete" }
94+
}
95+
```
96+
97+
## Example: enabling JSON
98+
99+
```bash
100+
FELDERA_LOG_JSON=1 cargo run --bin=pipeline-manager
101+
```
102+
103+
## Notes
104+
105+
- Plain-text logging remains the default; JSON is opt-in via `FELDERA_LOG_JSON`.
106+
- The event payload lives under `fields` (`message` or `line`); identity fields (`feldera-service`, `pipeline-name`, `pipeline-id`) are at the top level.

docs.feldera.com/docs/sidebars.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,7 @@ const operations = {
459459
'operations/guide',
460460
'operations/memory',
461461
'operations/metrics',
462+
'operations/json-logging',
462463
]
463464
};
464465

0 commit comments

Comments
 (0)