Skip to content

Commit 762d382

Browse files
gzryzhyk
authored andcommitted
Rework pg-embed feature.
Turns out if it's in a once-cell drop doesn't get called if the developer hits ctrl+c which means next time the manager is restarted it won't start again. So we put it back in ProjectDb. Also removed oncecell dependency which means we can simplify feature to just 'pg-embed'.
1 parent 46d176e commit 762d382

File tree

6 files changed

+30
-35
lines changed

6 files changed

+30
-35
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/pipeline_manager/Cargo.toml

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ tokio-postgres = "0.7"
3737
async-trait = "0.1"
3838
# Waiting for https://github.com/faokunega/pg-embed/pull/26
3939
pg-embed = { git = "https://github.com/gz/pg-embed.git", rev = "8906af8", optional = true }
40-
once_cell = { version = "1.17.1", optional = true }
4140

4241
[target.'cfg(unix)'.dependencies]
4342
daemonize = { version = "0.4.1" }
@@ -50,10 +49,7 @@ proptest = "1.0.0"
5049
proptest-derive = "0.3.0"
5150
pretty_assertions = "1.3.0"
5251
# Workaround to enable dev feature during tests: https://github.com/rust-lang/cargo/issues/2911
53-
dbsp_pipeline_manager = { path = ".", features = ["dev"]}
52+
dbsp_pipeline_manager = { path = ".", features = ["pg-embed"]}
5453

5554
[package.metadata.cargo-udeps.ignore]
56-
development = ["dbsp_pipeline_manager"] # false positive from cargo udeps
57-
58-
[features]
59-
dev = ["dep:pg-embed", "dep:once_cell"]
55+
development = ["dbsp_pipeline_manager"] # false positive from cargo udeps

crates/pipeline_manager/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ impl ManagerConfig {
228228
}
229229

230230
/// Where Postgres embed stores the database.
231-
#[cfg(feature = "dev")]
231+
#[cfg(feature = "pg-embed")]
232232
pub(crate) fn postgres_embed_data_dir(&self) -> PathBuf {
233233
Path::new(&self.working_directory).join("data")
234234
}

crates/pipeline_manager/src/db/mod.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use utoipa::ToSchema;
1212
#[cfg(test)]
1313
mod test;
1414

15-
#[cfg(any(test, feature = "dev"))]
15+
#[cfg(any(test, feature = "pg-embed"))]
1616
mod pg_setup;
1717
pub(crate) mod storage;
1818

@@ -29,13 +29,13 @@ pub(crate) mod storage;
2929
/// time, which determines the position of the project in the queue.
3030
pub(crate) struct ProjectDB {
3131
conn: Client,
32+
// Used in dev mode for having an embedded Postgres DB live through the
33+
// lifetime of the program.
34+
#[cfg(feature = "pg-embed")]
35+
#[allow(dead_code)] // It has to stay alive until ProjectDB is dropped.
36+
pg_inst: Option<pg_embed::postgres::PgEmbed>,
3237
}
3338

34-
// Used in dev mode for having an embedded Postgres DB live through the lifetime
35-
// of the program.
36-
#[cfg(feature = "dev")]
37-
static PG: once_cell::sync::OnceCell<pg_embed::postgres::PgEmbed> =
38-
once_cell::sync::OnceCell::new();
3939

