Skip to content

Commit 873807e

Browse files
committed
[WebConsole] Implement Samply profile collection UI
Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
1 parent e5b9819 commit 873807e

25 files changed

Lines changed: 1012 additions & 213 deletions

.devcontainer/Dockerfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,7 @@ RUN \
8181
rustup component add clippy && \
8282
cargo install cargo-make
8383

84+
## Install Samply
85+
RUN echo "pass" | sudo curl --proto '=https' --tlsv1.2 -LsSf https://github.com/feldera/samply/releases/download/v0.13.2/samply-installer.sh | sh
86+
8487
ENV PATH="$HOME/.local/bin:$PATH"

.devcontainer/docker-compose.devcontainer.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,15 @@ services:
1313
- REDPANDA_BROKERS=redpanda:9092
1414
volumes:
1515
- /var/run/docker.sock:/var/run/docker.sock
16+
cap_add:
17+
- PERFMON
18+
- IPC_LOCK
19+
security_opt:
20+
- seccomp:unconfined
21+
ulimits:
22+
memlock:
23+
soft: -1
24+
hard: -1
1625
postgres-dev:
1726
image: postgres:15-alpine
1827
restart: always

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,7 @@ playwright/.cache/
8989
# pnpm
9090
.pnpm-store
9191

92-
.vite/
92+
.vite/
93+
94+
# Claude Code
95+
/.claude/

crates/adapters/src/controller.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,12 +1012,8 @@ impl Controller {
10121012
anyhow::bail!(
10131013
"samply process failed with status: `{}`, samply stdout: `{}`, samply stderr: `{}`",
10141014
output.status,
1015-
String::from_utf8_lossy(&output.stdout)
1016-
.trim()
1017-
.replace("\n", "\\n"),
1018-
String::from_utf8_lossy(&output.stderr)
1019-
.trim()
1020-
.replace("\n", "\\n"),
1015+
String::from_utf8_lossy(&output.stdout).trim(),
1016+
String::from_utf8_lossy(&output.stderr).trim(),
10211017
);
10221018
}
10231019

crates/adapters/src/samply.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -296,25 +296,39 @@ fn write_marker(start: Timestamp, end: Timestamp, name: &str) {
296296
let _ = with_marker_file(|f| f.write_all(s.as_bytes()));
297297
}
298298

