@@ -1098,6 +1098,9 @@ pub trait Node: Any {
10981098 /// from a checkpoint must be backfilled from clean state.
10991099 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > ;
11001100
1101+ /// Call [`Operator::start_compaction`](super::operator_traits::Operator::start_compaction) on the operator this node encapsulates.
1102+ fn start_compaction ( & mut self ) ;
1103+
11011104 /// Place operator in the replay mode.
11021105 ///
11031106 /// In the replay mode the operator streams its stored state to a temporary
@@ -1836,6 +1839,8 @@ pub trait CircuitBase: 'static {
18361839 ) -> Result < PartitioningPolicy , DbspError > ;
18371840
18381841 fn rebalance ( & self ) ;
1842+
1843+ fn start_compaction ( & self ) ;
18391844}
18401845
18411846/// The circuit interface. All DBSP computation takes place within a circuit.
@@ -3506,6 +3511,13 @@ where
35063511 fn rebalance ( & self ) {
35073512 self . inner ( ) . balancer . rebalance ( )
35083513 }
3514+
3515+ fn start_compaction ( & self ) {
3516+ let _ = self . map_local_nodes_mut ( & mut |node| {
3517+ node. start_compaction ( ) ;
3518+ Ok ( ( ) )
3519+ } ) ;
3520+ }
35093521}
35103522
35113523impl < P , T > Circuit for ChildCircuit < P , T >
@@ -4673,6 +4685,10 @@ where
46734685 self . operator . restore ( base, self . persistent_id ( ) . as_deref ( ) )
46744686 }
46754687
4688+ fn start_compaction ( & mut self ) {
4689+ self . operator . start_compaction ( )
4690+ }
4691+
46764692 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
46774693 self . operator . clear_state ( )
46784694 }
@@ -4820,6 +4836,10 @@ where
48204836 self . operator . restore ( base, self . persistent_id ( ) . as_deref ( ) )
48214837 }
48224838
4839+ fn start_compaction ( & mut self ) {
4840+ self . operator . start_compaction ( )
4841+ }
4842+
48234843 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
48244844 self . operator . clear_state ( )
48254845 }
@@ -4981,6 +5001,10 @@ where
49815001 self . operator . restore ( base, self . persistent_id ( ) . as_deref ( ) )
49825002 }
49835003
5004+ fn start_compaction ( & mut self ) {
5005+ self . operator . start_compaction ( )
5006+ }
5007+
49845008 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
49855009 self . operator . clear_state ( )
49865010 }
@@ -5135,6 +5159,10 @@ where
51355159 self . operator . restore ( base, self . persistent_id ( ) . as_deref ( ) )
51365160 }
51375161
5162+ fn start_compaction ( & mut self ) {
5163+ self . operator . start_compaction ( )
5164+ }
5165+
51385166 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
51395167 self . operator . clear_state ( )
51405168 }
@@ -5346,6 +5374,10 @@ where
53465374 self . operator . restore ( base, self . persistent_id ( ) . as_deref ( ) )
53475375 }
53485376
5377+ fn start_compaction ( & mut self ) {
5378+ self . operator . start_compaction ( )
5379+ }
5380+
53495381 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
53505382 self . operator . clear_state ( )
53515383 }
@@ -5533,6 +5565,10 @@ where
55335565 self . operator . restore ( base, self . persistent_id ( ) . as_deref ( ) )
55345566 }
55355567
5568+ fn start_compaction ( & mut self ) {
5569+ self . operator . start_compaction ( )
5570+ }
5571+
55365572 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
55375573 self . operator . clear_state ( )
55385574 }
@@ -5744,6 +5780,10 @@ where
57445780 self . operator . restore ( base, self . persistent_id ( ) . as_deref ( ) )
57455781 }
57465782
5783+ fn start_compaction ( & mut self ) {
5784+ self . operator . start_compaction ( )
5785+ }
5786+
57475787 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
57485788 self . operator . clear_state ( )
57495789 }
@@ -5929,6 +5969,10 @@ where
59295969 self . operator . restore ( base, self . persistent_id ( ) . as_deref ( ) )
59305970 }
59315971
5972+ fn start_compaction ( & mut self ) {
5973+ self . operator . start_compaction ( )
5974+ }
5975+
59325976 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
59335977 self . operator . clear_state ( )
59345978 }
@@ -6135,6 +6179,10 @@ where
61356179 self . operator . restore ( base, self . persistent_id ( ) . as_deref ( ) )
61366180 }
61376181
6182+ fn start_compaction ( & mut self ) {
6183+ self . operator . start_compaction ( )
6184+ }
6185+
61386186 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
61396187 self . operator . clear_state ( )
61406188 }
@@ -6326,6 +6374,10 @@ where
63266374 self . operator . restore ( base, self . persistent_id ( ) . as_deref ( ) )
63276375 }
63286376
6377+ fn start_compaction ( & mut self ) {
6378+ self . operator . start_compaction ( )
6379+ }
6380+
63296381 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
63306382 self . operator . clear_state ( )
63316383 }
@@ -6507,6 +6559,10 @@ where
65076559 . restore ( base, self . persistent_id ( ) . as_deref ( ) )
65086560 }
65096561
6562+ fn start_compaction ( & mut self ) {
6563+ self . operator . borrow_mut ( ) . start_compaction ( )
6564+ }
6565+
65106566 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
65116567 self . operator . borrow_mut ( ) . clear_state ( )
65126568 }
@@ -6672,6 +6728,8 @@ where
66726728 Ok ( ( ) )
66736729 }
66746730
6731+ fn start_compaction ( & mut self ) { }
6732+
66756733 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
66766734 Ok ( ( ) )
66776735 }
@@ -6900,6 +6958,10 @@ where
69006958 Ok ( ( ) )
69016959 }
69026960
6961+ fn start_compaction ( & mut self ) {
6962+ self . circuit . start_compaction ( ) ;
6963+ }
6964+
69036965 fn clear_state ( & mut self ) -> Result < ( ) , DbspError > {
69046966 self . circuit
69056967 . map_local_nodes_mut ( & mut |node| node. clear_state ( ) )
@@ -7669,6 +7731,10 @@ impl CircuitHandle {
76697731 pub fn rebalance ( & self ) {
76707732 self . circuit . rebalance ( )
76717733 }
7734+
7735+ pub fn start_compaction ( & self ) {
7736+ self . circuit . start_compaction ( )
7737+ }
76727738}
76737739
76747740pin_project ! {
0 commit comments