4040
/// Unique project id.
4141
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize, ToSchema)]
@@ -1021,18 +1021,16 @@ impl ProjectDB {
10211021
let connection_str = config.database_connection_string();
10221022
let initial_sql = &config.initial_sql;
10231023

1024-
#[cfg(feature = "dev")]
1025-
let connection_str = if connection_str.starts_with("postgres-embed") {
1024+
#[cfg(feature = "pg-embed")]
1025+
if connection_str.starts_with("postgres-embed") {
10261026
let database_dir = config.postgres_embed_data_dir();
1027-
let pg = pg_setup::install(database_dir, true, Some(8082)).await?;
1028-
let connection_string = pg.db_uri.to_string();
1029-
let _ = PG.set(pg);
1030-
connection_string
1031-
} else {
1032-
connection_str
1027+
let pg_inst = pg_setup::install(database_dir, true, Some(8082)).await?;
1028+
let connection_string = pg_inst.db_uri.to_string();
1029+
return Self::connect_inner(connection_string.as_str(), initial_sql, Some(pg_inst)).await
10331030
};
10341031

1035-
Self::connect_inner(connection_str.as_str(), initial_sql).await
1032+
Self::connect_inner(connection_str.as_str(), initial_sql, #[cfg(feature = "pg-embed")] None).await
1033+
10361034
}
10371035

10381036
/// Connect to the project database.
@@ -1046,7 +1044,7 @@ impl ProjectDB {
10461044
/// - `is_persistent`: Whether the embedded postgres database should be
10471045
/// persistent or removed on shutdown.
10481046
/// - `port`: The port to use for the embedded Postgres database to run on.
1049-
async fn connect_inner(connection_str: &str, initial_sql: &Option<String>) -> AnyResult<Self> {
1047+
async fn connect_inner(connection_str: &str, initial_sql: &Option<String>, #[cfg(feature = "pg-embed")] pg_inst: Option<pg_embed::postgres::PgEmbed>) -> AnyResult<Self> {
10501048
if !connection_str.starts_with("postgres") {
10511049
panic!("Unsupported connection string {}", connection_str)
10521050
}
@@ -1162,7 +1160,10 @@ impl ProjectDB {
11621160
}
11631161
}
11641162

1165-
Ok(Self { conn: client })
1163+
#[cfg(feature = "pg-embed")]
1164+
return Ok(Self { conn: client, pg_inst });
1165+
#[cfg(not(feature = "pg-embed"))]
1166+
return Ok(Self { conn: client });
11661167
}
11671168

11681169
/// Attach connector to the config.

crates/pipeline_manager/src/db/test.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use crate::db::{pg_setup, DBError};
77
use anyhow::Result as AnyResult;
88
use async_trait::async_trait;
99
use chrono::DateTime;
10-
use pg_embed::postgres::PgEmbed;
1110
use pretty_assertions::assert_eq;
1211
use proptest::prelude::*;
1312
use proptest::test_runner::{Config, TestRunner};
@@ -21,7 +20,6 @@ use tokio::sync::Mutex;
2120

2221
struct DbHandle {
2322
db: ProjectDB,
24-
pg: PgEmbed,
2523
_temp_dir: TempDir,
2624
}
2725

@@ -31,9 +29,11 @@ impl Drop for DbHandle {
3129
// shutdown postgres). Otherwise postgres log an error that the
3230
// directory is already gone during shutdown which could be
3331
// confusing for a developer.
34-
let _r = async {
35-
self.pg.stop_db().await.unwrap();
36-
};
32+
if let Some(pg) = self.db.pg_inst.as_mut() {
33+
let _r = async {
34+
pg.stop_db().await.unwrap();
35+
};
36+
}
3737
}
3838
}
3939

@@ -58,14 +58,13 @@ async fn test_setup() -> DbHandle {
5858
let pg = pg_setup::install(temp_path.into(), false, Some(port))
5959
.await
6060
.unwrap();
61-
62-
let conn = ProjectDB::connect_inner(&pg.db_uri, &Some("".to_string()))
61+
let db_uri = pg.db_uri.clone();
62+
let conn = ProjectDB::connect_inner(&db_uri, &Some("".to_string()), Some(pg))
6363
.await
6464
.unwrap();
6565

6666
DbHandle {
6767
db: conn,
68-
pg: pg,
6968
_temp_dir,
7069
}
7170
}

scripts/start_manager.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ pkill -9 dbsp_pipeline_
3030

3131
set -e
3232

33-
cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo build --release
34-
cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo run --release -- \
33+
cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo build --release --features pg-embed
34+
cd "${MANAGER_DIR}" && ~/.cargo/bin/cargo run --release --features pg-embed -- \
3535
--bind-address="${BIND_ADDRESS}" \
3636
--working-directory="${WORKING_DIR_ABS}" \
3737
--sql-compiler-home="${SQL_COMPILER_DIR}" \

0 commit comments

Comments
 (0)