Skip to content

Commit 97d6bca

Browse files
committed
[adapters]: last_sync should only be updated with successful syncs
Currently, even if sync fails, we update last_sync, due to which `periodic` in checkpoint status reports a UUID whose sync actually failed. This isn't correct, we should only set it if sync was successful. periodic checkpoints gets retried anyways, so it if fine to ignore failed ones. Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
1 parent 899da36 commit 97d6bca

File tree

2 files changed

+65
-5
lines changed

2 files changed

+65
-5
lines changed

crates/adapters/src/controller.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2209,11 +2209,14 @@ impl RunningCheckpointSync {
22092209
Ok(result) => {
22102210
join_handle.join().unwrap();
22112211

2212-
let mut last_sync = circuit.controller.last_checkpoint_sync.lock().unwrap();
2213-
*last_sync = LastCheckpoint {
2214-
timestamp: Instant::now(),
2215-
id: Some(uuid),
2216-
};
2212+
// Only update last_sync if the checkpoint sync succeeded.
2213+
if result.is_ok() {
2214+
let mut last_sync = circuit.controller.last_checkpoint_sync.lock().unwrap();
2215+
*last_sync = LastCheckpoint {
2216+
timestamp: Instant::now(),
2217+
id: Some(uuid),
2218+
};
2219+
}
22172220

22182221
Some(result)
22192222
}

python/tests/platform/test_checkpoint_sync.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,63 @@ def test_automated_checkpoint_sync(self):
284284
def test_automated_checkpoint_sync1(self):
285285
self.test_checkpoint_sync(ft_interval=5, automated_sync_interval=10)
286286

287+
@enterprise_only
288+
@single_host_only
289+
def test_automated_sync_auth_error(self):
290+
"""
291+
CREATE TABLE t0 (c0 INT, c1 VARCHAR);
292+
CREATE MATERIALIZED VIEW v0 AS SELECT * FROM t0;
293+
"""
294+
# Configure pipeline with automatic periodic sync and bad credentials.
295+
# The periodic sync should fail, and `periodic` in the sync status
296+
# should remain None (not report a UUID for a failed sync).
297+
storage_config = storage_cfg(
298+
self.pipeline.name,
299+
push_interval=5,
300+
auth_err=True,
301+
)
302+
self.pipeline.set_runtime_config(
303+
RuntimeConfig(
304+
workers=FELDERA_TEST_NUM_WORKERS,
305+
hosts=FELDERA_TEST_NUM_HOSTS,
306+
fault_tolerance_model=FaultToleranceModel.AtLeastOnce,
307+
storage=Storage(config=storage_config),
308+
checkpoint_interval_secs=5,
309+
)
310+
)
311+
self.pipeline.start()
312+
313+
# Insert data so the pipeline has something to checkpoint.
314+
data = [{"c0": i, "c1": str(i)} for i in range(1, 10)]
315+
self.pipeline.input_json("t0", data)
316+
self.pipeline.wait_for_completion()
317+
318+
# Wait for a checkpoint to be created.
319+
def checkpoint_exists() -> bool:
320+
return len(self.pipeline.checkpoints()) > 0
321+
322+
wait_for_condition(
323+
"checkpoint is created",
324+
checkpoint_exists,
325+
timeout_s=30.0,
326+
poll_interval_s=0.5,
327+
)
328+
329+
# Wait long enough for the periodic sync to have attempted (and failed).
330+
time.sleep(15)
331+
332+
# The periodic sync should have failed due to bad credentials.
333+
# `periodic` must be None — a failed sync should not report a UUID.
334+
status = self.pipeline.client.sync_checkpoint_status(self.pipeline.name)
335+
print(f"sync_status after failed periodic sync: {status}", file=sys.stderr)
336+
self.assertIsNone(
337+
status.get("periodic"),
338+
f"periodic should be None after failed sync, got: {status}",
339+
)
340+
341+
self.pipeline.stop(force=True)
342+
self.pipeline.clear_storage()
343+
287344
@enterprise_only
288345
@single_host_only
289346
def test_autherr_fail(self):

0 commit comments

Comments
 (0)