Skip to content

Commit be6f768

Browse files
committed
storage: better checkpoint GC during S3 sync
Previously: - During S3 sync, we would prevent the GC of existing local checkpoints. - Only one checkpoint would be GCed at a time. This commit updates the `gc_checkpoint` method so that all *old* checkpoints (i.e., checkpoints older than the retention threshold, which currently keeps the 2 most recent) are GCed, except for any checkpoint in the `except` list or newer. This except list is populated from currently active requests for checkpoint syncs. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> checkpointer: only preserve checkpoints in except list This commit updates the checkpointer to only preserve the checkpoints in the except list, instead of preserving any checkpoint that is newer. Also adds tests for the Checkpointer to ensure that it works correctly. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com> py: add tests with sync GC count: 1, age: 0 Tests for potential regressions where we only want to keep 1 checkpoint in the object store. Previously, this introduced a bug by also cleaning up the local checkpoint directory for this checkpoint, but the pipeline still expects it to be available. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent 198ec8d commit be6f768

File tree

4 files changed

+311
-47
lines changed

4 files changed

+311
-47
lines changed

crates/adapters/src/controller.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6553,9 +6553,13 @@ impl RunningCheckpoint {
65536553
checkpoint: Checkpoint,
65546554
circuit: &mut CircuitThread,
65556555
) -> Result<Checkpoint, ControllerError> {
6556-
if circuit.sync_checkpoint_requests.is_empty()
6557-
&& let Err(error) = circuit.circuit.gc_checkpoint()
6558-
{
6556+
if let Err(error) = circuit.circuit.gc_checkpoint(
6557+
circuit
6558+
.sync_checkpoint_requests
6559+
.iter()
6560+
.map(|c| c.uuid())
6561+
.collect::<HashSet<_>>(),
6562+
) {
65596563
warn!("error removing old checkpoints: {error}");
65606564
}
65616565

crates/dbsp/src/circuit/checkpointer.rs

Lines changed: 278 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -254,45 +254,90 @@ impl Checkpointer {
254254

255255
/// Removes all meta-data files associated with the checkpoint given by
256256
/// `cpm` by removing the folder associated with the checkpoint.
257-
fn remove_checkpoint_dir(&self, cpm: &CheckpointMetadata) -> Result<(), Error> {
258-
assert_ne!(cpm.uuid, Uuid::nil());
259-
self.backend
260-
.delete_recursive(&cpm.uuid.to_string().into())?;
257+
fn remove_checkpoint_dir(&self, cpm: uuid::Uuid) -> Result<(), Error> {
258+
assert_ne!(cpm, Uuid::nil());
259+
self.backend.delete_recursive(&cpm.to_string().into())?;
261260
Ok(())
262261
}
263262

264-
/// Remove the oldest checkpoint from the list.
263+
/// Remove the oldest checkpoints from the list.
264+
/// - Preserves at least `MIN_CHECKPOINT_THRESHOLD` checkpoints.
265+
/// - Does not remove any checkpoints whose UUID is in the `except` list.
265266
///
266267
/// # Returns
267-
/// - Metadata of the removed checkpoint, if there are more than
268-
/// `MIN_CHECKPOINT_THRESHOLD`
269-
/// - None otherwise.
270-
pub fn gc_checkpoint(&mut self) -> Result<Option<CheckpointMetadata>, Error> {
271-
// Ensures that we can unwrap the call to pop_front, and front later:
272-
static_assertions::const_assert!(Checkpointer::MIN_CHECKPOINT_THRESHOLD >= 2);
273-
if self.checkpoint_list.len() > Self::MIN_CHECKPOINT_THRESHOLD {
274-
let cp_to_remove = self.checkpoint_list.pop_front().unwrap();
275-
let next_in_line = self.checkpoint_list.front().unwrap();
276-
277-
// Update the checkpoint list file, we do this first intentionally, in case
278-
// later operations fail we don't want the checkpoint list to
279-
// contain a checkpoint that only has part of the files.
280-
//
281-
// If any of the later operations fail, restarting the circuit will try
282-
// to remove the checkpoint files again (see also [`Self::gc_startup`]).
283-
self.update_checkpoint_file()?;
284-
285-
let potentially_remove = self.gather_batches_for_checkpoint(&cp_to_remove)?;
286-
let need_to_keep = self.gather_batches_for_checkpoint(next_in_line)?;
287-
for file in potentially_remove.difference(&need_to_keep) {
288-
self.remove_batch_file(file);
268+
/// - Uuid of the removed checkpoints, if there are more than `MIN_CHECKPOINT_THRESHOLD`.
269+
/// - Empty set otherwise.
270+
pub fn gc_checkpoint(
271+
&mut self,
272+
except: HashSet<uuid::Uuid>,
273+
) -> Result<HashSet<uuid::Uuid>, Error> {
274+
if self.checkpoint_list.len() <= Self::MIN_CHECKPOINT_THRESHOLD {
275+
return Ok(HashSet::new());
276+
}
277+
278+
let mut batch_files_to_keep: HashSet<_> = except
279+
.iter()
280+
.filter_map(|uuid| self.backend.gather_batches_for_checkpoint_uuid(*uuid).ok())
281+
.flatten()
282+
.collect();
283+
284+
let to_remove: HashSet<_> = self
285+
.checkpoint_list
286+
.iter()
287+
.take(
288+
self.checkpoint_list
289+
.len()
290+
.saturating_sub(Self::MIN_CHECKPOINT_THRESHOLD),
291+
)
292+
.map(|cpm| cpm.uuid)
293+
.filter(|cpm| !except.contains(cpm))
294+
.collect();
295+
296+
self.checkpoint_list
297+
.retain(|cpm| !to_remove.contains(&cpm.uuid));
298+
299+
// Update the checkpoint list file, we do this first intentionally, in case
300+
// later operations fail we don't want the checkpoint list to
301+
// contain a checkpoint that only has part of the files.
302+
//
303+
// If any of the later operations fail, restarting the circuit will try
304+
// to remove the checkpoint files again (see also [`Self::gc_startup`]).
305+
self.update_checkpoint_file()?;
306+
307+
// Find the first checkpoint in checkpoint list that is not in `except`.
308+
self.checkpoint_list
309+
.iter()
310+
.filter(|c| !except.contains(&c.uuid))
311+
.take(1)
312+
.filter_map(|c| self.backend.gather_batches_for_checkpoint(c).ok())
313+
.for_each(|batches| {
314+
for batch in batches {
315+
batch_files_to_keep.insert(batch);
316+
}
317+
});
318+
319+
for cpm in &to_remove {
320+
for batch_file in self
321+
.backend
322+
.gather_batches_for_checkpoint_uuid(*cpm)?
323+
.difference(&batch_files_to_keep)
324+
{
325+
self.remove_batch_file(batch_file);
289326
}
290-
self.remove_checkpoint_dir(&cp_to_remove)?;
291327

292-
Ok(Some(cp_to_remove))
293-
} else {
294-
Ok(None)
328+
self.remove_checkpoint_dir(*cpm)?;
295329
}
330+
331+
tracing::info!(
332+
"cleaned up {} checkpoints; exception list: {except:?}, retaining checkpoints: {:?}",
333+
to_remove.len(),
334+
self.checkpoint_list
335+
.iter()
336+
.map(|cpm| cpm.uuid)
337+
.collect::<Vec<_>>()
338+
);
339+
340+
Ok(to_remove)
296341
}
297342
}
298343

@@ -430,3 +475,205 @@ impl<T: Default> Checkpoint for EmptyCheckpoint<T> {
430475
Ok(())
431476
}
432477
}
478+
479+
#[cfg(test)]
480+
mod test {
481+
use std::sync::Arc;
482+
483+
use feldera_storage::StorageBackend;
484+
use feldera_types::config::{FileBackendConfig, StorageCacheConfig};
485+
use std::collections::HashSet;
486+
487+
use crate::storage::backend::posixio_impl::PosixBackend;
488+
489+
use super::Checkpointer;
490+
491+
struct Empty;
492+
struct MinCheckpoints;
493+
struct ExtraCheckpoints;
494+
495+
struct TestState<S> {
496+
checkpointer: Checkpointer,
497+
tempdir: tempfile::TempDir,
498+
_phantom: std::marker::PhantomData<S>,
499+
}
500+
501+
impl<S> TestState<S> {
502+
fn extras(&self) -> Vec<uuid::Uuid> {
503+
self.checkpointer
504+
.checkpoint_list
505+
.iter()
506+
.map(|cpm| cpm.uuid)
507+
.take(
508+
self.checkpointer
509+
.checkpoint_list
510+
.len()
511+
.saturating_sub(Checkpointer::MIN_CHECKPOINT_THRESHOLD),
512+
)
513+
.collect()
514+
}
515+
516+
fn oldest_extra(&self) -> uuid::Uuid {
517+
self.extras().first().cloned().unwrap()
518+
}
519+
520+
fn newest_extra(&self) -> uuid::Uuid {
521+
self.extras().last().cloned().unwrap()
522+
}
523+
}
524+
525+
impl TestState<Empty> {
526+
fn new() -> Self {
527+
let tempdir = tempfile::tempdir().unwrap();
528+
529+
let backend: Arc<dyn StorageBackend> = Arc::new(PosixBackend::new(
530+
tempdir.path(),
531+
StorageCacheConfig::default(),
532+
&FileBackendConfig::default(),
533+
));
534+
535+
Self {
536+
checkpointer: Checkpointer::new(backend).unwrap(),
537+
tempdir,
538+
_phantom: std::marker::PhantomData,
539+
}
540+
}
541+
542+
fn precondition(&self) {
543+
assert_eq!(self.checkpointer.checkpoint_list.len(), 0);
544+
}
545+
546+
fn checkpoint(mut self) -> TestState<MinCheckpoints> {
547+
self.precondition();
548+
549+
for i in 0..Checkpointer::MIN_CHECKPOINT_THRESHOLD {
550+
self.checkpointer
551+
.commit(uuid::Uuid::now_v7(), 0, None, Some(i as u64), Some(0))
552+
.unwrap();
553+
}
554+
555+
TestState::<MinCheckpoints> {
556+
checkpointer: self.checkpointer,
557+
tempdir: self.tempdir,
558+
_phantom: std::marker::PhantomData,
559+
}
560+
}
561+
}
562+
563+
impl TestState<MinCheckpoints> {
564+
fn precondition(&self) {
565+
assert_eq!(
566+
self.checkpointer.checkpoint_list.len(),
567+
Checkpointer::MIN_CHECKPOINT_THRESHOLD
568+
);
569+
570+
assert!(self.extras().is_empty());
571+
}
572+
573+
fn checkpoint(mut self) -> TestState<ExtraCheckpoints> {
574+
self.precondition();
575+
576+
let uuid = uuid::Uuid::now_v7();
577+
self.checkpointer
578+
.commit(uuid, 0, None, Some(2), Some(0))
579+
.unwrap();
580+
581+
TestState::<ExtraCheckpoints> {
582+
checkpointer: self.checkpointer,
583+
tempdir: self.tempdir,
584+
_phantom: std::marker::PhantomData,
585+
}
586+
}
587+
}
588+
589+
impl TestState<ExtraCheckpoints> {
590+
fn precondition(&self) {
591+
assert!(
592+
self.checkpointer.checkpoint_list.len() > Checkpointer::MIN_CHECKPOINT_THRESHOLD
593+
);
594+
595+
assert!(!self.extras().is_empty());
596+
}
597+
598+
fn checkpoint(mut self) -> TestState<ExtraCheckpoints> {
599+
self.precondition();
600+
601+
self.checkpointer
602+
.commit(uuid::Uuid::now_v7(), 0, None, Some(3), Some(0))
603+
.unwrap();
604+
605+
TestState::<ExtraCheckpoints> {
606+
checkpointer: self.checkpointer,
607+
tempdir: self.tempdir,
608+
_phantom: std::marker::PhantomData,
609+
}
610+
}
611+
612+
fn gc(mut self) -> TestState<MinCheckpoints> {
613+
self.precondition();
614+
615+
let removed = self.checkpointer.gc_checkpoint(HashSet::new()).unwrap();
616+
assert!(!removed.is_empty());
617+
618+
TestState::<MinCheckpoints> {
619+
checkpointer: self.checkpointer,
620+
tempdir: self.tempdir,
621+
_phantom: std::marker::PhantomData,
622+
}
623+
}
624+
625+
fn gc_with_except(mut self, except: uuid::Uuid) -> TestState<ExtraCheckpoints> {
626+
self.precondition();
627+
628+
self.checkpointer.gc_checkpoint([except].into()).unwrap();
629+
630+
assert!(self.extras().contains(&except));
631+
632+
TestState::<ExtraCheckpoints> {
633+
checkpointer: self.checkpointer,
634+
tempdir: self.tempdir,
635+
_phantom: std::marker::PhantomData,
636+
}
637+
}
638+
}
639+
640+
#[test]
641+
fn test_checkpointer() {
642+
// Empty checkpointer.
643+
let empty_checkpoints = TestState::<Empty>::new();
644+
645+
// Add minimum number of checkpoints.
646+
let min_checkpoints = empty_checkpoints.checkpoint();
647+
648+
// Add one extra checkpoint.
649+
let extra_checkpoints = min_checkpoints.checkpoint();
650+
651+
// Verify we can GC back to minimum.
652+
let min_checkpoints = extra_checkpoints.gc();
653+
654+
// Add two extra checkpoints.
655+
let one_extra = min_checkpoints.checkpoint();
656+
let two_extra = one_extra.checkpoint();
657+
658+
// There should be more than the minimum number of checkpoints.
659+
let keep = two_extra.newest_extra();
660+
661+
// And GC while keeping the newest extra checkpoint.
662+
// This should be the only extra checkpoint remaining.
663+
let one_extra = two_extra.gc_with_except(keep);
664+
assert!(one_extra.extras().contains(&keep) && one_extra.extras().len() == 1);
665+
666+
// Add one extra checkpoint.
667+
let two_extra = one_extra.checkpoint();
668+
669+
// Now let's try to GC while keeping the oldest extra checkpoint.
670+
let keep = two_extra.oldest_extra();
671+
let one_extra = two_extra.gc_with_except(keep);
672+
assert!(one_extra.extras().contains(&keep) && one_extra.extras().len() == 1);
673+
674+
// Finally, GC back to minimum.
675+
let min_checkpoints = one_extra.gc();
676+
// Verify that this is a valid minimum checkpoint state.
677+
min_checkpoints.precondition();
678+
}
679+
}

crates/dbsp/src/circuit/dbsp_handle.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,12 +1454,16 @@ impl DBSPHandle {
14541454
}
14551455

14561456
/// Remove the oldest checkpoints from the list.
1457+
/// - Prevents removing checkpoints whose UUID is in `except`.
14571458
///
14581459
/// # Returns
1459-
/// - Metadata of the removed checkpoint, if one was removed.
1460-
/// - None otherwise.
1461-
pub fn gc_checkpoint(&mut self) -> Result<Option<CheckpointMetadata>, DbspError> {
1462-
self.checkpointer()?.lock().unwrap().gc_checkpoint()
1460+
/// - Uuid of the removed checkpoints, if any were removed.
1461+
/// - An empty set if there were no checkpoints to remove.
1462+
pub fn gc_checkpoint(
1463+
&mut self,
1464+
except: HashSet<uuid::Uuid>,
1465+
) -> Result<HashSet<uuid::Uuid>, DbspError> {
1466+
self.checkpointer()?.lock().unwrap().gc_checkpoint(except)
14631467
}
14641468

14651469
/// Enable CPU profiler.
@@ -2214,14 +2218,16 @@ pub(crate) mod tests {
22142218
let _cpm = dbsp.checkpoint().run().expect("commit failed");
22152219
}
22162220

2217-
let mut prev_count = count_directory_entries(temp.path()).unwrap();
2221+
let prev_count = count_directory_entries(temp.path()).unwrap();
22182222
let num_checkpoints = dbsp.list_checkpoints().unwrap().len();
2219-
for _i in 0..num_checkpoints - Checkpointer::MIN_CHECKPOINT_THRESHOLD {
2220-
let _r = dbsp.gc_checkpoint();
2221-
let count = count_directory_entries(temp.path()).unwrap();
2222-
assert!(count < prev_count);
2223-
prev_count = count;
2224-
}
2223+
2224+
assert!(num_checkpoints > Checkpointer::MIN_CHECKPOINT_THRESHOLD);
2225+
2226+
// Only MIN_CHECKPOINT_THRESHOLD checkpoints will be kept.
2227+
let _r = dbsp.gc_checkpoint(std::collections::HashSet::new());
2228+
let count = count_directory_entries(temp.path()).unwrap();
2229+
assert!(count < prev_count);
2230+
assert!(dbsp.list_checkpoints().unwrap().len() <= Checkpointer::MIN_CHECKPOINT_THRESHOLD);
22252231
}
22262232

22272233
/// Make sure that leftover files from uncompleted checkpoints that were

0 commit comments

Comments
 (0)