Skip to content

Commit 911e85f

Browse files
committed
pipeline-manager: info log level for feldera crates and warn for others
- By default, the Feldera crates have info log level; other crates have warn log level
- API request log level is reduced from debug to trace
- Migrations log level is reduced from info to debug if no migrations were applied
- Compiler reconciliation log level is reduced from info to debug
- Instructions on how to configure log levels are added to CONTRIBUTING

Signed-off-by: Simon Kassing <simon.kassing@feldera.com>
1 parent dd4f2fe commit 911e85f

6 files changed

Lines changed: 79 additions & 41 deletions

File tree

CONTRIBUTING.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,17 @@ Here are some guidelines when contributing code that affects this database's sch
230230
* Do not modify an existing migration file. If you want to evolve the schema, add a new SQL or rust file to the migrations folder following [refinery's versioning and naming scheme](https://docs.rs/refinery/latest/refinery/#usage). The migration script should update an existing schema as opposed to assuming a clean slate. For example, use `ALTER TABLE` to add a new column to an existing table and fill that column for existing rows with the appropriate defaults.
231231
* If you add a new migration script `V{i}`, add tests for migrations from `V{i-1}` to `V{i}`. For example, add tests that invoke the pipeline manager APIs before and after the migration.
232232

233+
## Logging
234+
235+
By default, the pipeline-manager and pipelines create an `env_logger` which logs
236+
the Feldera crates at INFO level and all other crates at WARN level.
237+
This can be overridden by setting the `RUST_LOG` environment variable.
238+
For example, the following is equivalent to the default, but additionally with
239+
backtraces enabled:
240+
241+
```bash
242+
RUST_BACKTRACE=1 RUST_LOG=warn,pipeline_manager=info,pipeline_types=info,project=info,dbsp=info,dbsp_adapters=info,dbsp_nexmark=info cargo run --package=pipeline-manager --features pg-embed --bin pipeline-manager -- --dev-mode
243+
```
233244

234245
## Release process
235246

crates/adapters/src/server/mod.rs

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use dbsp::circuit::CircuitConfig;
2424
use dbsp::operator::sample::MAX_QUANTILES;
2525
use env_logger::Env;
2626
use futures_util::FutureExt;
27-
use log::{debug, error, info, log, warn, Level};
27+
use log::{debug, error, info, log, trace, warn, Level};
2828
use minitrace::collector::Config;
2929
use pipeline_types::{format::json::JsonFlavor, transport::http::EgressMode};
3030
use pipeline_types::{query::OutputQuery, transport::http::SERVER_PORT_FILE};
@@ -223,7 +223,7 @@ where
223223
let state = state.clone();
224224
build_app(
225225
App::new().wrap_fn(|req, srv| {
226-
debug!("Request: {} {}", req.method(), req.path());
226+
trace!("Request: {} {}", req.method(), req.path());
227227
srv.call(req).map(|res| {
228228
match &res {
229229
Ok(response) => {
@@ -396,22 +396,32 @@ where
396396

397397
// Create env logger.
398398
let pipeline_name = format!("[{}]", config.name.clone().unwrap_or_default()).cyan();
399-
env_logger::Builder::from_env(Env::default().default_filter_or("info"))
400-
.format(move |buf, record| {
401-
let t = chrono::Utc::now();
402-
let t = format!("{}", t.format("%Y-%m-%d %H:%M:%S"));
403-
writeln!(
404-
buf,
405-
"{t} {} {pipeline_name} {}",
406-
buf.default_styled_level(record.level()),
407-
record.args()
408-
)
409-
})
410-
.try_init()
411-
.unwrap_or_else(|e| {
412-
// This happens in unit tests when another test has initialized logging.
413-
eprintln!("Failed to initialize logging: {e}.")
414-
});
399+
// By default, logging is set to INFO level for the Feldera crates:
400+
// - "project" for the generated project<uuid> crate
401+
// - "dbsp" for the dbsp crate
402+
// - "dbsp_adapters" for the adapters crate which is renamed
403+
// - "dbsp_nexmark" for the nexmark crate which is renamed
404+
// - "pipeline_types" for the pipeline-types crate
405+
// For all others, the WARN level is used.
406+
// Note that this can be overridden by setting the RUST_LOG environment variable.
407+
env_logger::Builder::from_env(Env::default().default_filter_or(
408+
"warn,project=info,dbsp=info,dbsp_adapters=info,dbsp_nexmark=info,pipeline_types=info",
409+
))
410+
.format(move |buf, record| {
411+
let t = chrono::Utc::now();
412+
let t = format!("{}", t.format("%Y-%m-%d %H:%M:%S"));
413+
writeln!(
414+
buf,
415+
"{t} {} {pipeline_name} {}",
416+
buf.default_styled_level(record.level()),
417+
record.args()
418+
)
419+
})
420+
.try_init()
421+
.unwrap_or_else(|e| {
422+
// This happens in unit tests when another test has initialized logging.
423+
eprintln!("Failed to initialize logging: {e}.")
424+
});
415425
let _ = loginit_sender.send(());
416426

417427
if config.global.tracing {
@@ -464,7 +474,7 @@ where
464474
);
465475
*state.controller.lock().unwrap() = Some(controller);
466476

467-
info!("Pipeline initialization complete.");
477+
info!("Pipeline initialization complete");
468478
*state.phase.write().unwrap() = PipelinePhase::InitializationComplete;
469479

470480
Ok(())

crates/pipeline_manager/src/api/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use actix_web_httpauth::middleware::HttpAuthentication;
4343
use actix_web_static_files::ResourceFiles;
4444
use anyhow::{Error as AnyError, Result as AnyResult};
4545
use futures_util::FutureExt;
46-
use log::{debug, error, info, log, Level};
46+
use log::{error, info, log, trace, Level};
4747
use std::{env, net::TcpListener, sync::Arc};
4848
use tokio::sync::Mutex;
4949
use utoipa::openapi::security::{HttpAuthScheme, HttpBuilder, SecurityScheme};
@@ -395,7 +395,7 @@ pub async fn run(db: Arc<Mutex<StoragePostgres>>, api_config: ApiServerConfig) -
395395
.app_data(auth_configuration.clone())
396396
.app_data(client)
397397
.wrap_fn(|req, srv| {
398-
debug!("Request: {} {}", req.method(), req.path());
398+
trace!("Request: {} {}", req.method(), req.path());
399399
srv.call(req).map(log_response)
400400
})
401401
.wrap(api_config.cors())
@@ -411,7 +411,7 @@ pub async fn run(db: Arc<Mutex<StoragePostgres>>, api_config: ApiServerConfig) -
411411
.app_data(state.clone())
412412
.app_data(client)
413413
.wrap_fn(|req, srv| {
414-
debug!("Request: {} {}", req.method(), req.path());
414+
trace!("Request: {} {}", req.method(), req.path());
415415
srv.call(req).map(log_response)
416416
})
417417
.wrap(api_config.cors())
@@ -437,7 +437,7 @@ pub async fn run(db: Arc<Mutex<StoragePostgres>>, api_config: ApiServerConfig) -
437437
██ ██ ██ ██ ██ ██ ██ ██ ██ ██
438438
██ ███████ ███████ ██████ ███████ ██ ██ ██ ██
439439
440-
Web UI URL: {}
440+
Web console URL: {}
441441
API server URL: {}
442442
Documentation: https://www.feldera.com/docs/
443443
Version: {}

crates/pipeline_manager/src/compiler.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ inherits = "release"
510510
config: &CompilerConfig,
511511
db: &Arc<Mutex<StoragePostgres>>,
512512
) -> Result<(), DBError> {
513-
info!("Reconciling local state with API state");
513+
debug!("Reconciling local compiler state with API state...");
514514
let mut map: HashSet<(Uuid, i64)> = HashSet::new();
515515
let read_dir = fs::read_dir(config.binaries_dir()).await;
516516
match read_dir {
@@ -598,6 +598,7 @@ inherits = "release"
598598
.await?;
599599
}
600600
}
601+
debug!("Local compiler state has been reconciled with API state");
601602
Ok(())
602603
}
603604

crates/pipeline_manager/src/db/storage_postgres.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::db::types::tenant::TenantId;
1515
use crate::{auth::TenantRecord, config::DatabaseConfig};
1616
use async_trait::async_trait;
1717
use deadpool_postgres::{Manager, Pool, RecyclingMethod};
18-
use log::{debug, info};
18+
use log::{debug, info, log, Level};
1919
use pipeline_types::config::{PipelineConfig, RuntimeConfig};
2020
use pipeline_types::error::ErrorResponse;
2121
use tokio_postgres::NoTls;
@@ -769,11 +769,20 @@ impl StoragePostgres {
769769

770770
/// Run database migrations
771771
pub async fn run_migrations(&self) -> Result<(), DBError> {
772-
info!("Running DB migrations");
772+
debug!("Applying database migrations if needed...");
773773
let mut client = self.pool.get().await?;
774-
embedded::migrations::runner()
774+
let report = embedded::migrations::runner()
775775
.run_async(&mut **client)
776776
.await?;
777+
log!(
778+
if report.applied_migrations().is_empty() {
779+
Level::Debug
780+
} else {
781+
Level::Info
782+
},
783+
"Database migrations finished: {} migrations were applied",
784+
report.applied_migrations().len()
785+
);
777786
let default_tenant = TenantRecord::default();
778787
self.get_or_create_tenant_id(
779788
default_tenant.id.0,

crates/pipeline_manager/src/logging.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,25 @@ use env_logger::Env;
33
use std::io::Write;
44

55
pub fn init_logging(name: ColoredString) {
6-
let _ = env_logger::Builder::from_env(Env::default().default_filter_or("info"))
7-
.format(move |buf, record| {
8-
let t = chrono::Utc::now();
9-
let t = format!("{}", t.format("%Y-%m-%d %H:%M:%S"));
10-
writeln!(
11-
buf,
12-
"{} {} {} {}",
13-
t,
14-
buf.default_styled_level(record.level()),
15-
name,
16-
record.args()
17-
)
18-
})
19-
.try_init();
6+
// By default, logging is set to INFO level for the Feldera crates:
7+
// - "pipeline_manager" for the pipeline_manager crate
8+
// - "pipeline_types" for the pipeline-types crate
9+
// For all others, the WARN level is used.
10+
// Note that this can be overridden by setting the RUST_LOG environment variable.
11+
let _ = env_logger::Builder::from_env(
12+
Env::default().default_filter_or("warn,pipeline_manager=info,pipeline_types=info"),
13+
)
14+
.format(move |buf, record| {
15+
let t = chrono::Utc::now();
16+
let t = format!("{}", t.format("%Y-%m-%d %H:%M:%S"));
17+
writeln!(
18+
buf,
19+
"{} {} {} {}",
20+
t,
21+
buf.default_styled_level(record.level()),
22+
name,
23+
record.args()
24+
)
25+
})
26+
.try_init();
2027
}

0 commit comments

Comments
 (0)