Skip to content

Commit de60b0a

Browse files
committed
profiling: support profiling a running pipeline with samply
First, you must trigger the profiling with `POST /samply_profile`. To specify how long samply runs, set the query parameter `duration_secs`, in seconds. Default: 30 After 30s, make a `GET /samply_profile` request to get the latest profile. This profile should be a `zip` file, containing two files: - `profile.json` - `profile.syms.json` (the symbols file) Download the profile, unzip it, and load it to samply to view the profile. Profiling may fail if `samply` is not installed, or if the required permissions do not exist. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent 8b94460 commit de60b0a

File tree

15 files changed

+783
-30
lines changed

15 files changed

+783
-30
lines changed

.github/workflows/test-integration-platform.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ jobs:
185185
--health-interval 10s
186186
--health-timeout 5s
187187
--health-retries 5
188+
--cap-add=PERFMON
188189
189190
steps:
190191
- name: Show Kubernetes node

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/adapters/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ bytestring = "1.4.0"
7272
circular-queue = { workspace = true, features = ["serde_support"] }
7373
crossbeam = { workspace = true }
7474
dbsp = { workspace = true }
75+
semver = { workspace = true }
7576
serde = { workspace = true, features = ["derive", "rc"] }
7677
erased-serde = { workspace = true }
7778
serde_yaml = { workspace = true }
@@ -194,6 +195,7 @@ thread-id = { workspace = true }
194195
parking_lot = { workspace = true }
195196
backoff = { workspace = true }
196197
sentry = { workspace = true }
198+
zip = { workspace = true }
197199

198200
[package.metadata.cargo-machete]
199201
ignored = ["num-traits"]

crates/adapters/src/controller.rs

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::{
4444
CircuitCatalog, Encoder, InputConsumer, OutputConsumer, OutputEndpoint, ParseError,
4545
PipelineState, TransportInputEndpoint,
4646
};
47-
use anyhow::{anyhow, Error as AnyError};
47+
use anyhow::{anyhow, Context, Error as AnyError};
4848
use arrow::datatypes::Schema;
4949
use atomic::Atomic;
5050
use checkpoint::Checkpoint;
@@ -117,6 +117,9 @@ use std::{
117117
thread::JoinHandle,
118118
time::{Duration, Instant},
119119
};
120+
use tokio::fs::File;
121+
use tokio::io::AsyncReadExt;
122+
use tokio::io::BufReader;
120123
use tokio::sync::oneshot;
121124
use tokio::sync::oneshot::error::TryRecvError;
122125
use tokio::sync::Mutex as TokioMutex;
@@ -811,6 +814,135 @@ impl Controller {
811814
receiver.await.unwrap()
812815
}
813816

