@@ -13,7 +13,7 @@ use crate::{
1313 adhoc:: { adhoc_websocket, stream_adhoc_result} ,
1414 controller:: ConnectorConfig ,
1515 ensure_default_crypto_provider,
16- samply:: SamplyProfile ,
16+ samply:: { SamplyProfile , SamplyState , SamplyStatus } ,
1717 transport:: http:: {
1818 HttpInputEndpoint , HttpInputTransport , HttpOutputEndpoint , HttpOutputTransport ,
1919 } ,
@@ -63,7 +63,7 @@ use feldera_types::constants::STATUS_FILE;
6363use feldera_types:: coordination:: { AdHocScan , CoordinationActivate , Labels , Step , StepRequest } ;
6464use feldera_types:: pipeline_diff:: PipelineDiff ;
6565use feldera_types:: query_params:: {
66- ActivateParams , MetricsFormat , MetricsParameters , SamplyProfileParams ,
66+ ActivateParams , MetricsFormat , MetricsParameters , SamplyProfileGetParams , SamplyProfileParams ,
6767} ;
6868use feldera_types:: runtime_status:: {
6969 BootstrapPolicy , ExtendedRuntimeStatus , ExtendedRuntimeStatusError , RuntimeDesiredStatus ,
@@ -266,8 +266,8 @@ pub(crate) struct ServerState {
266266 sync_checkpoint_state : Mutex < CheckpointSyncState > ,
267267
268268 /// Leaf lock.
269- /// Latest samply profile .
270- samply_profile : Arc < Mutex < SamplyProfile > > ,
269+ /// Samply profiling state .
270+ samply_state : Arc < Mutex < SamplyState > > ,
271271
272272 /// Deployment ID.
273273 deployment_id : Uuid ,
@@ -322,7 +322,7 @@ impl ServerState {
322322 deployment_id,
323323 storage,
324324 rate_limiter,
325- samply_profile : Default :: default ( ) ,
325+ samply_state : Default :: default ( ) ,
326326 coordination_activate : Default :: default ( ) ,
327327 leases : Default :: default ( ) ,
328328 }
@@ -1720,22 +1720,92 @@ async fn samply_profile(
17201720 let duration = query_params. duration_secs ;
17211721 let controller = state. controller ( ) ?;
17221722
1723- let state_samply_profile = state. samply_profile . clone ( ) ;
1723+ let state_samply_state = state. samply_state . clone ( ) ;
1724+
1725+ // Check if profiling is already in progress
1726+ {
1727+ let samply_state = state_samply_state. lock ( ) . unwrap ( ) ;
1728+ if matches ! ( samply_state. samply_status, SamplyStatus :: InProgress { .. } ) {
1729+ return Ok ( HttpResponse :: Conflict ( ) . json ( ErrorResponse {
1730+ message : "samply profile collection is already in progress" . to_string ( ) ,
1731+ error_code : "SamplyProfilingInProgress" . into ( ) ,
1732+ details : serde_json:: Value :: Null ,
1733+ } ) ) ;
1734+ }
1735+ }
1736+
1737+ // Set the state to InProgress with expected completion time
1738+ let expected_after = chrono:: Utc :: now ( ) + chrono:: Duration :: seconds ( duration as i64 ) ;
1739+ state_samply_state
1740+ . lock ( )
1741+ . unwrap ( )
1742+ . start_profiling ( expected_after) ;
17241743
17251744 spawn ( async move {
17261745 let result = controller. async_samply_profile ( duration) . await ;
1727- state_samply_profile. lock ( ) . unwrap ( ) . update ( result) ;
1746+ state_samply_state
1747+ . lock ( )
1748+ . unwrap ( )
1749+ . complete_profiling ( result) ;
17281750 } ) ;
17291751
1730- Ok ( HttpResponse :: Ok ( ) . finish ( ) )
1752+ // Wait to check if it errored out immediately
1753+ sleep ( Duration :: from_millis ( 600 ) ) . await ;
1754+
1755+ // Check if profiling is still running or failed immediately
1756+ let samply_state = state. samply_state . lock ( ) . unwrap ( ) ;
1757+ Ok ( match samply_state. samply_status {
1758+ // Profile is still running - return success
1759+ SamplyStatus :: InProgress { .. } => HttpResponse :: Accepted ( ) . finish ( ) ,
1760+ // Profile completed during wait - check if it failed
1761+ SamplyStatus :: Idle => match & samply_state. last_profile {
1762+ Some ( Err ( error) ) => samply_profile_error_response ( error) ,
1763+ _ => HttpResponse :: InternalServerError ( ) . json ( ErrorResponse {
1764+ message : "samply profiling completed unexpectedly" . to_string ( ) ,
1765+ error_code : "SamplyProfilingUnexpectedCompletion" . into ( ) ,
1766+ details : serde_json:: Value :: Null ,
1767+ } ) ,
1768+ } ,
1769+ } )
17311770}
17321771
17331772#[ get( "/samply_profile" ) ]
1734- async fn get_samply_profile ( state : WebData < ServerState > ) -> Result < HttpResponse , PipelineError > {
1735- let profile = state. samply_profile . lock ( ) . unwrap ( ) ;
1773+ async fn get_samply_profile (
1774+ state : WebData < ServerState > ,
1775+ query_params : web:: Query < SamplyProfileGetParams > ,
1776+ ) -> Result < HttpResponse , PipelineError > {
1777+ let samply_state = state. samply_state . lock ( ) . unwrap ( ) ;
1778+
1779+ // If latest=true, check if profiling is in progress and return 204 No Content
1780+ if query_params. latest {
1781+ if let SamplyStatus :: InProgress { expected_after } = samply_state. samply_status {
1782+ let now = chrono:: Utc :: now ( ) ;
1783+ let retry_after_secs = ( expected_after - now) . num_seconds ( ) . max ( 0 ) ;
1784+
1785+ return Ok ( HttpResponse :: NoContent ( )
1786+ . insert_header ( ( "Retry-After" , retry_after_secs. to_string ( ) ) )
1787+ . finish ( ) ) ;
1788+ }
1789+ }
1790+
1791+ // Return the last profile result
1792+ Ok ( samply_profile_response ( & samply_state. last_profile ) )
1793+ }
17361794
1737- Ok ( match profile. clone ( ) {
1738- SamplyProfile :: Success ( bytes) => {
1795+ /// Helper function to construct error response for samply profiling failure
1796+ fn samply_profile_error_response ( error : & str ) -> HttpResponse {
1797+ HttpResponse :: InternalServerError ( ) . json ( ErrorResponse {
1798+ message : "failed to profile the pipeline using samply" . to_string ( ) ,
1799+ error_code : "SamplyProfilingFailure" . into ( ) ,
1800+ details : serde_json:: Value :: String ( error. to_string ( ) ) ,
1801+ } )
1802+ }
1803+
1804+ /// Helper function to construct HttpResponse from SamplyProfile result
1805+ fn samply_profile_response ( last_profile : & SamplyProfile ) -> HttpResponse {
1806+ match last_profile {
1807+ Some ( Ok ( bytes) ) => {
1808+ let bytes = bytes. clone ( ) ;
17391809 let byte_stream = once ( async move { Ok :: < _ , PipelineError > ( web:: Bytes :: from ( bytes) ) } ) ;
17401810
17411811 HttpResponse :: Ok ( )
@@ -1749,17 +1819,13 @@ async fn get_samply_profile(state: WebData<ServerState>) -> Result<HttpResponse,
17491819 ) )
17501820 . streaming ( byte_stream)
17511821 }
1752- SamplyProfile :: Failure ( ref error) => HttpResponse :: InternalServerError ( ) . json ( ErrorResponse {
1753- message : "failed to profile the pipeline using samply" . to_string ( ) ,
1754- error_code : "SamplyProfilingFailure" . into ( ) ,
1755- details : serde_json:: Value :: String ( error. to_string ( ) ) ,
1756- } ) ,
1757- SamplyProfile :: None => HttpResponse :: BadRequest ( ) . json ( json ! ( {
1822+ Some ( Err ( error) ) => samply_profile_error_response ( error) ,
1823+ None => HttpResponse :: BadRequest ( ) . json ( json ! ( {
17581824 "message" : "no samply profile found; trigger a samply profile by making a POST request to `/samply_profile`" ,
17591825 "error_code" : "NoSamplyProfile" ,
17601826 "details" : null
17611827 } ) ) ,
1762- } )
1828+ }
17631829}
17641830
17651831#[ post( "/checkpoint/sync" ) ]
0 commit comments