diff --git a/crates/adapters/src/server.rs b/crates/adapters/src/server.rs index 22027506c61..eb1a24a8ce9 100644 --- a/crates/adapters/src/server.rs +++ b/crates/adapters/src/server.rs @@ -1252,6 +1252,7 @@ where .service(coordination_adhoc_scan) .service(coordination_labels_incomplete) .service(coordination_restart) + .service(clock_advance) } /// Implements `/start`, `/pause`, `/activate`: @@ -2045,6 +2046,44 @@ async fn commit_transaction(state: WebData) -> Result, + body: web::Json, +) -> Result { + use feldera_types::transport::clock::ClockAdvanceResponse; + let controller = state.controller()?; + let reader = + controller + .get_input_endpoint("now") + .ok_or_else(|| PipelineError::InvalidParam { + error: "no `now` clock connector is registered".to_string(), + })?; + let clock_reader = reader + .as_any() + .downcast::() + .map_err(|_| PipelineError::InvalidParam { + error: "`now` connector is not a clock reader".to_string(), + })?; + + let now_ms = + clock_reader + .advance(body.delta_ms) + .await + .map_err(|e| PipelineError::InvalidParam { + error: e.to_string(), + })?; + + let now = chrono::DateTime::::from_timestamp_millis(now_ms as i64) + .map(|dt| dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)) + .unwrap_or_else(|| format!("{now_ms} ms since epoch")); + + Ok(HttpResponse::Ok().json(ClockAdvanceResponse { now_ms, now })) +} + #[derive(Debug, Deserialize)] struct IngressArgs { #[serde(default = "HttpInputTransport::default_format")] diff --git a/crates/adapters/src/transport/clock.rs b/crates/adapters/src/transport/clock.rs index a1725a1f4bc..05427dd832d 100644 --- a/crates/adapters/src/transport/clock.rs +++ b/crates/adapters/src/transport/clock.rs @@ -42,12 +42,36 @@ use std::{ }; use tokio::{ select, - sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, + sync::{ + mpsc::{ + Receiver as MpscReceiver, Sender as MpscSender, UnboundedReceiver, UnboundedSender, + channel as bounded_channel, unbounded_channel, + }, + oneshot, + }, 
time::{Instant, sleep_until}, }; +/// Side-channel commands to [`ClockReader`], used by `POST /clock/advance`. +enum ClockCommand { + /// Move the externally-driven clock forward by `delta_ms` (or one + /// `clock_resolution_ms` if `None`); replies with the new `NOW()`. + Advance { + delta_ms: Option, + reply: oneshot::Sender>, + }, +} + /// The controller uses this configuration to add a clock input connector to each pipeline. pub fn now_endpoint_config(config: &PipelineConfig) -> InputEndpointConfig { + // Compute the offset delta once, at endpoint construction, against the + // current wall clock. The connector then advances `NOW()` at wall-clock + // cadence from this anchor for the rest of the pipeline's lifetime. + let now_offset_delta_ms = config + .global + .dev_tweaks + .now_offset_delta_ms(chrono::Utc::now()); + InputEndpointConfig::new( "now", ConnectorConfig::new( @@ -56,6 +80,8 @@ pub fn now_endpoint_config(config: &PipelineConfig) -> InputEndpointConfig { .global .clock_resolution_usecs .unwrap_or(DEFAULT_CLOCK_RESOLUTION_USECS), + now_offset_delta_ms, + http_driven: config.global.dev_tweaks.now_http_driven(), }), Some(FormatConfig { name: Cow::Borrowed("json"), @@ -110,8 +136,10 @@ impl TransportInputEndpoint for ClockEndpoint { } } -struct ClockReader { +pub struct ClockReader { sender: UnboundedSender, + /// Bounded at 1; one in-flight `advance` at a time (testing knob). 
+ clock_sender: MpscSender, } impl ClockReader { @@ -121,34 +149,71 @@ impl ClockReader { parser: Box, ) -> Self { let (sender, receiver) = unbounded_channel::(); + let (clock_sender, clock_receiver) = bounded_channel::(1); let config_clone = config.clone(); TOKIO.spawn(async { - let _ = Self::worker_task(config_clone, parser, consumer, receiver).await; + let _ = + Self::worker_task(config_clone, parser, consumer, receiver, clock_receiver).await; }); - Self { sender } + Self { + sender, + clock_sender, + } + } + + /// Advance the clock by `delta_ms` (or one `clock_resolution_ms` if + /// `None`) and return the new `NOW()`; `Some(0)` is a read. + /// Requires `http_driven` mode. + pub async fn advance(&self, delta_ms: Option) -> AnyResult { + let (reply, rx) = oneshot::channel(); + self.clock_sender + .send(ClockCommand::Advance { delta_ms, reply }) + .await + .map_err(|_| anyhow!("clock connector is shut down"))?; + rx.await + .map_err(|_| anyhow!("clock connector dropped the advance reply"))? } /// Current timestamp in milliseconds, rounded to `clock_resolution_ms`. + /// + /// Reads the wall clock and forwards to [`Self::current_time_at`] for + /// the actual math. Split out so tests can supply a fixed wall clock. fn current_time(config: &ClockConfig) -> u64 { - let now = SystemTime::now(); - let now_millis = now + Self::current_time_at(config, SystemTime::now()) + } + + /// Timestamp in milliseconds for a given wall-clock instant, shifted + /// by `config.now_offset_delta_ms` if set, then rounded to the clock + /// resolution. 
+ fn current_time_at(config: &ClockConfig, wall: SystemTime) -> u64 { + let wall_ms = wall .duration_since(UNIX_EPOCH) .unwrap_or_default() - .as_millis() as u64; + .as_millis() as i64; + let shifted_ms = wall_ms + .saturating_add(config.now_offset_delta_ms.unwrap_or(0)) + .max(0) as u64; let clock_resolution_ms = config.clock_resolution_ms(); - now_millis - now_millis % clock_resolution_ms + shifted_ms - shifted_ms % clock_resolution_ms } /// Time when we want to trigger the next clock tick, assuming we triggered one at `previous_tick`. /// /// Returns time as `Instant`, so it can be used with `sleep_until`. This also ensures that /// we don't need to worry about timezone or daylight changes. + /// + /// `previous_tick` is the timestamp we just emitted (in the shifted + /// timeline if `now_offset_delta_ms` is set). Wake-ups are paced by + /// wall-clock duration, so we undo the offset before scheduling. fn next_tick_time(config: &ClockConfig, previous_tick: &u64) -> Instant { - let next_tick = previous_tick + config.clock_resolution_ms(); + let next_tick_ms = previous_tick.saturating_add(config.clock_resolution_ms()); + let next_wall_clock_ms = (next_tick_ms as i64) + .saturating_sub(config.now_offset_delta_ms.unwrap_or(0)) + .max(0) as u64; - let target_time = UNIX_EPOCH + Duration::from_millis(next_tick); + let target_time = UNIX_EPOCH + Duration::from_millis(next_wall_clock_ms); let now = SystemTime::now(); let duration_until = target_time.duration_since(now).unwrap_or(Duration::ZERO); @@ -165,6 +230,7 @@ impl ClockReader { mut parser: Box, consumer: Box, mut receiver: UnboundedReceiver, + mut clock_receiver: MpscReceiver, ) { const RECORD_SIZE: BufferSize = BufferSize { records: 1, @@ -172,6 +238,9 @@ impl ClockReader { }; let mut next_tick: Option = None; let mut pipeline_state = PipelineState::Paused; + + let mut current_now_ms: u64 = Self::current_time(&config); + loop { select! { // When the next clock tick is scheduled, wakeup at `next_tick` time. 
@@ -181,6 +250,30 @@ impl ClockReader { consumer.request_step(); } } + Some(clock_cmd) = clock_receiver.recv() => { match clock_cmd { + ClockCommand::Advance { delta_ms, reply } => { + if !config.http_driven { + let _ = reply.send(Err(anyhow!( + "clock is wall-clock driven; set dev_tweaks.now_http_driven = true to enable POST /clock/advance" + ))); + continue; + } + let resolution = config.clock_resolution_ms(); + // `None` advances by one resolution tick (the + // default wall-clock cadence); explicit values + // advance by that many milliseconds. + let effective_delta = delta_ms.unwrap_or(resolution); + let advanced = current_now_ms.saturating_add(effective_delta); + // Round to the resolution so emitted timestamps + // stay on resolution boundaries. + current_now_ms = advanced - advanced % resolution; + if effective_delta > 0 { + // Schedule a step so the new value gets emitted. + consumer.request_step(); + } + let _ = reply.send(Ok(current_now_ms)); + } + }} message = receiver.recv() => match message { None => { // channel closed @@ -199,6 +292,11 @@ impl ClockReader { let mut buffer = parser.parse(Self::timestamp_to_record(ts_millis).as_bytes(), None).0.unwrap(); buffer.flush(); consumer.replayed(RECORD_SIZE, 0); + // Track the last replayed value so a subsequent + // external advance continues from there rather + // than jumping back to `now_offset`. Tested by + // `test_clock_advance_survives_checkpoint`. + current_now_ms = ts_millis; } Some(InputReaderCommand::Extend) => { pipeline_state = PipelineState::Running; @@ -213,7 +311,11 @@ impl ClockReader { Some(InputReaderCommand::Queue { .. 
}) => { // Push current time; consumer.buffered(RECORD_SIZE); - let now = Self::current_time(&config); + let now = if config.http_driven { + current_now_ms + } else { + Self::current_time(&config) + }; let mut buffer = parser.parse(Self::timestamp_to_record(now).as_bytes(), None).0.unwrap(); buffer.flush(); consumer.extended(RECORD_SIZE, Some(Resume::Replay { @@ -222,8 +324,13 @@ impl ClockReader { hash: 0 }), vec![Watermark::new(Utc::now(), None)]); - // Schedule next tick. - next_tick = Some(Self::next_tick_time(&config, &now)); + current_now_ms = now; + + // Schedule the next wall-clock tick only in wall + // mode; externally-driven mode waits for advance(). + if !config.http_driven { + next_tick = Some(Self::next_tick_time(&config, &now)); + } } Some(InputReaderCommand::Disconnect) => { break; @@ -453,4 +560,327 @@ mod test { controller.stop().unwrap(); println!("Clock test is finished"); } + + /// `current_time_at` with a fixed wall clock and a configured offset + /// emits exactly the configured target timestamp. Fully deterministic: + /// the wall clock is supplied by the test, not read from the system. + #[test] + fn test_current_time_with_offset() { + use chrono::{TimeZone, Utc}; + use feldera_types::transport::clock::ClockConfig; + use std::time::{Duration, UNIX_EPOCH}; + + // An arbitrary fixed wall-clock instant; the actual value is + // irrelevant because we compute the delta against it. 
+ let wall_ms: i64 = 1_700_000_000_000; + let wall = UNIX_EPOCH + Duration::from_millis(wall_ms as u64); + + let past = Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap(); + let future = Utc.with_ymd_and_hms(2036, 1, 1, 0, 0, 0).unwrap(); + + let make = |target: chrono::DateTime| ClockConfig { + clock_resolution_usecs: 1_000_000, + now_offset_delta_ms: Some(target.timestamp_millis() - wall_ms), + http_driven: false, + }; + + let past_ts = super::ClockReader::current_time_at(&make(past), wall); + let future_ts = super::ClockReader::current_time_at(&make(future), wall); + + assert_eq!(past_ts, past.timestamp_millis() as u64); + assert_eq!(future_ts, future.timestamp_millis() as u64); + } + + /// Pre-epoch offsets clamp to `0`. + #[test] + fn test_current_time_with_pre_epoch_offset_clamps_to_zero() { + use chrono::{TimeZone, Utc}; + use feldera_types::transport::clock::ClockConfig; + use std::time::{Duration, UNIX_EPOCH}; + + let wall_ms: i64 = 1_700_000_000_000; + let wall = UNIX_EPOCH + Duration::from_millis(wall_ms as u64); + + // A pre-epoch target (year 1900) and a one-millisecond-before-epoch + // target both fall under the wire-format floor. + for target in [ + Utc.with_ymd_and_hms(1900, 1, 1, 0, 0, 0).unwrap(), + Utc.timestamp_millis_opt(-1).unwrap(), + ] { + let config = ClockConfig { + clock_resolution_usecs: 1_000_000, + now_offset_delta_ms: Some(target.timestamp_millis() - wall_ms), + http_driven: false, + }; + let ts = super::ClockReader::current_time_at(&config, wall); + assert_eq!(ts, 0, "{target} should clamp to epoch, got {ts}"); + } + + // Exactly epoch is the floor: it should *not* clamp away, it + // should emit 0 as the legitimate epoch timestamp. 
+ let epoch_config = ClockConfig { + clock_resolution_usecs: 1_000_000, + now_offset_delta_ms: Some(0 - wall_ms), + http_driven: false, + }; + assert_eq!(super::ClockReader::current_time_at(&epoch_config, wall), 0); + } + + /// `now_endpoint_config` translates `DevTweaks::now_offset` into a + /// populated `ClockConfig::now_offset_delta_ms`, and leaves the field + /// `None` when no override is configured. Checks only the wiring; + /// the delta arithmetic itself lives in [`test_current_time_with_offset`]. + #[test] + fn test_now_endpoint_config_with_offset() { + use feldera_types::config::{PipelineConfig, TransportConfig}; + + let delta_of = |body: serde_json::Value| -> Option { + let config: PipelineConfig = serde_json::from_value(body).unwrap(); + let endpoint = super::now_endpoint_config(&config); + let TransportConfig::ClockInput(clock_config) = &endpoint.connector_config.transport + else { + panic!("expected ClockInput transport"); + }; + clock_config.now_offset_delta_ms + }; + + // Past target. + assert!( + delta_of(json!({ + "name": "test", "workers": 1, "fault_tolerance": {}, "inputs": {}, + "dev_tweaks": { "now_offset": "1970-01-01T00:00:00Z" }, + })) + .is_some(), + ); + // Future target. + assert!( + delta_of(json!({ + "name": "test", "workers": 1, "fault_tolerance": {}, "inputs": {}, + "dev_tweaks": { "now_offset": "2036-01-01T00:00:00Z" }, + })) + .is_some(), + ); + // No override → no delta. + assert!( + delta_of(json!({ + "name": "test", "workers": 1, "fault_tolerance": {}, "inputs": {}, + })) + .is_none(), + ); + } + + /// `ClockReader::advance` contract: read, explicit forward, and tick-by-resolution. + #[test] + fn test_clock_advance_http_driven() { + use dbsp::circuit::tokio::TOKIO; + + let tempdir = TempDir::new().unwrap(); + let storage_dir = tempdir.path().join("storage"); + create_dir(&storage_dir).unwrap(); + + // Anchor `NOW()` at a fixed past instant so the test asserts + // against literal values rather than wall clock. 
+ let anchor_ms: u64 = 1_700_000_000_000; + let anchor_rfc = chrono::DateTime::::from_timestamp_millis(anchor_ms as i64) + .unwrap() + .to_rfc3339(); + + // 1-second resolution so the `None`-tick step is a clean +1000ms. + let resolution_ms: u64 = 1_000; + let config = serde_json::from_value(json!({ + "name": "test", + "workers": 1, + "storage_config": { "path": storage_dir }, + "fault_tolerance": {}, + "inputs": {}, + "clock_resolution_usecs": resolution_ms * 1_000, + "dev_tweaks": { + "now_offset": anchor_rfc, + "now_http_driven": true, + }, + })) + .unwrap(); + + let test_stats = Arc::new(ClockStats::new()); + let test_stats_clone = test_stats.clone(); + + let controller = Controller::with_test_config( + move |workers| Ok(clock_test_circuit(workers, test_stats_clone)), + &config, + Box::new(move |e, _| panic!("clock_advance test: error: {e}")), + ) + .unwrap(); + + controller.start(); + // Let the initial Extend-triggered tick drain. + sleep(Duration::from_millis(500)); + + let reader = controller + .get_input_endpoint("now") + .expect("clock connector should be registered"); + let clock_reader = reader + .as_any() + .downcast::() + .expect("`now` should downcast to ClockReader"); + + // Read-only: Some(0) returns the current NOW() without moving it. + let now0 = TOKIO.block_on(clock_reader.advance(Some(0))).unwrap(); + assert_eq!(now0, anchor_ms, "initial NOW() should be the anchor"); + + let now1 = TOKIO.block_on(clock_reader.advance(Some(0))).unwrap(); + assert_eq!(now1, now0, "Some(0) must not move the clock"); + + // None advances by one `clock_resolution_ms` (one wall-clock tick). + let now_tick = TOKIO.block_on(clock_reader.advance(None)).unwrap(); + assert_eq!(now_tick, anchor_ms + resolution_ms); + + // Some(n) advances by exactly n ms. + let now2 = TOKIO.block_on(clock_reader.advance(Some(60_000))).unwrap(); + assert_eq!(now2, anchor_ms + resolution_ms + 60_000); + + // Compounding works across explicit and tick-style advances. 
+ let now3 = TOKIO.block_on(clock_reader.advance(None)).unwrap(); + assert_eq!(now3, anchor_ms + 2 * resolution_ms + 60_000); + + controller.stop().unwrap(); + } + + /// `advance` errors when `http_driven` is false. + #[test] + fn test_clock_advance_rejected_when_wall_driven() { + use dbsp::circuit::tokio::TOKIO; + + let tempdir = TempDir::new().unwrap(); + let storage_dir = tempdir.path().join("storage"); + create_dir(&storage_dir).unwrap(); + + let config = serde_json::from_value(json!({ + "name": "test", + "workers": 1, + "storage_config": { "path": storage_dir }, + "fault_tolerance": {}, + "inputs": {}, + })) + .unwrap(); + + let test_stats = Arc::new(ClockStats::new()); + let test_stats_clone = test_stats.clone(); + + let controller = Controller::with_test_config( + move |workers| Ok(clock_test_circuit(workers, test_stats_clone)), + &config, + Box::new(move |e, _| panic!("clock_advance reject test: error: {e}")), + ) + .unwrap(); + + controller.start(); + sleep(Duration::from_millis(500)); + + let reader = controller.get_input_endpoint("now").unwrap(); + let clock_reader = reader.as_any().downcast::().unwrap(); + + let err = TOKIO + .block_on(clock_reader.advance(Some(60_000))) + .expect_err("advance must error in wall-clock mode"); + assert!( + err.to_string().contains("now_http_driven"), + "unexpected error: {err}" + ); + + controller.stop().unwrap(); + } + + /// After checkpoint/restart, `NOW()` resumes from the last replayed value, not the anchor. 
+ #[test] + fn test_clock_advance_survives_checkpoint() { + use dbsp::circuit::tokio::TOKIO; + + let tempdir = TempDir::new().unwrap(); + let storage_dir = tempdir.path().join("storage"); + create_dir(&storage_dir).unwrap(); + + let anchor_ms: u64 = 1_700_000_000_000; + let anchor_rfc = chrono::DateTime::::from_timestamp_millis(anchor_ms as i64) + .unwrap() + .to_rfc3339(); + let resolution_ms: u64 = 1_000; + let one_day_ms: u64 = 24 * 60 * 60 * 1_000; + + let config = serde_json::from_value(json!({ + "name": "test", + "workers": 1, + "storage_config": { "path": storage_dir }, + "fault_tolerance": {}, + "inputs": {}, + "clock_resolution_usecs": resolution_ms * 1_000, + "dev_tweaks": { + "now_offset": anchor_rfc, + "now_http_driven": true, + }, + })) + .unwrap(); + + // Run 1: advance once → checkpoint → advance again → stop. + let last_emitted_in_run1 = { + let test_stats = Arc::new(ClockStats::new()); + let test_stats_clone = test_stats.clone(); + let controller = Controller::with_test_config( + move |workers| Ok(clock_test_circuit(workers, test_stats_clone)), + &config, + Box::new(move |e, _| panic!("checkpoint survival run 1: {e}")), + ) + .unwrap(); + controller.start(); + + let reader = controller.get_input_endpoint("now").unwrap(); + let clock_reader = reader.as_any().downcast::().unwrap(); + + // First advance, lands *before* the checkpoint. + TOKIO + .block_on(clock_reader.advance(Some(one_day_ms))) + .unwrap(); + controller.checkpoint().unwrap(); + + // Second advance, lands *after* the checkpoint marker. This + // is the journal entry that will be replayed on restart. + let post = TOKIO + .block_on(clock_reader.advance(Some(one_day_ms))) + .unwrap(); + assert_eq!(post, anchor_ms + 2 * one_day_ms); + // `advance()` returns when the worker has requested a step, + // but the Queue that actually records the value to the + // journal is asynchronous. Wait for it to drain or there + // will be nothing for run 2 to replay. 
+ sleep(Duration::from_secs(1)); + controller.stop().unwrap(); + post + }; + + // Run 2: restart from the checkpoint. + let test_stats = Arc::new(ClockStats::new()); + let test_stats_clone = test_stats.clone(); + let controller = Controller::with_test_config( + move |workers| Ok(clock_test_circuit(workers, test_stats_clone)), + &config, + Box::new(move |e, _| panic!("checkpoint survival run 2: {e}")), + ) + .unwrap(); + + let reader = controller.get_input_endpoint("now").unwrap(); + let clock_reader = reader.as_any().downcast::().unwrap(); + + let post_replay = TOKIO.block_on(clock_reader.advance(Some(0))).unwrap(); + assert_eq!( + post_replay, last_emitted_in_run1, + "NOW() must resume from the last replayed value, not jump back to anchor" + ); + + // A further advance continues monotonically. + let after_more = TOKIO + .block_on(clock_reader.advance(Some(one_day_ms))) + .unwrap(); + assert_eq!(after_more, last_emitted_in_run1 + one_day_ms); + + controller.stop().unwrap(); + } } diff --git a/crates/fda/src/cli.rs b/crates/fda/src/cli.rs index 7fabd08fa82..5bd04962561 100644 --- a/crates/fda/src/cli.rs +++ b/crates/fda/src/cli.rs @@ -703,6 +703,23 @@ pub enum PipelineAction { #[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))] name: String, }, + /// Advance the externally-driven `NOW()` clock by `delta_ms` and + /// print the new value. + /// + /// Requires `dev_tweaks.now_http_driven = true` on the pipeline. + /// The clock is forward-only; `delta_ms = 0` reads the current value + /// without moving it; omitting `--delta-ms` advances by one + /// `clock_resolution` (one configured tick). + #[clap(aliases = &["clock-set", "set-clock"])] + ClockAdvance { + /// The name of the pipeline. + #[arg(value_hint = ValueHint::Other, add = ArgValueCompleter::new(pipeline_names))] + name: String, + /// Milliseconds to add to `NOW()`. Omit to advance by one + /// `clock_resolution`. 
+ #[arg(long)] + delta_ms: Option, + }, /// Clear the storage resources of a pipeline. /// /// Note that the pipeline must be stopped before clearing its storage resources. diff --git a/crates/fda/src/main.rs b/crates/fda/src/main.rs index b46ec2b7ce6..eda65b5175b 100644 --- a/crates/fda/src/main.rs +++ b/crates/fda/src/main.rs @@ -8,6 +8,7 @@ use feldera_rest_api::types::*; use feldera_rest_api::*; use feldera_types::config::{FtModel, RuntimeConfig, StorageOptions}; use feldera_types::error::ErrorResponse; +use feldera_types::transport::clock::ClockAdvanceRequest; use futures_util::StreamExt; use json_to_table::json_to_table; use log::{debug, error, info, trace, warn}; @@ -1910,6 +1911,41 @@ async fn pipeline(format: OutputFormat, action: PipelineAction, client: Client) .unwrap(); println!("Initiated rebalancing for pipeline {name}."); } + PipelineAction::ClockAdvance { name, delta_ms } => { + let response = client + .clock_advance() + .pipeline_name(name.clone()) + .body(ClockAdvanceRequest { delta_ms }) + .send() + .await + .map_err(handle_errors_fatal( + client.baseurl().clone(), + "Failed to advance clock", + 1, + )) + .unwrap(); + let body = response.into_inner(); + match format { + OutputFormat::Text => { + println!("NOW() = {} ({} ms)", body.now, body.now_ms); + } + OutputFormat::Json => { + println!( + "{}", + serde_json::to_string_pretty(&body) + .expect("failed to serialize clock advance response") + ); + } + _ => { + eprintln!( + "Unsupported output format, falling back to text: {}", + format + ); + println!("NOW() = {} ({} ms)", body.now, body.now_ms); + std::process::exit(1); + } + } + } PipelineAction::Bench { args } => bench::bench(client, format, args).await, PipelineAction::DismissError { name } => { let response = client diff --git a/crates/fda/test.bash b/crates/fda/test.bash index 3824a0e5195..4bf637ab3bf 100755 --- a/crates/fda/test.bash +++ b/crates/fda/test.bash @@ -60,6 +60,7 @@ fda shutdown p1 || true fda delete --force p1 || true fda 
delete --force p2 || true fda clear pudf && fda delete pudf || true +fda delete --force pclock || true fda delete unknown || true fda delete --force punknown || true fda apikey delete a || true @@ -133,6 +134,45 @@ fda commit-transaction p1 --no-wait fda shutdown p1 +# Clock advance smoke test (testing knob). Uses a dedicated pipeline +# `pclock` whose SQL references NOW(); the clock connector is only +# registered when the program contains a NOW() stream. +echo "Testing clock advance..." +fda shutdown pclock || true +fda delete --force pclock || true +cat > clock.sql <, + /// Override the timestamp returned by SQL `NOW()` at pipeline start. + /// + /// When set, the clock connector anchors `NOW()` to this RFC 3339 + /// timestamp the first time the pipeline starts and advances at + /// wall-clock cadence from there: + /// `NOW() = now_offset + (wall_clock - wall_clock_at_start)`. + /// + /// Any RFC 3339 timestamp from `1970-01-01T00:00:00Z` through year + /// `9999` is accepted, in the past or future relative to wall clock. + /// Earlier values are silently clamped to the epoch. + /// + /// This is a testing knob for queries that depend on `NOW()`. + /// + /// Checkpoint/restart re-anchors the offset. Set `now_offset` to + /// `1970-01-01T00:00:00Z`, run for a day, checkpoint, then restart: + /// replayed ticks are exact, but the next new tick emits + /// `1970-01-01T00:00:00Z` again rather than `1970-01-02T00:00:00Z`. + #[serde(skip_serializing_if = "Option::is_none")] + pub now_offset: Option>, + + /// Drive `NOW()` from an external HTTP endpoint instead of wall clock. + /// + /// When `true`, the clock connector emits one initial tick (using + /// `now_offset` if set, otherwise wall clock) and then holds that + /// value. Subsequent calls to `POST /clock/advance` move `NOW()` + /// forward by the requested delta. Negative deltas are rejected; + /// the clock is forward-only. 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub now_http_driven: Option, + /// Options not understood by this particular version. /// /// This allows the pipeline manager to take options that a custom or old @@ -297,6 +328,25 @@ impl DevTweaks { pub fn disable_auto_transaction(&self) -> bool { self.disable_auto_transaction.unwrap_or(false) } + + /// Milliseconds to add to wall-clock time when reporting `NOW()`, + /// computed from [`Self::now_offset`] and `wall_clock_at_start`. + /// + /// Returns `None` if no override is configured. The subtraction + /// saturates rather than overflows; in practice the inputs are + /// `chrono`-bounded RFC 3339 timestamps whose millisecond difference + /// stays well below `i64::MAX`, but saturating is defense in depth. + pub fn now_offset_delta_ms(&self, wall_clock_at_start: DateTime) -> Option { + self.now_offset.map(|target| { + target + .timestamp_millis() + .saturating_sub(wall_clock_at_start.timestamp_millis()) + }) + } + + pub fn now_http_driven(&self) -> bool { + self.now_http_driven.unwrap_or(false) + } } /// Selects which eviction strategy backs a cache instance. diff --git a/crates/feldera-types/src/transport/clock.rs b/crates/feldera-types/src/transport/clock.rs index fbfb01ae1b6..ed9ea254b87 100644 --- a/crates/feldera-types/src/transport/clock.rs +++ b/crates/feldera-types/src/transport/clock.rs @@ -6,6 +6,21 @@ use utoipa::ToSchema; #[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, ToSchema)] pub struct ClockConfig { pub clock_resolution_usecs: u64, + + /// Signed offset, in milliseconds, added to wall-clock time before + /// rounding to the clock resolution. + /// + /// Populated from `DevTweaks::now_offset` at endpoint construction. + /// `None` means no shift is applied. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub now_offset_delta_ms: Option, + + /// If `true`, the clock does not advance on wall-clock cadence. 
+ /// `NOW()` is held at its current value and only advances when an + /// external caller invokes the pipeline's `POST /clock/advance` + /// endpoint. Populated from `DevTweaks::now_http_driven`. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub http_driven: bool, } impl ClockConfig { @@ -14,3 +29,23 @@ impl ClockConfig { max((self.clock_resolution_usecs + 500) / 1_000, 1) } } + +/// Body of `POST /clock/advance`. +/// +/// `delta_ms` is unsigned; negative values fail JSON deserialization. +/// `Some(0)` reads the current `NOW()` without moving it; `Some(n)` +/// advances by `n` ms; `None` (`null` or omitted) advances by one +/// `clock_resolution`. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +pub struct ClockAdvanceRequest { + #[serde(default)] + pub delta_ms: Option, +} + +/// Response of `POST /clock/advance`: the new `NOW()` value as both +/// milliseconds since epoch and an RFC 3339 string. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +pub struct ClockAdvanceResponse { + pub now_ms: u64, + pub now: String, +} diff --git a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs index 10b0d62c401..8595ab3d33e 100644 --- a/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs +++ b/crates/pipeline-manager/src/api/endpoints/pipeline_interaction.rs @@ -2284,3 +2284,62 @@ pub(crate) async fn commit_transaction( Ok(response) } + +/// Advance Clock +/// +/// Moves `NOW()` forward by a specified amount. Returns the +/// current clock time of the circuit. +/// +/// Requires `dev_tweaks.now_http_driven = true` on the pipeline. +/// +/// Forward-only: `delta_ms` is `u64`, so negative bodies are rejected at +/// JSON parse time. `delta_ms = null` or omitted advances by one +/// `clock_resolution`. 
+#[utoipa::path( + context_path = "/v0", + security(("JSON web token (JWT) or API key" = [])), + params( + ("pipeline_name" = String, Path, description = "Unique pipeline name"), + ), + request_body( + content = ClockAdvanceRequest, + content_type = "application/json", + description = "Milliseconds to add to NOW(); zero reads the current value, null/omitted: advance by one clock_resolution."), + responses( + (status = OK + , description = "Clock advanced successfully; body contains the new NOW()." + , content_type = "application/json" + , body = ClockAdvanceResponse), + (status = BAD_REQUEST + , description = "Clock not in http-driven mode, or malformed body." + , body = ErrorResponse), + (status = SERVICE_UNAVAILABLE + , description = "Pipeline is not running." + , body = ErrorResponse), + ), + tag = "Metrics & Debugging" +)] +#[post("/pipelines/{pipeline_name}/clock/advance")] +pub(crate) async fn clock_advance( + state: WebData, + client: WebData, + tenant_id: ReqData, + path: web::Path, + request: HttpRequest, + body: web::Payload, +) -> Result { + let pipeline_name = path.into_inner(); + + state + .runner + .forward_streaming_http_request_to_pipeline_by_name( + client.as_ref(), + *tenant_id, + &pipeline_name, + "clock/advance", + request, + body, + None, + ) + .await +} diff --git a/crates/pipeline-manager/src/api/main.rs b/crates/pipeline-manager/src/api/main.rs index d9346a3a520..abf6ddf8e78 100644 --- a/crates/pipeline-manager/src/api/main.rs +++ b/crates/pipeline-manager/src/api/main.rs @@ -231,6 +231,7 @@ It contains the following fields: endpoints::pipeline_interaction::completion_status, endpoints::pipeline_interaction::start_transaction, endpoints::pipeline_interaction::commit_transaction, + endpoints::pipeline_interaction::clock_advance, endpoints::pipeline_interaction::get_pipeline_time_series, endpoints::pipeline_interaction::get_pipeline_time_series_stream, endpoints::pipeline_interaction::post_pipeline_rebalance, @@ -448,6 +449,8 @@ It contains 
the following fields: feldera_types::preprocess::PreprocessorConfig, feldera_types::transaction::StartTransactionResponse, feldera_types::transaction::CommitProgressSummary, + feldera_types::transport::clock::ClockAdvanceRequest, + feldera_types::transport::clock::ClockAdvanceResponse, feldera_types::time_series::TimeSeries, feldera_types::time_series::SampleStatistics, feldera_types::suspend::SuspendError, @@ -679,6 +682,7 @@ fn api_scope() -> Scope { .service(endpoints::pipeline_interaction::completion_status) .service(endpoints::pipeline_interaction::start_transaction) .service(endpoints::pipeline_interaction::commit_transaction) + .service(endpoints::pipeline_interaction::clock_advance) // API keys endpoints .service(endpoints::api_key::list_api_keys) .service(endpoints::api_key::get_api_key) diff --git a/crates/rest-api/build.rs b/crates/rest-api/build.rs index c0be4b2db10..998c4b359c6 100644 --- a/crates/rest-api/build.rs +++ b/crates/rest-api/build.rs @@ -205,6 +205,14 @@ fn type_replacement() -> Vec<(&'static str, &'static str)> { "NatsInputConfig", "feldera_types::transport::nats::NatsInputConfig", ), + ( + "ClockAdvanceRequest", + "feldera_types::transport::clock::ClockAdvanceRequest", + ), + ( + "ClockAdvanceResponse", + "feldera_types::transport::clock::ClockAdvanceResponse", + ), ] } diff --git a/openapi.json b/openapi.json index 413d0e9c00c..294d518cbee 100644 --- a/openapi.json +++ b/openapi.json @@ -2977,6 +2977,75 @@ ] } }, + "/v0/pipelines/{pipeline_name}/clock/advance": { + "post": { + "tags": [ + "Metrics & Debugging" + ], + "summary": "Advance Clock", + "description": "Moves `NOW()` forward by a specified amount. Returns the\ncurrent clock time of the circuit.\n\nRequires `dev_tweaks.now_http_driven = true` on the pipeline.\n\nForward-only: `delta_ms` is `u64`, so negative bodies are rejected at\nJSON parse time. 
`delta_ms = null` or omitted advances by one\n`clock_resolution`.", + "operationId": "clock_advance", + "parameters": [ + { + "name": "pipeline_name", + "in": "path", + "description": "Unique pipeline name", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "description": "Milliseconds to add to NOW(); zero reads the current value, null/omitted: advance by one clock_resolution.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ClockAdvanceRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Clock advanced successfully; body contains the new NOW().", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ClockAdvanceResponse" + } + } + } + }, + "400": { + "description": "Clock not in http-driven mode, or malformed body.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "503": { + "description": "Pipeline is not running.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + } + }, + "security": [ + { + "JSON web token (JWT) or API key": [] + } + ] + } + }, "/v0/pipelines/{pipeline_name}/commit_transaction": { "post": { "tags": [ @@ -7452,6 +7521,36 @@ } } }, + "ClockAdvanceRequest": { + "type": "object", + "description": "Body of `POST /clock/advance`.\n\n`delta_ms` is unsigned; negative values fail JSON deserialization.\n`Some(0)` reads the current `NOW()` without moving it; `Some(n)`\nadvances by `n` ms; `None` (`null` or omitted) advances by one\n`clock_resolution`.", + "properties": { + "delta_ms": { + "type": "integer", + "format": "int64", + "nullable": true, + "minimum": 0 + } + } + }, + "ClockAdvanceResponse": { + "type": "object", + "description": "Response of `POST /clock/advance`: the new `NOW()` value as both\nmilliseconds since epoch and an RFC 3339 string.", + "required": [ + 
"now_ms", + "now" + ], + "properties": { + "now": { + "type": "string" + }, + "now_ms": { + "type": "integer", + "format": "int64", + "minimum": 0 + } + } + }, "ClockConfig": { "type": "object", "required": [ @@ -7462,6 +7561,16 @@ "type": "integer", "format": "int64", "minimum": 0 + }, + "http_driven": { + "type": "boolean", + "description": "If `true`, the clock does not advance on wall-clock cadence.\n`NOW()` is held at its current value and only advances when an\nexternal caller invokes the pipeline's `POST /clock/advance`\nendpoint. Populated from `DevTweaks::now_http_driven`." + }, + "now_offset_delta_ms": { + "type": "integer", + "format": "int64", + "description": "Signed offset, in milliseconds, added to wall-clock time before\nrounding to the clock resolution.\n\nPopulated from `DevTweaks::now_offset` at endpoint construction.\n`None` means no shift is applied.", + "nullable": true } } }, @@ -8762,6 +8871,19 @@ "nullable": true, "minimum": 0 }, + "now_http_driven": { + "type": "boolean", + "description": "Drive `NOW()` from an external HTTP endpoint instead of wall clock.\n\nWhen `true`, the clock connector emits one initial tick (using\n`now_offset` if set, otherwise wall clock) and then holds that\nvalue. Subsequent calls to `POST /clock/advance` move `NOW()`\nforward by the requested delta. 
Negative deltas are rejected;\nthe clock is forward-only.", + "default": null, + "nullable": true + }, + "now_offset": { + "type": "string", + "format": "date-time", + "description": "Override the timestamp returned by SQL `NOW()` at pipeline start.\n\nWhen set, the clock connector anchors `NOW()` to this RFC 3339\ntimestamp the first time the pipeline starts and advances at\nwall-clock cadence from there:\n`NOW() = now_offset + (wall_clock - wall_clock_at_start)`.\n\nAny RFC 3339 timestamp from `1970-01-01T00:00:00Z` through year\n`9999` is accepted, in the past or future relative to wall clock.\nEarlier values are silently clamped to the epoch.\n\nThis is a testing knob for queries that depend on `NOW()`.\n\nCheckpoint/restart re-anchors the offset. Set `now_offset` to\n`1970-01-01T00:00:00Z`, run for a day, checkpoint, then restart:\nreplayed ticks are exact, but the next new tick emits\n`1970-01-01T00:00:00Z` again rather than `1970-01-02T00:00:00Z`.", + "default": null, + "nullable": true + }, "splitter_chunk_size_records": { "type": "integer", "format": "int64", diff --git a/python/feldera/pipeline.py b/python/feldera/pipeline.py index 4d43e4eb995..11eb2684144 100644 --- a/python/feldera/pipeline.py +++ b/python/feldera/pipeline.py @@ -670,6 +670,25 @@ def resume(self, wait: bool = True, timeout_s: Optional[float] = None): self.client.resume_pipeline(self.name, wait=wait, timeout_s=timeout_s) + def advance_clock(self, delta_ms: Optional[int] = None) -> Dict[str, Any]: + """ + Advance the externally-driven `NOW()` clock and return its new value. + + Requires `dev_tweaks.now_http_driven = True` on this pipeline. + Forward-only: `delta_ms` must be non-negative. + + :param delta_ms: Milliseconds to add to `NOW()`. ``0`` reads the + current value without moving the clock; ``None`` (the default) + advances by one ``clock_resolution`` (one configured tick). + + :return: A dict ``{"now_ms": int, "now": str}`` (``now`` is RFC 3339).
+ + :raises FelderaAPIError: If the clock is not in http-driven mode, + the body is malformed, or the pipeline is not running. + """ + + return self.client.advance_clock(self.name, delta_ms) + def start_transaction(self) -> int: """ Start a new transaction. diff --git a/python/feldera/rest/feldera_client.py b/python/feldera/rest/feldera_client.py index bc396cd49f2..9bd035b890a 100644 --- a/python/feldera/rest/feldera_client.py +++ b/python/feldera/rest/feldera_client.py @@ -785,6 +785,40 @@ def start_transaction(self, pipeline_name: str) -> int: return int(resp.get("transaction_id")) + def advance_clock( + self, + pipeline_name: str, + delta_ms: Optional[int] = None, + ) -> Dict[str, Any]: + """ + Advance the externally-driven `NOW()` clock and return its new value. + + Requires `dev_tweaks.now_http_driven = True` on the pipeline. The + clock is forward-only. + + :param pipeline_name: The name of the pipeline. + + :param delta_ms: Milliseconds to add to `NOW()`. ``0`` reads the + current value without moving the clock; ``None`` (the default) + advances by one ``clock_resolution`` (one configured tick). + Must be non-negative. + + :return: A dict ``{"now_ms": int, "now": str}`` (``now`` is RFC 3339). + + :raises FelderaAPIError: If the clock is not in http-driven mode, + the body is malformed, or the pipeline is not running. + """ + + if delta_ms is not None and delta_ms < 0: + raise ValueError( + f"delta_ms must be non-negative; the clock is forward-only (got {delta_ms})" + ) + + return self.http.post( + path=f"/pipelines/{pipeline_name}/clock/advance", + body={"delta_ms": delta_ms}, + ) + def commit_transaction( self, pipeline_name: str, diff --git a/python/tests/runtime/test_clock_advance.py b/python/tests/runtime/test_clock_advance.py new file mode 100644 index 00000000000..9b8febdf0e0 --- /dev/null +++ b/python/tests/runtime/test_clock_advance.py @@ -0,0 +1,165 @@ +""" +Integration test for `POST /v0/pipelines/{name}/clock/advance`.
+ +The pipeline is configured with both `dev_tweaks.now_offset` (the anchor) +and `dev_tweaks.now_http_driven = true` (the externally-driven mode). +The test exercises: + +* the read-only `advance(0)`, +* explicit forward advances and compounding, +* `advance(null)` — advance by one `clock_resolution`, +* rejection of negative deltas at the JSON layer (`u64` won't parse a + negative number), +* visibility of the new `NOW()` to SQL via an adhoc query against a + materialized view of `SELECT NOW()`. + +Every value is asserted exactly; the test does not depend on wall clock. +""" + +import time +import unittest + +from feldera.enums import PipelineStatus +from feldera.pipeline_builder import PipelineBuilder +from feldera.rest.errors import FelderaAPIError +from feldera.runtime_config import RuntimeConfig +from feldera.testutils import ( + FELDERA_TEST_NUM_HOSTS, + FELDERA_TEST_NUM_WORKERS, + unique_pipeline_name, +) +from tests import TEST_CLIENT + +# `2030-01-01T00:00:00Z` in milliseconds since epoch. Chosen as a fixed +# point in time so every assertion in this test can use literal values. +ANCHOR_RFC = "2030-01-01T00:00:00Z" +ANCHOR_MS = 1_893_456_000_000 + +ONE_MINUTE_MS = 60_000 +ONE_DAY_MS = 24 * 60 * 60 * 1_000 + +# Clock resolution configured below; `advance(null)` moves NOW() by this much. +CLOCK_RESOLUTION_MS = 1_000 + +# How long to wait for the materialized view to reflect a newly-emitted +# `NOW()` value. Each advance() requests a step asynchronously, so the +# view update lags the HTTP response by a few milliseconds in practice. +VIEW_PROPAGATION_TIMEOUT_S = 10.0 + + +def _advance(pipeline, delta_ms: int | None) -> dict: + """Wrapper around `Pipeline.advance_clock` for readability.""" + return pipeline.advance_clock(delta_ms) + + +def _wait_for_now(pipeline, expected_prefix: str) -> str: + """Poll `SELECT t FROM v` until `t` starts with `expected_prefix`. + + Returns the matching value. Raises if it never appears within + `VIEW_PROPAGATION_TIMEOUT_S`. 
+ """ + deadline = time.monotonic() + VIEW_PROPAGATION_TIMEOUT_S + last: str | None = None + while time.monotonic() < deadline: + rows = list(pipeline.query("SELECT t FROM v;")) + if rows: + last = rows[0]["t"] + if str(last).startswith(expected_prefix): + return str(last) + time.sleep(0.05) + raise AssertionError( + f"NOW() did not reach prefix {expected_prefix!r} within " + f"{VIEW_PROPAGATION_TIMEOUT_S}s (last observed: {last!r})" + ) + + +class TestClockAdvance(unittest.TestCase): + def test_anchor_plus_advance(self): + pipeline_name = unique_pipeline_name("test_clock_advance") + + sql = "CREATE MATERIALIZED VIEW v AS SELECT NOW() AS t;" + + pipeline = PipelineBuilder( + TEST_CLIENT, + pipeline_name, + sql=sql, + runtime_config=RuntimeConfig( + workers=FELDERA_TEST_NUM_WORKERS, + hosts=FELDERA_TEST_NUM_HOSTS, + # 1-second resolution; advance() values are rounded to this + # and `advance(null)` moves NOW() by exactly this much. + clock_resolution_usecs=CLOCK_RESOLUTION_MS * 1_000, + dev_tweaks={ + "now_offset": ANCHOR_RFC, + "now_http_driven": True, + }, + ), + ).create_or_replace() + + pipeline.start() + try: + self.assertEqual(pipeline.status(), PipelineStatus.RUNNING) + + # advance(0) is a read: returns the anchor without moving NOW(). + resp = _advance(pipeline, 0) + self.assertEqual(resp["now_ms"], ANCHOR_MS) + self.assertTrue( + resp["now"].startswith("2030-01-01T00:00:00"), + f"unexpected RFC 3339: {resp['now']!r}", + ) + + # advance(null) advances by exactly one clock_resolution. + resp = _advance(pipeline, None) + self.assertEqual(resp["now_ms"], ANCHOR_MS + CLOCK_RESOLUTION_MS) + + # advance(+1min) moves NOW() forward; both the HTTP response + # and the materialized view reflect the same value. + resp = _advance(pipeline, ONE_MINUTE_MS) + self.assertEqual( + resp["now_ms"], ANCHOR_MS + CLOCK_RESOLUTION_MS + ONE_MINUTE_MS + ) + _wait_for_now(pipeline, "2030-01-01T00:01:01") + + # advance(+1 day) compounds with the previous steps. 
+ resp = _advance(pipeline, ONE_DAY_MS) + self.assertEqual( + resp["now_ms"], + ANCHOR_MS + CLOCK_RESOLUTION_MS + ONE_MINUTE_MS + ONE_DAY_MS, + ) + _wait_for_now(pipeline, "2030-01-02T00:01:01") + + # advance(0) confirms the clock did not drift between calls. + resp = _advance(pipeline, 0) + self.assertEqual( + resp["now_ms"], + ANCHOR_MS + CLOCK_RESOLUTION_MS + ONE_MINUTE_MS + ONE_DAY_MS, + ) + + # Negative deltas are rejected client-side by the wrapper + # before any HTTP traffic. + with self.assertRaises(ValueError): + _advance(pipeline, -1) + + # The HTTP layer enforces the same constraint (negative bodies + # fail JSON parse because the field is `u64`). Exercise it by + # bypassing the wrapper. + with self.assertRaises(FelderaAPIError) as cm: + TEST_CLIENT.http.post( + path=f"/pipelines/{pipeline_name}/clock/advance", + body={"delta_ms": -1}, + ) + self.assertEqual(cm.exception.status_code, 400) + + resp = _advance(pipeline, 0) + self.assertEqual( + resp["now_ms"], + ANCHOR_MS + CLOCK_RESOLUTION_MS + ONE_MINUTE_MS + ONE_DAY_MS, + "NOW() must not change after a rejected advance", + ) + finally: + pipeline.stop(force=True) + pipeline.clear_storage() + + +if __name__ == "__main__": + unittest.main()