299+
pub(crate) type SamplyProfile = Option<Result<Vec<u8>, String>>;
300+
299301
#[derive(Debug, Default, Clone, PartialEq, Eq)]
300-
pub(crate) enum SamplyProfile {
302+
pub(crate) enum SamplyStatus {
301303
#[default]
302-
None,
303-
Success(Vec<u8>),
304-
Failure(String),
304+
Idle,
305+
InProgress {
306+
expected_after: chrono::DateTime<chrono::Utc>,
307+
},
308+
}
309+
310+
#[derive(Debug, Default, Clone)]
311+
pub(crate) struct SamplyState {
312+
pub(crate) last_profile: SamplyProfile,
313+
pub(crate) samply_status: SamplyStatus,
305314
}
306315

307-
impl SamplyProfile {
308-
pub(crate) fn update(&mut self, result: Result<Vec<u8>, anyhow::Error>) {
316+
impl SamplyState {
317+
pub(crate) fn start_profiling(&mut self, expected_after: chrono::DateTime<chrono::Utc>) {
318+
self.samply_status = SamplyStatus::InProgress { expected_after };
319+
}
320+
321+
pub(crate) fn complete_profiling(&mut self, result: Result<Vec<u8>, anyhow::Error>) {
309322
match result {
310323
Ok(data) => {
311324
info!("collected samply profile: {} bytes", data.len());
312-
*self = SamplyProfile::Success(data);
325+
self.last_profile = Some(Ok(data));
313326
}
314327
Err(error) => {
315328
error!("samply profiling failed: {:?}", error);
316-
*self = SamplyProfile::Failure(error.to_string());
329+
self.last_profile = Some(Err(error.to_string()));
317330
}
318331
}
332+
self.samply_status = SamplyStatus::Idle;
319333
}
320334
}

crates/adapters/src/server.rs

Lines changed: 85 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
adhoc::{adhoc_websocket, stream_adhoc_result},
1414
controller::ConnectorConfig,
1515
ensure_default_crypto_provider,
16-
samply::SamplyProfile,
16+
samply::{SamplyProfile, SamplyState, SamplyStatus},
1717
transport::http::{
1818
HttpInputEndpoint, HttpInputTransport, HttpOutputEndpoint, HttpOutputTransport,
1919
},
@@ -63,7 +63,7 @@ use feldera_types::constants::STATUS_FILE;
6363
use feldera_types::coordination::{AdHocScan, CoordinationActivate, Labels, Step, StepRequest};
6464
use feldera_types::pipeline_diff::PipelineDiff;
6565
use feldera_types::query_params::{
66-
ActivateParams, MetricsFormat, MetricsParameters, SamplyProfileParams,
66+
ActivateParams, MetricsFormat, MetricsParameters, SamplyProfileGetParams, SamplyProfileParams,
6767
};
6868
use feldera_types::runtime_status::{
6969
BootstrapPolicy, ExtendedRuntimeStatus, ExtendedRuntimeStatusError, RuntimeDesiredStatus,
@@ -266,8 +266,8 @@ pub(crate) struct ServerState {
266266
sync_checkpoint_state: Mutex<CheckpointSyncState>,
267267

268268
/// Leaf lock.
269-
/// Latest samply profile.
270-
samply_profile: Arc<Mutex<SamplyProfile>>,
269+
/// Samply profiling state.
270+
samply_state: Arc<Mutex<SamplyState>>,
271271

272272
/// Deployment ID.
273273
deployment_id: Uuid,
@@ -322,7 +322,7 @@ impl ServerState {
322322
deployment_id,
323323
storage,
324324
rate_limiter,
325-
samply_profile: Default::default(),
325+
samply_state: Default::default(),
326326
coordination_activate: Default::default(),
327327
leases: Default::default(),
328328
}
@@ -1720,22 +1720,92 @@ async fn samply_profile(
17201720
let duration = query_params.duration_secs;
17211721
let controller = state.controller()?;
17221722

1723-
let state_samply_profile = state.samply_profile.clone();
1723+
let state_samply_state = state.samply_state.clone();
1724+
1725+
// Check if profiling is already in progress
1726+
{
1727+
let samply_state = state_samply_state.lock().unwrap();
1728+
if matches!(samply_state.samply_status, SamplyStatus::InProgress { .. }) {
1729+
return Ok(HttpResponse::Conflict().json(ErrorResponse {
1730+
message: "samply profile collection is already in progress".to_string(),
1731+
error_code: "SamplyProfilingInProgress".into(),
1732+
details: serde_json::Value::Null,
1733+
}));
1734+
}
1735+
}
1736+
1737+
// Set the state to InProgress with expected completion time
1738+
let expected_after = chrono::Utc::now() + chrono::Duration::seconds(duration as i64);
1739+
state_samply_state
1740+
.lock()
1741+
.unwrap()
1742+
.start_profiling(expected_after);
17241743

17251744
spawn(async move {
17261745
let result = controller.async_samply_profile(duration).await;
1727-
state_samply_profile.lock().unwrap().update(result);
1746+
state_samply_state
1747+
.lock()
1748+
.unwrap()
1749+
.complete_profiling(result);
17281750
});
17291751

1730-
Ok(HttpResponse::Ok().finish())
1752+
// Wait to check if it errored out immediately
1753+
sleep(Duration::from_millis(600)).await;
1754+
1755+
// Check if profiling is still running or failed immediately
1756+
let samply_state = state.samply_state.lock().unwrap();
1757+
Ok(match samply_state.samply_status {
1758+
// Profile is still running - return success
1759+
SamplyStatus::InProgress { .. } => HttpResponse::Accepted().finish(),
1760+
// Profile completed during wait - check if it failed
1761+
SamplyStatus::Idle => match &samply_state.last_profile {
1762+
Some(Err(error)) => samply_profile_error_response(error),
1763+
_ => HttpResponse::InternalServerError().json(ErrorResponse {
1764+
message: "samply profiling completed unexpectedly".to_string(),
1765+
error_code: "SamplyProfilingUnexpectedCompletion".into(),
1766+
details: serde_json::Value::Null,
1767+
}),
1768+
},
1769+
})
17311770
}
17321771

17331772
#[get("/samply_profile")]
1734-
async fn get_samply_profile(state: WebData<ServerState>) -> Result<HttpResponse, PipelineError> {
1735-
let profile = state.samply_profile.lock().unwrap();
1773+
async fn get_samply_profile(
1774+
state: WebData<ServerState>,
1775+
query_params: web::Query<SamplyProfileGetParams>,
1776+
) -> Result<HttpResponse, PipelineError> {
1777+
let samply_state = state.samply_state.lock().unwrap();
1778+
1779+
// If latest=true, check if profiling is in progress and return 204 No Content
1780+
if query_params.latest {
1781+
if let SamplyStatus::InProgress { expected_after } = samply_state.samply_status {
1782+
let now = chrono::Utc::now();
1783+
let retry_after_secs = (expected_after - now).num_seconds().max(0);
1784+
1785+
return Ok(HttpResponse::NoContent()
1786+
.insert_header(("Retry-After", retry_after_secs.to_string()))
1787+
.finish());
1788+
}
1789+
}
1790+
1791+
// Return the last profile result
1792+
Ok(samply_profile_response(&samply_state.last_profile))
1793+
}
17361794

1737-
Ok(match profile.clone() {
1738-
SamplyProfile::Success(bytes) => {
1795+
/// Helper function to construct error response for samply profiling failure
1796+
fn samply_profile_error_response(error: &str) -> HttpResponse {
1797+
HttpResponse::InternalServerError().json(ErrorResponse {
1798+
message: "failed to profile the pipeline using samply".to_string(),
1799+
error_code: "SamplyProfilingFailure".into(),
1800+
details: serde_json::Value::String(error.to_string()),
1801+
})
1802+
}
1803+
1804+
/// Helper function to construct HttpResponse from SamplyProfile result
1805+
fn samply_profile_response(last_profile: &SamplyProfile) -> HttpResponse {
1806+
match last_profile {
1807+
Some(Ok(bytes)) => {
1808+
let bytes = bytes.clone();
17391809
let byte_stream = once(async move { Ok::<_, PipelineError>(web::Bytes::from(bytes)) });
17401810

17411811
HttpResponse::Ok()
@@ -1749,17 +1819,13 @@ async fn get_samply_profile(state: WebData<ServerState>) -> Result<HttpResponse,
17491819
))
17501820
.streaming(byte_stream)
17511821
}
1752-
SamplyProfile::Failure(ref error) => HttpResponse::InternalServerError().json(ErrorResponse {
1753-
message: "failed to profile the pipeline using samply".to_string(),
1754-
error_code: "SamplyProfilingFailure".into(),
1755-
details: serde_json::Value::String(error.to_string()),
1756-
}),
1757-
SamplyProfile::None => HttpResponse::BadRequest().json(json!({
1822+
Some(Err(error)) => samply_profile_error_response(error),
1823+
None => HttpResponse::BadRequest().json(json!({
17581824
"message": "no samply profile found; trigger a samply profile by making a POST request to `/samply_profile`",
17591825
"error_code": "NoSamplyProfile",
17601826
"details": null
17611827
})),
1762-
})
1828+
}
17631829
}
17641830

17651831
#[post("/checkpoint/sync")]

crates/adapters/src/server/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ pub struct ErrorResponse {
7373
pub error_code: Cow<'static, str>,
7474
/// Detailed error metadata.
7575
/// The contents of this field is determined by `error_code`.
76-
#[schema(value_type=Object)]
76+
#[schema(value_type=Value)]
7777
pub details: JsonValue,
7878
}
7979

crates/feldera-types/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub struct ErrorResponse {
1818
pub error_code: Cow<'static, str>,
1919
/// Detailed error metadata.
2020
/// The contents of this field is determined by `error_code`.
21-
#[schema(value_type = Object)]
21+
#[schema(value_type = Value)]
2222
pub details: JsonValue,
2323
}
2424

crates/feldera-types/src/query_params.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,12 @@ pub struct SamplyProfileParams {
5252
fn default_samply_profile_duration() -> u64 {
5353
30
5454
}
55+
56+
/// Query parameters to retrieve samply profile.
57+
#[derive(Debug, Deserialize, IntoParams, ToSchema)]
58+
pub struct SamplyProfileGetParams {
59+
/// If true, returns 307 redirect if profile collection is in progress.
60+
/// If false or not provided, returns the last collected profile.
61+
#[serde(default)]
62+
pub latest: bool,
63+
}

crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use actix_web::{
1515
web::{self, Data as WebData, ReqData},
1616
HttpRequest, HttpResponse,
1717
};
18-
use feldera_types::query_params::{MetricsParameters, SamplyProfileParams};
18+
use feldera_types::query_params::{MetricsParameters, SamplyProfileGetParams, SamplyProfileParams};
1919
use feldera_types::{program_schema::SqlIdentifier, query_params::ActivateParams};
2020
use std::time::Duration;
2121
use tracing::{debug, info};
@@ -1168,7 +1168,10 @@ pub(crate) async fn get_checkpoint_sync_status(
11681168
SamplyProfileParams,
11691169
),
11701170
responses(
1171-
(status = OK, description = "Started profiling the pipeline with the Samply tool"),
1171+
(status = ACCEPTED, description = "Started profiling the pipeline with the Samply tool"),
1172+
(status = CONFLICT
1173+
, description = "Samply profile collection is already in progress"
1174+
, body = ErrorResponse),
11721175
(status = NOT_FOUND
11731176
, description = "Pipeline with that name does not exist"
11741177
, body = ErrorResponse
@@ -1212,24 +1215,28 @@ pub(crate) async fn start_samply_profile(
12121215

12131216
/// Get Samply Profile
12141217
///
1215-
/// Retrieve the most recent samply profile of a running pipeline.
1218+
/// Retrieve the last samply profile of a pipeline, regardless of whether profiling is currently in progress.
1219+
/// If ?latest parameter is specified and Samply profile collection is in progress, returns HTTP 307 with Retry-After header.
12161220
#[utoipa::path(
12171221
context_path = "/v0",
12181222
security(("JSON web token (JWT) or API key" = [])),
12191223
params(
12201224
("pipeline_name" = String, Path, description = "Unique pipeline name"),
1225+
SamplyProfileGetParams,
12211226
),
12221227
responses(
12231228
(status = OK
12241229
, description = "Samply profile as a gzip containing the profile that can be inspected by the samply tool"
12251230
, content_type = "application/gzip"
12261231
, body = Vec<u8>),
1232+
(status = NO_CONTENT
1233+
, description = "Samply profile collection is in progress. Check the Retry-After header for expected completion time."),
12271234
(status = NOT_FOUND
12281235
, description = "Pipeline with that name does not exist"
12291236
, body = ErrorResponse
12301237
, example = json!(examples::error_unknown_pipeline_name())),
12311238
(status = BAD_REQUEST
1232-
, description = "No samply profile exists for the pipeline, create one by calling `POST /pipelines/{pipeline_name}/samply_profile/start?duration_secs=30`"
1239+
, description = "No samply profile exists for the pipeline, create one by calling `POST /pipelines/{pipeline_name}/samply_profile?duration_secs=30`"
12331240
, body = ErrorResponse),
12341241
(status = SERVICE_UNAVAILABLE
12351242
, body = ErrorResponse

0 commit comments

Comments
 (0)