Skip to content

Commit eceb9c6

Browse files
Leonid Ryzhyk authored and ryzhyk committed
Modify pipeline manager to support JIT.
Added support for running pipelines in JIT mode to the pipeline manager.

- Added a `jit_mode` flag to the `Program` table in the pipeline manager database. The flag can be set when creating or modifying the program. Alternatives include making this a property of a pipeline; I ended up making it a per-program setting because this way we can skip building the Rust executable for a program configured to run in JIT mode.
- Modified the compiler service to run only the SQL compiler when the `jit_mode` flag is set for the program.
- Modified the local runner to run the SQL compiler again when starting the pipeline in order to generate the JIT IR, and then to run the `pipeline` executable to execute the IR.
- Added the `pipeline` executable to the pipeline manager container.

Partially addresses #680

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
1 parent 406a110 commit eceb9c6

File tree

31 files changed

+859
-270
lines changed

31 files changed

+859
-270
lines changed

Earthfile

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ build-adapters:
308308
RUN rm -rf crates/adapters
309309
COPY --keep-ts --dir crates/adapters crates/adapters
310310

311-
RUN cargo +$RUST_TOOLCHAIN build $RUST_BUILD_PROFILE --package dbsp_adapters
311+
RUN cargo +$RUST_TOOLCHAIN build $RUST_BUILD_PROFILE --package dbsp_adapters --features="with-jit"
312312
RUN cd crates/adapters && cargo +$RUST_TOOLCHAIN machete
313313
RUN cargo +$RUST_TOOLCHAIN clippy $RUST_BUILD_PROFILE --package dbsp_adapters -- -D warnings
314314
ENV RUST_BACKTRACE=1
@@ -332,9 +332,11 @@ build-manager:
332332

333333
IF [ -f ./target/debug/pipeline-manager ]
334334
SAVE ARTIFACT --keep-ts ./target/debug/pipeline-manager pipeline-manager
335+
SAVE ARTIFACT --keep-ts ./target/debug/pipeline pipeline
335336
END
336337
IF [ -f ./target/release/pipeline-manager ]
337338
SAVE ARTIFACT --keep-ts ./target/release/pipeline-manager pipeline-manager
339+
SAVE ARTIFACT --keep-ts ./target/release/pipeline pipeline
338340
END
339341

340342
test-sql:
@@ -467,13 +469,13 @@ test-python:
467469

468470
FROM +build-manager --RUST_TOOLCHAIN=$RUST_TOOLCHAIN --RUST_BUILD_PROFILE=$RUST_BUILD_PROFILE
469471
COPY +build-manager/pipeline-manager .
472+
COPY +build-manager/pipeline .
470473
RUN mkdir -p /root/.local/lib/python3.10
471474
RUN mkdir -p /root/.local/bin
472475

473476
COPY +install-python/python3.10 /root/.local/lib/python3.10
474477
COPY +install-python/bin /root/.local/bin
475478

476-
COPY +build-manager/pipeline-manager .
477479
COPY +build-sql/sql-to-dbsp-compiler sql-to-dbsp-compiler
478480

