@@ -15,14 +15,14 @@ use crate::{
1515 cursor:: CursorList , merge_batches, Batch , BatchReader , BatchReaderFactories , Cursor ,
1616 Filter , Trace ,
1717 } ,
18- Error , NumEntries ,
18+ Error , NumEntries , Runtime ,
1919} ;
2020
2121use crate :: storage:: file:: to_bytes;
2222use crate :: storage:: write_commit_metadata;
2323pub use crate :: trace:: spine_async:: snapshot:: SpineSnapshot ;
2424use crate :: trace:: CommittedSpine ;
25- use metrics:: counter;
25+ use metrics:: { counter, gauge } ;
2626use ouroboros:: self_referencing;
2727use rand:: Rng ;
2828use rkyv:: {
@@ -44,19 +44,23 @@ use std::{
4444 sync:: Condvar ,
4545} ;
4646use textwrap:: indent;
47+ use uuid:: Uuid ;
4748
4849mod list_merger;
4950mod snapshot;
5051mod thread;
5152
5253use self :: thread:: { BackgroundThread , WorkerStatus } ;
5354use super :: BatchLocation ;
54- use crate :: circuit:: metrics:: COMPACTION_STALL_TIME ;
55+ use crate :: circuit:: metrics:: { BATCHES_PER_LEVEL , COMPACTION_STALL_TIME , ONGOING_MERGES_PER_LEVEL } ;
5556use list_merger:: { ListMerger , ListMergerBuilder } ;
5657
5758/// Maximum amount of levels in the spine.
5859pub ( crate ) const MAX_LEVELS : usize = 9 ;
5960
61+ /// Levels as &'static str for metrics
62+ pub ( crate ) const LEVELS_AS_STR : [ & str ; MAX_LEVELS ] = [ "0" , "1" , "2" , "3" , "4" , "5" , "6" , "7" , "8" ] ;
63+
6064impl < B : Batch + Send + Sync > From < ( Vec < String > , & Spine < B > ) > for CommittedSpine < B > {
6165 fn from ( ( batches, spine) : ( Vec < String > , & Spine < B > ) ) -> Self {
6266 CommittedSpine {
@@ -162,6 +166,9 @@ where
162166 request_exit : bool ,
163167 #[ size_of( skip) ]
164168 merge_stats : MergeStats ,
169+ /// Unique identifier for the spine for metrics.
170+ #[ size_of( skip) ]
171+ ident : & ' static str ,
165172}
166173
167174impl < B > SharedState < B >
@@ -176,6 +183,7 @@ where
176183 slots : std:: array:: from_fn ( |_| Slot :: default ( ) ) ,
177184 request_exit : false ,
178185 merge_stats : MergeStats :: default ( ) ,
186+ ident : String :: leak ( Uuid :: now_v7 ( ) . to_string ( ) ) ,
179187 }
180188 }
181189
@@ -184,9 +192,7 @@ where
184192 fn add_batches ( & mut self , batches : impl IntoIterator < Item = Arc < B > > ) {
185193 for batch in batches {
186194 if !batch. is_empty ( ) {
187- self . slots [ Spine :: < B > :: size_to_level ( batch. len ( ) ) ]
188- . loose_batches
189- . push_back ( batch) ;
195+ self . add_batch ( batch) ;
190196 }
191197 }
192198 }
@@ -197,6 +203,9 @@ where
197203 debug_assert ! ( !batch. is_empty( ) ) ;
198204 let level = Spine :: < B > :: size_to_level ( batch. len ( ) ) ;
199205 self . slots [ level] . loose_batches . push_back ( batch) ;
206+
207+ gauge ! ( BATCHES_PER_LEVEL , "worker" => Runtime :: worker_index_str( ) , "level" => LEVELS_AS_STR [ level] , "id" => self . ident)
208+ . set ( self . slots [ level] . n_batches ( ) as f64 ) ;
200209 }
201210
202211 fn should_apply_backpressure ( & self ) -> bool {
@@ -256,6 +265,8 @@ where
256265 let cache_stats = batches. iter ( ) . fold ( CacheStats :: default ( ) , |stats, batch| {
257266 stats + batch. cache_stats ( )
258267 } ) ;
268+ gauge ! ( ONGOING_MERGES_PER_LEVEL , "worker" => Runtime :: worker_index_str( ) , "level" => LEVELS_AS_STR [ level] , "id" => self . ident)
269+ . set ( 0 ) ;
259270 self . merge_stats . report_merge (
260271 batches. iter ( ) . map ( |b| b. len ( ) ) . sum ( ) ,
261272 new_batch. len ( ) ,
@@ -396,7 +407,7 @@ where
396407 let start = Instant :: now ( ) ;
397408 let mut state = self . no_backpressure . wait ( state) . unwrap ( ) ;
398409 state. merge_stats . backpressure_wait += start. elapsed ( ) ;
399- counter ! ( COMPACTION_STALL_TIME ) . increment ( start. elapsed ( ) . as_secs ( ) ) ;
410+ counter ! ( COMPACTION_STALL_TIME ) . increment ( start. elapsed ( ) . as_nanos ( ) as u64 ) ;
400411 }
401412 }
402413
@@ -514,6 +525,7 @@ where
514525 idle : & Arc < Condvar > ,
515526 no_backpressure : & Arc < Condvar > ,
516527 ) -> WorkerStatus {
528+ let ident = state. lock ( ) . unwrap ( ) . ident ;
517529 // Run in-progress merges.
518530 let ( ( key_filter, value_filter) , frontier) = {
519531 let shared = state. lock ( ) . unwrap ( ) ;
@@ -546,6 +558,9 @@ where
546558 . filter_map ( |( level, slot) | slot. try_start_merge ( level) . map ( |batches| ( level, batches) ) )
547559 . collect :: < Vec < _ > > ( ) ;
548560 for ( level, batches) in start_merges {
561+ gauge ! ( ONGOING_MERGES_PER_LEVEL , "worker" => Runtime :: worker_index_str( ) , "level" => LEVELS_AS_STR [ level] , "id" => ident)
562+ . set ( batches. len ( ) as f64 ) ;
563+
549564 let merger = ListMergerBuilder :: with_capacity ( batches. len ( ) )
550565 . with_batches ( batches)
551566 . build ( ) ;
0 commit comments