Skip to content

Commit 32da4ad

Browse files
committed
Enable compression for requests between pipeline-manager and pipelines
Make circuit_profile proxy as streaming Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
1 parent b23449a commit 32da4ad

File tree

5 files changed

+460
-80
lines changed

5 files changed

+460
-80
lines changed

crates/adapters/src/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use actix_web::{
2828
get,
2929
http::StatusCode,
3030
http::header,
31-
post, rt,
31+
middleware, post, rt,
3232
web::{self, Data as WebData, Payload, Query},
3333
};
3434
use arrow::ipc::writer::StreamWriter;
@@ -825,6 +825,7 @@ pub fn run_server(
825825
res
826826
})
827827
})
828+
.wrap(middleware::Compress::default())
828829
.wrap(observability::actix_middleware());
829830
build_app(app, state)
830831
}

crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ pub(crate) async fn http_input(
112112
req,
113113
body,
114114
Some(Duration::from_secs(30)),
115+
false,
115116
)
116117
.await
117118
}
@@ -191,7 +192,8 @@ pub(crate) async fn http_output(
191192
&endpoint,
192193
req,
193194
body,
194-
None,
195+
Some(Duration::MAX),
196+
false,
195197
)
196198
.await
197199
}
@@ -673,7 +675,8 @@ pub(crate) async fn get_pipeline_time_series_stream(
673675
"time_series_stream",
674676
request,
675677
body,
676-
None,
678+
Some(Duration::MAX),
679+
false,
677680
)
678681
.await
679682
}
@@ -716,18 +719,20 @@ pub(crate) async fn get_pipeline_circuit_profile(
716719
tenant_id: ReqData<TenantId>,
717720
path: web::Path<String>,
718721
request: HttpRequest,
722+
body: web::Payload,
719723
) -> Result<HttpResponse, ManagerError> {
720724
let pipeline_name = path.into_inner();
721725
state
722726
.runner
723-
.forward_http_request_to_pipeline_by_name(
727+
.forward_streaming_http_request_to_pipeline_by_name(
724728
client.as_ref(),
725729
*tenant_id,
726730
&pipeline_name,
727-
Method::GET,
728731
"dump_profile",
729-
request.query_string(),
732+
request,
733+
body,
730734
Some(Duration::from_secs(120)),
735+
false,
731736
)
732737
.await
733738
}
@@ -770,21 +775,24 @@ pub(crate) async fn get_pipeline_circuit_json_profile(
770775
tenant_id: ReqData<TenantId>,
771776
path: web::Path<String>,
772777
request: HttpRequest,
778+
body: web::Payload,
773779
) -> Result<HttpResponse, ManagerError> {
774780
let pipeline_name = path.into_inner();
775781

776-
// Get the JSON profile from the pipeline and return it directly
777-
// The Compress middleware will automatically compress the response based on Accept-Encoding
782+
// Stream the JSON profile from the pipeline with compression passthrough.
783+
// The pipeline's Compress middleware compresses the response; we pass the
784+
// compressed bytes through directly without buffering.
778785
state
779786
.runner
780-
.forward_http_request_to_pipeline_by_name(
787+
.forward_streaming_http_request_to_pipeline_by_name(
781788
client.as_ref(),
782789
*tenant_id,
783790
&pipeline_name,
784-
Method::GET,
785791
"dump_json_profile",
786-
request.query_string(),
792+
request,
793+
body,
787794
Some(Duration::from_secs(120)),
795+
true,
788796
)
789797
.await
790798
}
@@ -1326,6 +1334,7 @@ pub(crate) async fn get_pipeline_samply_profile(
13261334
request,
13271335
body,
13281336
Some(Duration::MAX),
1337+
false,
13291338
)
13301339
.await
13311340
}
@@ -1766,6 +1775,7 @@ pub(crate) async fn pipeline_adhoc_sql(
17661775
request,
17671776
body,
17681777
Some(Duration::MAX),
1778+
false,
17691779
)
17701780
.await
17711781
}

0 commit comments

Comments
 (0)