@@ -254,45 +254,90 @@ impl Checkpointer {
254254
255255 /// Removes all meta-data files associated with the checkpoint given by
256256 /// `cpm` by removing the folder associated with the checkpoint.
257- fn remove_checkpoint_dir ( & self , cpm : & CheckpointMetadata ) -> Result < ( ) , Error > {
258- assert_ne ! ( cpm. uuid, Uuid :: nil( ) ) ;
259- self . backend
260- . delete_recursive ( & cpm. uuid . to_string ( ) . into ( ) ) ?;
257+ fn remove_checkpoint_dir ( & self , cpm : uuid:: Uuid ) -> Result < ( ) , Error > {
258+ assert_ne ! ( cpm, Uuid :: nil( ) ) ;
259+ self . backend . delete_recursive ( & cpm. to_string ( ) . into ( ) ) ?;
261260 Ok ( ( ) )
262261 }
263262
264- /// Remove the oldest checkpoint from the list.
263+ /// Remove the oldest checkpoints from the list.
264+ /// - Preserves at least `MIN_CHECKPOINT_THRESHOLD` checkpoints.
265+ /// - Does not remove any checkpoints whose UUID is in the `except` list.
265266 ///
266267 /// # Returns
267- /// - Metadata of the removed checkpoint, if there are more than
268- /// `MIN_CHECKPOINT_THRESHOLD`
269- /// - None otherwise.
270- pub fn gc_checkpoint ( & mut self ) -> Result < Option < CheckpointMetadata > , Error > {
271- // Ensures that we can unwrap the call to pop_front, and front later:
272- static_assertions:: const_assert!( Checkpointer :: MIN_CHECKPOINT_THRESHOLD >= 2 ) ;
273- if self . checkpoint_list . len ( ) > Self :: MIN_CHECKPOINT_THRESHOLD {
274- let cp_to_remove = self . checkpoint_list . pop_front ( ) . unwrap ( ) ;
275- let next_in_line = self . checkpoint_list . front ( ) . unwrap ( ) ;
276-
277- // Update the checkpoint list file, we do this first intentionally, in case
278- // later operations fail we don't want the checkpoint list to
279- // contain a checkpoint that only has part of the files.
280- //
281- // If any of the later operations fail, restarting the circuit will try
282- // to remove the checkpoint files again (see also [`Self::gc_startup`]).
283- self . update_checkpoint_file ( ) ?;
284-
285- let potentially_remove = self . gather_batches_for_checkpoint ( & cp_to_remove) ?;
286- let need_to_keep = self . gather_batches_for_checkpoint ( next_in_line) ?;
287- for file in potentially_remove. difference ( & need_to_keep) {
288- self . remove_batch_file ( file) ;
268+ /// - UUIDs of the removed checkpoints, if there are more than `MIN_CHECKPOINT_THRESHOLD` checkpoints.
269+ /// - Empty set otherwise (including when every removable checkpoint is in `except`).
270+ pub fn gc_checkpoint (
271+ & mut self ,
272+ except : HashSet < uuid:: Uuid > ,
273+ ) -> Result < HashSet < uuid:: Uuid > , Error > {
274+ if self . checkpoint_list . len ( ) <= Self :: MIN_CHECKPOINT_THRESHOLD {
275+ return Ok ( HashSet :: new ( ) ) ;
276+ }
277+
278+ let mut batch_files_to_keep: HashSet < _ > = except
279+ . iter ( )
280+ . filter_map ( |uuid| self . backend . gather_batches_for_checkpoint_uuid ( * uuid) . ok ( ) )
281+ . flatten ( )
282+ . collect ( ) ;
283+
284+ let to_remove: HashSet < _ > = self
285+ . checkpoint_list
286+ . iter ( )
287+ . take (
288+ self . checkpoint_list
289+ . len ( )
290+ . saturating_sub ( Self :: MIN_CHECKPOINT_THRESHOLD ) ,
291+ )
292+ . map ( |cpm| cpm. uuid )
293+ . filter ( |cpm| !except. contains ( cpm) )
294+ . collect ( ) ;
295+
296+ self . checkpoint_list
297+ . retain ( |cpm| !to_remove. contains ( & cpm. uuid ) ) ;
298+
299+ // Update the checkpoint list file first, and do so intentionally: if any
300+ // later operation fails, we don't want the checkpoint list to
301+ // contain a checkpoint that only has part of its files.
302+ //
303+ // If any of the later operations fail, restarting the circuit will try
304+ // to remove the checkpoint files again (see also [`Self::gc_startup`]).
305+ self . update_checkpoint_file ( ) ?;
306+
307+ // Find the first remaining checkpoint that is not in `except` and keep its batch files.
308+ self . checkpoint_list
309+ . iter ( )
310+ . filter ( |c| !except. contains ( & c. uuid ) )
311+ . take ( 1 )
312+ . filter_map ( |c| self . backend . gather_batches_for_checkpoint ( c) . ok ( ) )
313+ . for_each ( |batches| {
314+ for batch in batches {
315+ batch_files_to_keep. insert ( batch) ;
316+ }
317+ } ) ;
318+
319+ for cpm in & to_remove {
320+ for batch_file in self
321+ . backend
322+ . gather_batches_for_checkpoint_uuid ( * cpm) ?
323+ . difference ( & batch_files_to_keep)
324+ {
325+ self . remove_batch_file ( batch_file) ;
289326 }
290- self . remove_checkpoint_dir ( & cp_to_remove) ?;
291327
292- Ok ( Some ( cp_to_remove) )
293- } else {
294- Ok ( None )
328+ self . remove_checkpoint_dir ( * cpm) ?;
295329 }
330+
331+ tracing:: info!(
332+ "cleaned up {} checkpoints; exception list: {except:?}, retaining checkpoints: {:?}" ,
333+ to_remove. len( ) ,
334+ self . checkpoint_list
335+ . iter( )
336+ . map( |cpm| cpm. uuid)
337+ . collect:: <Vec <_>>( )
338+ ) ;
339+
340+ Ok ( to_remove)
296341 }
297342}
298343
@@ -430,3 +475,205 @@ impl<T: Default> Checkpoint for EmptyCheckpoint<T> {
430475 Ok ( ( ) )
431476 }
432477}
478+
479+ #[ cfg( test) ]
480+ mod test {
481+ use std:: sync:: Arc ;
482+
483+ use feldera_storage:: StorageBackend ;
484+ use feldera_types:: config:: { FileBackendConfig , StorageCacheConfig } ;
485+ use std:: collections:: HashSet ;
486+
487+ use crate :: storage:: backend:: posixio_impl:: PosixBackend ;
488+
489+ use super :: Checkpointer ;
490+
491+ struct Empty ;
492+ struct MinCheckpoints ;
493+ struct ExtraCheckpoints ;
494+
495+ struct TestState < S > {
496+ checkpointer : Checkpointer ,
497+ tempdir : tempfile:: TempDir ,
498+ _phantom : std:: marker:: PhantomData < S > ,
499+ }
500+
501+ impl < S > TestState < S > {
502+ fn extras ( & self ) -> Vec < uuid:: Uuid > {
503+ self . checkpointer
504+ . checkpoint_list
505+ . iter ( )
506+ . map ( |cpm| cpm. uuid )
507+ . take (
508+ self . checkpointer
509+ . checkpoint_list
510+ . len ( )
511+ . saturating_sub ( Checkpointer :: MIN_CHECKPOINT_THRESHOLD ) ,
512+ )
513+ . collect ( )
514+ }
515+
516+ fn oldest_extra ( & self ) -> uuid:: Uuid {
517+ self . extras ( ) . first ( ) . cloned ( ) . unwrap ( )
518+ }
519+
520+ fn newest_extra ( & self ) -> uuid:: Uuid {
521+ self . extras ( ) . last ( ) . cloned ( ) . unwrap ( )
522+ }
523+ }
524+
525+ impl TestState < Empty > {
526+ fn new ( ) -> Self {
527+ let tempdir = tempfile:: tempdir ( ) . unwrap ( ) ;
528+
529+ let backend: Arc < dyn StorageBackend > = Arc :: new ( PosixBackend :: new (
530+ tempdir. path ( ) ,
531+ StorageCacheConfig :: default ( ) ,
532+ & FileBackendConfig :: default ( ) ,
533+ ) ) ;
534+
535+ Self {
536+ checkpointer : Checkpointer :: new ( backend) . unwrap ( ) ,
537+ tempdir,
538+ _phantom : std:: marker:: PhantomData ,
539+ }
540+ }
541+
542+ fn precondition ( & self ) {
543+ assert_eq ! ( self . checkpointer. checkpoint_list. len( ) , 0 ) ;
544+ }
545+
546+ fn checkpoint ( mut self ) -> TestState < MinCheckpoints > {
547+ self . precondition ( ) ;
548+
549+ for i in 0 ..Checkpointer :: MIN_CHECKPOINT_THRESHOLD {
550+ self . checkpointer
551+ . commit ( uuid:: Uuid :: now_v7 ( ) , 0 , None , Some ( i as u64 ) , Some ( 0 ) )
552+ . unwrap ( ) ;
553+ }
554+
555+ TestState :: < MinCheckpoints > {
556+ checkpointer : self . checkpointer ,
557+ tempdir : self . tempdir ,
558+ _phantom : std:: marker:: PhantomData ,
559+ }
560+ }
561+ }
562+
563+ impl TestState < MinCheckpoints > {
564+ fn precondition ( & self ) {
565+ assert_eq ! (
566+ self . checkpointer. checkpoint_list. len( ) ,
567+ Checkpointer :: MIN_CHECKPOINT_THRESHOLD
568+ ) ;
569+
570+ assert ! ( self . extras( ) . is_empty( ) ) ;
571+ }
572+
573+ fn checkpoint ( mut self ) -> TestState < ExtraCheckpoints > {
574+ self . precondition ( ) ;
575+
576+ let uuid = uuid:: Uuid :: now_v7 ( ) ;
577+ self . checkpointer
578+ . commit ( uuid, 0 , None , Some ( 2 ) , Some ( 0 ) )
579+ . unwrap ( ) ;
580+
581+ TestState :: < ExtraCheckpoints > {
582+ checkpointer : self . checkpointer ,
583+ tempdir : self . tempdir ,
584+ _phantom : std:: marker:: PhantomData ,
585+ }
586+ }
587+ }
588+
589+ impl TestState < ExtraCheckpoints > {
590+ fn precondition ( & self ) {
591+ assert ! (
592+ self . checkpointer. checkpoint_list. len( ) > Checkpointer :: MIN_CHECKPOINT_THRESHOLD
593+ ) ;
594+
595+ assert ! ( !self . extras( ) . is_empty( ) ) ;
596+ }
597+
598+ fn checkpoint ( mut self ) -> TestState < ExtraCheckpoints > {
599+ self . precondition ( ) ;
600+
601+ self . checkpointer
602+ . commit ( uuid:: Uuid :: now_v7 ( ) , 0 , None , Some ( 3 ) , Some ( 0 ) )
603+ . unwrap ( ) ;
604+
605+ TestState :: < ExtraCheckpoints > {
606+ checkpointer : self . checkpointer ,
607+ tempdir : self . tempdir ,
608+ _phantom : std:: marker:: PhantomData ,
609+ }
610+ }
611+
612+ fn gc ( mut self ) -> TestState < MinCheckpoints > {
613+ self . precondition ( ) ;
614+
615+ let removed = self . checkpointer . gc_checkpoint ( HashSet :: new ( ) ) . unwrap ( ) ;
616+ assert ! ( !removed. is_empty( ) ) ;
617+
618+ TestState :: < MinCheckpoints > {
619+ checkpointer : self . checkpointer ,
620+ tempdir : self . tempdir ,
621+ _phantom : std:: marker:: PhantomData ,
622+ }
623+ }
624+
625+ fn gc_with_except ( mut self , except : uuid:: Uuid ) -> TestState < ExtraCheckpoints > {
626+ self . precondition ( ) ;
627+
628+ self . checkpointer . gc_checkpoint ( [ except] . into ( ) ) . unwrap ( ) ;
629+
630+ assert ! ( self . extras( ) . contains( & except) ) ;
631+
632+ TestState :: < ExtraCheckpoints > {
633+ checkpointer : self . checkpointer ,
634+ tempdir : self . tempdir ,
635+ _phantom : std:: marker:: PhantomData ,
636+ }
637+ }
638+ }
639+
640+ #[ test]
641+ fn test_checkpointer ( ) {
642+ // Empty checkpointer.
643+ let empty_checkpoints = TestState :: < Empty > :: new ( ) ;
644+
645+ // Add minimum number of checkpoints.
646+ let min_checkpoints = empty_checkpoints. checkpoint ( ) ;
647+
648+ // Add one extra checkpoint.
649+ let extra_checkpoints = min_checkpoints. checkpoint ( ) ;
650+
651+ // Verify we can GC back to minimum.
652+ let min_checkpoints = extra_checkpoints. gc ( ) ;
653+
654+ // Add two extra checkpoints.
655+ let one_extra = min_checkpoints. checkpoint ( ) ;
656+ let two_extra = one_extra. checkpoint ( ) ;
657+
658+ // There should be more than the minimum number of checkpoints.
659+ let keep = two_extra. newest_extra ( ) ;
660+
661+ // And GC while keeping the newest extra checkpoint.
662+ // This should be the only extra checkpoint remaining.
663+ let one_extra = two_extra. gc_with_except ( keep) ;
664+ assert ! ( one_extra. extras( ) . contains( & keep) && one_extra. extras( ) . len( ) == 1 ) ;
665+
666+ // Add one extra checkpoint.
667+ let two_extra = one_extra. checkpoint ( ) ;
668+
669+ // Now let's try to GC while keeping the oldest extra checkpoint.
670+ let keep = two_extra. oldest_extra ( ) ;
671+ let one_extra = two_extra. gc_with_except ( keep) ;
672+ assert ! ( one_extra. extras( ) . contains( & keep) && one_extra. extras( ) . len( ) == 1 ) ;
673+
674+ // Finally, GC back to minimum.
675+ let min_checkpoints = one_extra. gc ( ) ;
676+ // Verify that this is a valid minimum checkpoint state.
677+ min_checkpoints. precondition ( ) ;
678+ }
679+ }
0 commit comments