Skip to content

Commit ffdc28b

Browse files
committed
pipeline-manager: rename api-server's config and working directory
Signed-off-by: Lalith Suresh <lalith@feldera.com>
1 parent c957d7d commit ffdc28b

File tree

13 files changed

+81
-78
lines changed

13 files changed

+81
-78
lines changed

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Earthfile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ test-python:
484484
WITH DOCKER --pull postgres
485485
RUN docker run --shm-size=512MB -p 5432:5432 -e POSTGRES_HOST_AUTH_METHOD=trust -e PGDATA=/dev/shm -d postgres && \
486486
sleep 3 && \
487-
./pipeline-manager --bind-address=0.0.0.0 --manager-working-directory=/working-dir --compiler-working-directory=/working-dir --sql-compiler-home=/dbsp/sql-to-dbsp-compiler --dbsp-override-path=/dbsp --db-connection-string=postgresql://postgres:postgres@localhost:5432 --unix-daemon && \
487+
./pipeline-manager --bind-address=0.0.0.0 --api-server-working-directory=/working-dir --compiler-working-directory=/working-dir --sql-compiler-home=/dbsp/sql-to-dbsp-compiler --dbsp-override-path=/dbsp --db-connection-string=postgresql://postgres:postgres@localhost:5432 --unix-daemon && \
488488
sleep 1 && \
489489
python3 python/test.py && \
490490
cd demo/demo_notebooks && jupyter execute fraud_detection.ipynb --JupyterApp.log_level='DEBUG'
@@ -521,8 +521,8 @@ build-dbsp-manager-container:
521521
COPY sql-to-dbsp-compiler/SQL-compiler/sql-to-dbsp /database-stream-processor/sql-to-dbsp-compiler/SQL-compiler/sql-to-dbsp
522522
COPY sql-to-dbsp-compiler/lib /database-stream-processor/sql-to-dbsp-compiler/lib
523523
COPY sql-to-dbsp-compiler/temp /database-stream-processor/sql-to-dbsp-compiler/temp
524-
RUN ./pipeline-manager --bind-address=0.0.0.0 --manager-working-directory=/working-dir --compiler-working-directory=/working-dir --sql-compiler-home=/database-stream-processor/sql-to-dbsp-compiler --dbsp-override-path=/database-stream-processor --precompile
525-
ENTRYPOINT ["./pipeline-manager", "--bind-address=0.0.0.0", "--manager-working-directory=/working-dir", "--compiler-working-directory=/working-dir", "--sql-compiler-home=/database-stream-processor/sql-to-dbsp-compiler", "--dbsp-override-path=/database-stream-processor"]
524+
RUN ./pipeline-manager --bind-address=0.0.0.0 --api-server-working-directory=/working-dir --compiler-working-directory=/working-dir --sql-compiler-home=/database-stream-processor/sql-to-dbsp-compiler --dbsp-override-path=/database-stream-processor --precompile
525+
ENTRYPOINT ["./pipeline-manager", "--bind-address=0.0.0.0", "--api-server-working-directory=/working-dir", "--compiler-working-directory=/working-dir", "--sql-compiler-home=/database-stream-processor/sql-to-dbsp-compiler", "--dbsp-override-path=/database-stream-processor"]
526526
SAVE IMAGE ghcr.io/feldera/dbsp-manager
527527

528528
# TODO: mirrors the Dockerfile. See note above.

crates/pipeline_manager/src/api.rs

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ use utoipa_swagger_ui::SwaggerUi;
5454
use uuid::{uuid, Uuid};
5555