817+
pub async fn async_samply_profile(&self, duration: u64) -> Result<Vec<u8>, AnyError> {
818+
#[cfg(not(unix))]
819+
{
820+
anyhow::bail!(
821+
"samply is not supported on this platform; only supported on unix platforms"
822+
)
823+
}
824+
825+
let err_msg = format!(
826+
"is Samply installed and in `$PATH`? try: `curl --proto '=https' --tlsv1.2 -LsSf https://github.com/abhizer/samply/releases/download/v0.13.2/samply-installer.sh | sh`",
827+
);
828+
829+
let version = tokio::process::Command::new("samply")
830+
.arg("--version")
831+
.output()
832+
.await
833+
.with_context(|| format!("failed to get samply version; {err_msg}"))?;
834+
835+
let version = semver::Version::parse(
836+
String::from_utf8(version.stdout)?
837+
.split_whitespace()
838+
.nth(1)
839+
.unwrap_or("0.0.0"),
840+
)
841+
.with_context(|| format!("failed to parse samply version; {err_msg}"))?;
842+
843+
let req = semver::VersionReq::parse(">=0.13.2").unwrap();
844+
845+
if !req.matches(&version) {
846+
anyhow::bail!(
847+
"samply version is too old (found: {}, required: >= {}); {err_msg}",
848+
version,
849+
req
850+
);
851+
}
852+
853+
info!(
854+
"collecting samply profile for the next {} seconds",
855+
duration
856+
);
857+
858+
let temp = tempfile::Builder::new()
859+
.prefix("samply_profile_")
860+
.suffix(".json.gz")
861+
.rand_bytes(10)
862+
.tempfile()
863+
.context("failed to create tempfile to store samply profiles")?;
864+
let profile_file = temp
865+
.path()
866+
.to_str()
867+
.context("failed to convert path to samply profile to str")?;
868+
869+
let mut cmd = tokio::process::Command::new("samply");
870+
let mut child = cmd
871+
.args([
872+
"record",
873+
"-p",
874+
&std::process::id().to_string(),
875+
"-o",
876+
profile_file,
877+
"--save-only",
878+
"--presymbolicate",
879+
])
880+
.stdout(std::process::Stdio::piped())
881+
.stderr(std::process::Stdio::piped())
882+
.spawn()
883+
.context("failed to spawn samply process")?;
884+
885+
let child_pid = child.id().context("failed to get samply process id")?;
886+
887+
// Workaround as samply's `--duration` flag doesn't seem to work.
888+
// See: https://github.com/mstange/samply/issues/716
889+
//
890+
// As the duration flag doesn't work, we have to send a SIGINT to
891+
// tell samply to stop recording.
892+
//
893+
// If samply returns before the specified duration, it is likely due
894+
// to an error, and in such cases, we want to report it immediately.
895+
tokio::select! {
896+
_ = child.wait() => {}
897+
_ = tokio::time::sleep(Duration::from_secs(duration)) => {
898+
// Send SIGINT to the samply process to stop recording.
899+
nix::sys::signal::kill(
900+
nix::unistd::Pid::from_raw(child_pid as i32),
901+
nix::sys::signal::Signal::SIGINT,
902+
)
903+
.context("failed to send SIGINT to samply process")?;
904+
}
905+
}
906+
907+
let output = child
908+
.wait_with_output()
909+
.await
910+
.context("failed when waiting for samply process")?;
911+
912+
if !output.status.success() {
913+
anyhow::bail!(
914+
"samply process failed with status: `{}`, samply stdout: `{}`, samply stderr: `{}`",
915+
output.status,
916+
String::from_utf8_lossy(&output.stdout)
917+
.trim()
918+
.replace("\n", "\\n"),
919+
String::from_utf8_lossy(&output.stderr)
920+
.trim()
921+
.replace("\n", "\\n"),
922+
);
923+
}
924+
925+
let mut file = BufReader::new(File::open(profile_file).await.context(format!(
926+
"failed to open samply profile file `{profile_file}`"
927+
))?);
928+
929+
let mut buf = Vec::with_capacity(10 * 1024 * 1024); // 10 MB
930+
931+
file.read_to_end(&mut buf).await.context(format!(
932+
"failed to read samply profile file `{profile_file}`"
933+
))?;
934+
935+
if buf.is_empty() {
936+
anyhow::bail!(
937+
"samply profile is empty; no data collected; profile file: `{profile_file}`"
938+
);
939+
}
940+
941+
tracing::info!("collected samply profile ({} bytes)", buf.len());
942+
943+
Ok(buf)
944+
}
945+
814946
/// Triggers a sync checkpoint operation. `cb` will be called when it
815947
/// completes.
816948
///

crates/adapters/src/samply.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ use nix::{
7575
};
7676
use smallstr::SmallString;
7777
use tempfile::{tempdir, TempDir};
78-
use tracing::{enabled, error, span::EnteredSpan, Level, Span};
78+
use tracing::{enabled, error, info, span::EnteredSpan, Level, Span};
7979

8080
#[derive(Copy, Clone, Debug)]
8181
struct Timestamp(i64);
@@ -295,3 +295,26 @@ fn write_marker(start: Timestamp, end: Timestamp, name: &str) {
295295
writeln!(&mut s, "{start} {end} {name}").unwrap();
296296
let _ = with_marker_file(|f| f.write_all(s.as_bytes()));
297297
}
298+
299+
#[derive(Debug, Default, Clone, PartialEq, Eq)]
300+
pub(crate) enum SamplyProfile {
301+
#[default]
302+
None,
303+
Success(Vec<u8>),
304+
Failure(String),
305+
}
306+
307+
impl SamplyProfile {
308+
pub(crate) fn update(&mut self, result: Result<Vec<u8>, anyhow::Error>) {
309+
match result {
310+
Ok(data) => {
311+
info!("collected samply profile: {} bytes", data.len());
312+
*self = SamplyProfile::Success(data);
313+
}
314+
Err(error) => {
315+
error!("samply profiling failed: {:?}", error);
316+
*self = SamplyProfile::Failure(error.to_string());
317+
}
318+
}
319+
}
320+
}

0 commit comments

Comments
 (0)