479481
COPY demo/demo_notebooks demo/demo_notebooks
@@ -489,7 +491,7 @@ test-python:
489491
WITH DOCKER --pull postgres
490492
RUN docker run --shm-size=512MB -p 5432:5432 -e POSTGRES_HOST_AUTH_METHOD=trust -e PGDATA=/dev/shm -d postgres && \
491493
sleep 10 && \
492-
(./pipeline-manager --bind-address=0.0.0.0 --api-server-working-directory=/working-dir --compiler-working-directory=/working-dir --runner-working-directory=/working-dir --sql-compiler-home=/dbsp/sql-to-dbsp-compiler --dbsp-override-path=/dbsp --db-connection-string=postgresql://postgres:postgres@localhost:5432 &) && \
494+
(./pipeline-manager --bind-address=0.0.0.0 --api-server-working-directory=/working-dir --compiler-working-directory=/working-dir --runner-working-directory=/working-dir --sql-compiler-home=/dbsp/sql-to-dbsp-compiler --dbsp-override-path=/dbsp --jit-pipeline-runner-path=/dbsp/pipeline --db-connection-string=postgresql://postgres:postgres@localhost:5432 &) && \
493495
sleep 5 && \
494496
python3 python/test.py && \
495497
cd demo/demo_notebooks && jupyter execute fraud_detection.ipynb --JupyterApp.log_level='DEBUG'
@@ -514,6 +516,7 @@ build-pipeline-manager-container:
514516
# First, copy over the artifacts built from previous stages
515517
RUN mkdir -p database-stream-processor/sql-to-dbsp-compiler/SQL-compiler/target
516518
COPY +build-manager/pipeline-manager .
519+
COPY +build-manager/pipeline .
517520
COPY +build-sql/sql2dbsp-jar-with-dependencies.jar database-stream-processor/sql-to-dbsp-compiler/SQL-compiler/target/
518521

519522
# Then copy over the crates needed by the sql compiler
@@ -528,7 +531,14 @@ build-pipeline-manager-container:
528531
COPY sql-to-dbsp-compiler/lib /database-stream-processor/sql-to-dbsp-compiler/lib
529532
COPY sql-to-dbsp-compiler/temp /database-stream-processor/sql-to-dbsp-compiler/temp
530533
RUN ./pipeline-manager --bind-address=0.0.0.0 --api-server-working-directory=/working-dir --compiler-working-directory=/working-dir --runner-working-directory=/working-dir --sql-compiler-home=/database-stream-processor/sql-to-dbsp-compiler --dbsp-override-path=/database-stream-processor --precompile
531-
ENTRYPOINT ["./pipeline-manager", "--bind-address=0.0.0.0", "--api-server-working-directory=/working-dir", "--compiler-working-directory=/working-dir", "--runner-working-directory=/working-dir", "--sql-compiler-home=/database-stream-processor/sql-to-dbsp-compiler", "--dbsp-override-path=/database-stream-processor"]
534+
ENTRYPOINT ["./pipeline-manager", \
535+
"--bind-address=0.0.0.0", \
536+
"--api-server-working-directory=/working-dir", \
537+
"--compiler-working-directory=/working-dir", \
538+
"--runner-working-directory=/working-dir", \
539+
"--sql-compiler-home=/database-stream-processor/sql-to-dbsp-compiler", \
540+
"--dbsp-override-path=/database-stream-processor", \
541+
"--jit-pipeline-runner-path=/pipeline"]
532542

533543
# TODO: mirrors the Dockerfile. See note above.
534544
build-demo-container:
@@ -617,15 +627,13 @@ integration-test-container:
617627
integration-tests:
618628
FROM earthly/dind:alpine
619629
COPY deploy/docker-compose.yml .
630+
COPY deploy/docker-compose-test.yml .
620631
COPY deploy/.env .
621632
ENV FELDERA_VERSION=latest
622633
WITH DOCKER --pull postgres \
623634
--load ghcr.io/feldera/pipeline-manager:latest=+build-pipeline-manager-container \
624-
--compose docker-compose.yml \
625-
--service db \
626-
--service pipeline-manager \
627-
--load itest:latest=+integration-test-container
628-
RUN sleep 5 && docker run --env-file .env --network default_default itest:latest
635+
--load test:latest=+integration-test-container
636+
RUN COMPOSE_HTTP_TIMEOUT=120 RUST_LOG=debug,tokio_postgres=info docker-compose --env-file .env -f docker-compose.yml -f docker-compose-test.yml up --force-recreate --exit-code-from test db pipeline-manager test
629637
END
630638

631639
benchmark:
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
ALTER TABLE program
2+
ADD COLUMN jit_mode bool NOT NULL DEFAULT FALSE;
3+
4+
ALTER TABLE program_history
5+
ADD COLUMN jit_mode bool NOT NULL DEFAULT FALSE;

crates/pipeline_manager/src/api.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,9 @@ struct NewProgramRequest {
723723
/// SQL code of the program.
724724
#[schema(example = "CREATE TABLE Example(name varchar);")]
725725
code: String,
726+
/// Compile the program in JIT mode.
727+
#[serde(default)]
728+
jit_mode: bool,
726729
}
727730

728731
/// Response to a new program request.
@@ -772,6 +775,7 @@ async fn do_new_program(
772775
&request.name,
773776
&request.description,
774777
&request.code,
778+
request.jit_mode,
775779
)
776780
.await
777781
.map(|(program_id, version)| {
@@ -799,6 +803,8 @@ struct UpdateProgramRequest {
799803
/// New SQL code for the program or `None` to keep existing program
800804
/// code unmodified.
801805
code: Option<String>,
806+
/// Compile the program in JIT mode.
807+
jit_mode: bool,
802808
}
803809

804810
/// Response to a program update request.
@@ -853,6 +859,7 @@ async fn update_program(
853859
&body.name,
854860
&body.description,
855861
&body.code,
862+
body.jit_mode,
856863
)
857864
.await?;
858865
info!(

crates/pipeline_manager/src/bin/local-runner.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use clap::{Args, Command, FromArgMatches};
44

55
use colored::Colorize;
66

7-
use pipeline_manager::config::{DatabaseConfig, LocalRunnerConfig};
7+
use pipeline_manager::config::{CompilerConfig, DatabaseConfig, LocalRunnerConfig};
88
use pipeline_manager::db::ProjectDB;
99
use pipeline_manager::local_runner;
1010
use tokio::spawn;
@@ -17,6 +17,7 @@ async fn main() {
1717
pipeline_manager::logging::init_logging(name);
1818
let cli = Command::new("Feldera local runner service");
1919
let cli = DatabaseConfig::augment_args(cli);
20+
let cli = CompilerConfig::augment_args(cli);
2021
let cli = LocalRunnerConfig::augment_args(cli);
2122
let matches = cli.get_matches();
2223

@@ -27,6 +28,11 @@ async fn main() {
2728
.map_err(|err| err.exit())
2829
.unwrap();
2930
let local_runner_config = local_runner_config.canonicalize().unwrap();
31+
let compiler_config = CompilerConfig::from_arg_matches(&matches)
32+
.map_err(|err| err.exit())
33+
.unwrap();
34+
let compiler_config = compiler_config.canonicalize().unwrap();
35+
3036
let db = ProjectDB::connect(
3137
&database_config,
3238
#[cfg(feature = "pg-embed")]
@@ -36,7 +42,7 @@ async fn main() {
3642
.unwrap();
3743
let db = Arc::new(Mutex::new(db));
3844
let _local_runner = spawn(async move {
39-
local_runner::run(db, &local_runner_config.clone()).await;
45+
local_runner::run(db, &local_runner_config, &compiler_config).await;
4046
});
4147
tokio::signal::ctrl_c()
4248
.await

crates/pipeline_manager/src/bin/pipeline-manager.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,15 @@ async fn main() -> anyhow::Result<()> {
7575
db.run_migrations().await?;
7676
let db = Arc::new(Mutex::new(db));
7777
let db_clone = db.clone();
78+
let compiler_config_clone = compiler_config.clone();
7879
let _compiler = tokio::spawn(async move {
79-
Compiler::run(&compiler_config.clone(), db_clone)
80+
Compiler::run(&compiler_config_clone, db_clone)
8081
.await
8182
.unwrap();
8283
});
8384
let db_clone = db.clone();
8485
let _local_runner = tokio::spawn(async move {
85-
local_runner::run(db_clone, &local_runner_config.clone()).await;
86+
local_runner::run(db_clone, &local_runner_config, &compiler_config).await;
8687
});
8788
// The api-server blocks forever
8889
pipeline_manager::api::run(db, api_config).await.unwrap();

0 commit comments

Comments
 (0)