5656
pub(crate) use crate::compiler::ProgramStatus;
57-
pub(crate) use crate::config::ManagerConfig;
57+
pub(crate) use crate::config::ApiServerConfig;
5858
use crate::db::{
5959
storage::Storage, AttachedConnector, AttachedConnectorId, ConnectorId, DBError, PipelineId,
6060
PipelineRevision, PipelineStatus, ProgramDescr, ProgramId, ProjectDB, Version,
@@ -222,12 +222,12 @@ pub(crate) struct ServerState {
222222
// for a long time to avoid blocking concurrent requests.
223223
pub db: Arc<Mutex<ProjectDB>>,
224224
runner: RunnerApi,
225-
_config: ManagerConfig,
225+
_config: ApiServerConfig,
226226
pub jwk_cache: Arc<Mutex<JwkCache>>,
227227
}
228228

229229
impl ServerState {
230-
pub async fn new(config: ManagerConfig, db: Arc<Mutex<ProjectDB>>) -> AnyResult<Self> {
230+
pub async fn new(config: ApiServerConfig, db: Arc<Mutex<ProjectDB>>) -> AnyResult<Self> {
231231
let runner = RunnerApi::new(db.clone());
232232

233233
Ok(Self {
@@ -239,32 +239,31 @@ impl ServerState {
239239
}
240240
}
241241

242-
pub fn create_listener(manager_config: ManagerConfig) -> AnyResult<TcpListener> {
242+
pub fn create_listener(api_config: ApiServerConfig) -> AnyResult<TcpListener> {
243243
// Check that the port is available before turning into a daemon, so we can fail
244244
// early if the port is taken.
245-
let listener = TcpListener::bind((manager_config.bind_address.clone(), manager_config.port))
246-
.map_err(|e| {
245+
let listener =
246+
TcpListener::bind((api_config.bind_address.clone(), api_config.port)).map_err(|e| {
247247
AnyError::msg(format!(
248248
"failed to bind port '{}:{}': {e}",
249-
&manager_config.bind_address, manager_config.port
249+
&api_config.bind_address, api_config.port
250250
))
251251
})?;
252252

253253
#[cfg(unix)]
254-
if manager_config.unix_daemon {
255-
let logfile =
256-
std::fs::File::create(manager_config.logfile.as_ref().unwrap()).map_err(|e| {
257-
AnyError::msg(format!(
258-
"failed to create log file '{}': {e}",
259-
&manager_config.logfile.as_ref().unwrap()
260-
))
261-
})?;
254+
if api_config.unix_daemon {
255+
let logfile = std::fs::File::create(api_config.logfile.as_ref().unwrap()).map_err(|e| {
256+
AnyError::msg(format!(
257+
"failed to create log file '{}': {e}",
258+
&api_config.logfile.as_ref().unwrap()
259+
))
260+
})?;
262261

263262
let logfile_clone = logfile.try_clone().unwrap();
264263

265264
let daemonize = Daemonize::new()
266-
.pid_file(manager_config.manager_pid_file_path())
267-
.working_directory(&manager_config.manager_working_directory)
265+
.pid_file(api_config.manager_pid_file_path())
266+
.working_directory(&api_config.api_server_working_directory)
268267
.stdout(logfile_clone)
269268
.stderr(logfile);
270269

@@ -280,11 +279,11 @@ pub fn create_listener(manager_config: ManagerConfig) -> AnyResult<TcpListener>
280279
pub async fn run(
281280
listener: TcpListener,
282281
db: Arc<Mutex<ProjectDB>>,
283-
manager_config: ManagerConfig,
282+
api_config: ApiServerConfig,
284283
) -> AnyResult<()> {
285-
let state = WebData::new(ServerState::new(manager_config.clone(), db).await?);
284+
let state = WebData::new(ServerState::new(api_config.clone(), db).await?);
286285

287-
if manager_config.use_auth {
286+
if api_config.use_auth {
288287
let server = HttpServer::new(move || {
289288
let auth_middleware = HttpAuthentication::with_fn(crate::auth::auth_validator);
290289
let auth_configuration = crate::auth::aws_auth_config();
@@ -294,7 +293,7 @@ pub async fn run(
294293
.app_data(auth_configuration)
295294
.wrap(Logger::default())
296295
.wrap(Condition::new(
297-
manager_config.dev_mode,
296+
api_config.dev_mode,
298297
actix_cors::Cors::permissive(),
299298
))
300299
.service(api_scope().wrap(auth_middleware))
@@ -307,7 +306,7 @@ pub async fn run(
307306
.app_data(state.clone())
308307
.wrap(Logger::default())
309308
.wrap(Condition::new(
310-
manager_config.dev_mode,
309+
api_config.dev_mode,
311310
actix_cors::Cors::permissive(),
312311
))
313312
.service(api_scope().wrap_fn(|req, srv| {

crates/pipeline_manager/src/auth.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ mod test {
584584
use crate::{
585585
api::ServerState,
586586
auth::{self, fetch_jwk_aws_cognito_keys, AuthConfiguration, AwsCognitoClaim, Provider},
587-
config::ManagerConfig,
587+
config::ApiServerConfig,
588588
db::{storage::Storage, ApiPermission},
589589
};
590590

@@ -651,11 +651,11 @@ mod test {
651651
let closure = move |req, bearer_auth| auth::auth_validator(req, bearer_auth);
652652
let auth_middleware = HttpAuthentication::with_fn(closure);
653653

654-
let manager_config = ManagerConfig {
654+
let manager_config = ApiServerConfig {
655655
port: 0,
656656
bind_address: "0.0.0.0".to_owned(),
657657
logfile: None,
658-
manager_working_directory: "".to_owned(),
658+
api_server_working_directory: "".to_owned(),
659659
unix_daemon: false,
660660
use_auth: true,
661661
dev_mode: false,

crates/pipeline_manager/src/bin/api-server.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use clap::{Args, Command, FromArgMatches};
44

55
use colored::Colorize;
66

7-
use pipeline_manager::config::{DatabaseConfig, ManagerConfig};
7+
use pipeline_manager::config::{ApiServerConfig, DatabaseConfig};
88
use pipeline_manager::db::ProjectDB;
99
use tokio::sync::Mutex;
1010

@@ -15,16 +15,16 @@ async fn main() -> anyhow::Result<()> {
1515
pipeline_manager::logging::init_logging(name);
1616
let cli = Command::new("Feldera local runner service");
1717
let cli = DatabaseConfig::augment_args(cli);
18-
let cli = ManagerConfig::augment_args(cli);
18+
let cli = ApiServerConfig::augment_args(cli);
1919
let matches = cli.get_matches();
2020

2121
let database_config = DatabaseConfig::from_arg_matches(&matches)
2222
.map_err(|err| err.exit())
2323
.unwrap();
24-
let manager_config = ManagerConfig::from_arg_matches(&matches)
24+
let api_config = ApiServerConfig::from_arg_matches(&matches)
2525
.map_err(|err| err.exit())
2626
.unwrap();
27-
let manager_config = manager_config.canonicalize().unwrap();
27+
let api_config = api_config.canonicalize().unwrap();
2828
let db = ProjectDB::connect(
2929
&database_config,
3030
#[cfg(feature = "pg-embed")]
@@ -34,9 +34,9 @@ async fn main() -> anyhow::Result<()> {
3434
.unwrap();
3535
let db = Arc::new(Mutex::new(db));
3636

37-
let listener = pipeline_manager::api::create_listener(manager_config.clone())?;
37+
let listener = pipeline_manager::api::create_listener(api_config.clone())?;
3838
// The api-server blocks forever
39-
pipeline_manager::api::run(listener, db, manager_config)
39+
pipeline_manager::api::run(listener, db, api_config)
4040
.await
4141
.unwrap();
4242
Ok(())

crates/pipeline_manager/src/bin/pipeline-manager.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use clap::{Args, Command, FromArgMatches};
44
use colored::Colorize;
55
use pipeline_manager::api::ApiDoc;
66
use pipeline_manager::compiler::Compiler;
7-
use pipeline_manager::config::{CompilerConfig, DatabaseConfig, LocalRunnerConfig, ManagerConfig};
7+
use pipeline_manager::config::{
8+
ApiServerConfig, CompilerConfig, DatabaseConfig, LocalRunnerConfig,
9+
};
810
use pipeline_manager::db::ProjectDB;
911
use pipeline_manager::local_runner;
1012
use std::sync::Arc;
@@ -21,26 +23,26 @@ fn main() -> anyhow::Result<()> {
2123

2224
let cli = Command::new("Pipeline manager CLI");
2325
let cli = DatabaseConfig::augment_args(cli);
24-
let cli = ManagerConfig::augment_args(cli);
26+
let cli = ApiServerConfig::augment_args(cli);
2527
let cli = CompilerConfig::augment_args(cli);
2628
let cli = LocalRunnerConfig::augment_args(cli);
2729
let matches = cli.get_matches();
2830

29-
let mut manager_config = ManagerConfig::from_arg_matches(&matches)
31+
let mut api_config = ApiServerConfig::from_arg_matches(&matches)
3032
.map_err(|err| err.exit())
3133
.unwrap();
32-
if manager_config.dump_openapi {
34+
if api_config.dump_openapi {
3335
let openapi_json = ApiDoc::openapi().to_json()?;
3436
std::fs::write("openapi.json", openapi_json.as_bytes())?;
3537
return Ok(());
3638
}
3739

38-
if let Some(config_file) = &manager_config.config_file {
40+
if let Some(config_file) = &api_config.config_file {
3941
let config_yaml = std::fs::read(config_file).map_err(|e| {
4042
anyhow::Error::msg(format!("error reading config file '{config_file}': {e}"))
4143
})?;
4244
let config_yaml = String::from_utf8_lossy(&config_yaml);
43-
manager_config = serde_yaml::from_str(&config_yaml).map_err(|e| {
45+
api_config = serde_yaml::from_str(&config_yaml).map_err(|e| {
4446
anyhow::Error::msg(format!("error parsing config file '{config_file}': {e}"))
4547
})?;
4648
}
@@ -51,7 +53,7 @@ fn main() -> anyhow::Result<()> {
5153
.map_err(|err| err.exit())
5254
.unwrap();
5355

54-
let manager_config = manager_config.canonicalize()?;
56+
let api_config = api_config.canonicalize()?;
5557
let compiler_config = compiler_config.canonicalize()?;
5658
let local_runner_config = local_runner_config.canonicalize()?;
5759
if compiler_config.precompile {
@@ -62,12 +64,12 @@ fn main() -> anyhow::Result<()> {
6264
let database_config = DatabaseConfig::from_arg_matches(&matches)
6365
.map_err(|err| err.exit())
6466
.unwrap();
65-
let listener = pipeline_manager::api::create_listener(manager_config.clone())?;
67+
let listener = pipeline_manager::api::create_listener(api_config.clone())?;
6668
rt::System::new().block_on(async move {
6769
let db: ProjectDB = ProjectDB::connect(
6870
&database_config,
6971
#[cfg(feature = "pg-embed")]
70-
Some(&manager_config),
72+
Some(&api_config),
7173
)
7274
.await
7375
.unwrap();
@@ -83,7 +85,7 @@ fn main() -> anyhow::Result<()> {
8385
local_runner::run(db_clone, &local_runner_config.clone()).await;
8486
});
8587
// The api-server blocks forever
86-
pipeline_manager::api::run(listener, db, manager_config)
88+
pipeline_manager::api::run(listener, db, api_config)
8789
.await
8890
.unwrap();
8991
});

crates/pipeline_manager/src/config.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,12 @@ impl DatabaseConfig {
6565
/// line arguments.
6666
#[derive(Parser, Deserialize, Debug, Clone)]
6767
#[command(author, version, about, long_about = None)]
68-
pub struct ManagerConfig {
69-
/// Directory where the manager stores its filesystem state:
68+
pub struct ApiServerConfig {
69+
/// Directory where the api-server stores its filesystem state:
7070
/// generated Rust crates, pipeline logs, etc.
7171
#[serde(default = "default_working_directory")]
7272
#[arg(short, long, default_value_t = default_working_directory())]
73-
pub manager_working_directory: String,
73+
pub api_server_working_directory: String,
7474

7575
/// Port number for the HTTP service, defaults to 8080.
7676
#[serde(default = "default_server_port")]
@@ -135,33 +135,33 @@ pub struct ManagerConfig {
135135
pub dev_mode: bool,
136136
}
137137

138-
impl ManagerConfig {
138+
impl ApiServerConfig {
139139
/// Convert all directory paths in the `self` to absolute paths.
140140
///
141141
/// Converts `working_directory` `sql_compiler_home`, and
142142
/// `dbsp_override_path` fields to absolute paths;
143143
/// fails if any of the paths doesn't exist or isn't readable.
144144
pub fn canonicalize(mut self) -> AnyResult<Self> {
145-
create_dir_all(&self.manager_working_directory).map_err(|e| {
145+
create_dir_all(&self.api_server_working_directory).map_err(|e| {
146146
AnyError::msg(format!(
147147
"unable to create or open working directory '{}': {e}",
148-
self.manager_working_directory
148+
self.api_server_working_directory
149149
))
150150
})?;
151151

152-
self.manager_working_directory = canonicalize(&self.manager_working_directory)
152+
self.api_server_working_directory = canonicalize(&self.api_server_working_directory)
153153
.map_err(|e| {
154154
AnyError::msg(format!(
155155
"error canonicalizing working directory path '{}': {e}",
156-
self.manager_working_directory
156+
self.api_server_working_directory
157157
))
158158
})?
159159
.to_string_lossy()
160160
.into_owned();
161161

162162
// Running as daemon and no log file specified - use default log file name.
163163
if self.logfile.is_none() && self.unix_daemon {
164-
self.logfile = Some(format!("{}/manager.log", self.manager_working_directory));
164+
self.logfile = Some(format!("{}/manager.log", self.api_server_working_directory));
165165
}
166166

167167
if let Some(logfile) = &self.logfile {
@@ -194,15 +194,15 @@ impl ManagerConfig {
194194
/// e.g., `<working-directory>/data`
195195
#[cfg(feature = "pg-embed")]
196196
pub(crate) fn postgres_embed_data_dir(&self) -> PathBuf {
197-
Path::new(&self.manager_working_directory).join("data")
197+
Path::new(&self.api_server_working_directory).join("data")
198198
}
199199

200200
/// Manager pid file.
201201
///
202202
/// e.g., `<working-directory>/manager.pid`
203203
#[cfg(unix)]
204204
pub(crate) fn manager_pid_file_path(&self) -> PathBuf {
205-
Path::new(&self.manager_working_directory).join("manager.pid")
205+
Path::new(&self.api_server_working_directory).join("manager.pid")
206206
}
207207
}
208208

crates/pipeline_manager/src/db/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#[cfg(feature = "pg-embed")]
2-
use crate::config::ManagerConfig;
2+
use crate::config::ApiServerConfig;
33
use crate::{
44
auth::{TenantId, TenantRecord},
55
compiler::ProgramStatus,
@@ -2123,15 +2123,15 @@ impl Storage for ProjectDB {
21232123
impl ProjectDB {
21242124
pub async fn connect(
21252125
db_config: &DatabaseConfig,
2126-
#[cfg(feature = "pg-embed")] manager_config: Option<&ManagerConfig>,
2126+
#[cfg(feature = "pg-embed")] api_config: Option<&ApiServerConfig>,
21272127
) -> Result<Self, DBError> {
21282128
let connection_str = db_config.database_connection_string();
21292129
let initial_sql = &db_config.initial_sql;
21302130

21312131
#[cfg(feature = "pg-embed")]
21322132
if connection_str.starts_with("postgres-embed") {
2133-
let database_dir = manager_config
2134-
.expect("ManagerConfig needs to be provided when using pg-embed")
2133+
let database_dir = api_config
2134+
.expect("ApiServerConfig needs to be provided when using pg-embed")
21352135
.postgres_embed_data_dir();
21362136
let pg_inst = pg_setup::install(database_dir, true, Some(8082)).await?;
21372137
let connection_string = pg_inst.db_uri.to_string();

0 commit comments

Comments
 (0)