@@ -43,8 +43,8 @@ use crossbeam::{
4343} ;
4444use datafusion:: prelude:: * ;
4545use dbsp:: circuit:: metrics:: {
46- COMPACTION_STALL_TIME , DBSP_OPERATOR_COMMIT_LATENCY , DBSP_STEP , DBSP_STEP_LATENCY ,
47- FILES_CREATED , FILES_DELETED , TOTAL_LATE_RECORDS ,
46+ COMPACTION_STALL_TIME_NANOSECONDS , DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS , DBSP_STEP ,
47+ DBSP_STEP_LATENCY_MICROSECONDS , FILES_CREATED , FILES_DELETED , TOTAL_LATE_RECORDS ,
4848} ;
4949use dbsp:: circuit:: tokio:: TOKIO ;
5050use dbsp:: circuit:: { CircuitStorageConfig , DevTweaks , Mode } ;
@@ -61,7 +61,8 @@ use feldera_ir::LirCircuit;
6161use feldera_storage:: checkpoint_synchronizer:: CheckpointSynchronizer ;
6262use feldera_storage:: histogram:: ExponentialHistogram ;
6363use feldera_storage:: metrics:: {
64- READ_BLOCKS , READ_LATENCY , SYNC_LATENCY , WRITE_BLOCKS , WRITE_LATENCY ,
64+ READ_BLOCKS_BYTES , READ_LATENCY_MICROSECONDS , SYNC_LATENCY_MICROSECONDS , WRITE_BLOCKS_BYTES ,
65+ WRITE_LATENCY_MICROSECONDS ,
6566} ;
6667use feldera_types:: checkpoint:: CheckpointMetadata ;
6768use feldera_types:: format:: json:: JsonLines ;
@@ -724,7 +725,7 @@ impl Controller {
724725 "compaction_stall_duration_seconds" ,
725726 "Time in seconds a worker was stalled waiting for more merges to complete." ,
726727 labels,
727- COMPACTION_STALL_TIME . load ( Ordering :: Relaxed ) as f64 / 1_000_000_000.0 ,
728+ COMPACTION_STALL_TIME_NANOSECONDS . load ( Ordering :: Relaxed ) as f64 / 1_000_000_000.0 ,
728729 ) ;
729730 metrics. counter (
730731 "files_created_total" ,
@@ -748,12 +749,12 @@ impl Controller {
748749 "dbsp_step_latency_seconds" ,
749750 "Latency of DBSP steps over the last 60 seconds or 1000 steps, whichever is less, in seconds" ,
750751 labels,
751- & HistogramDiv :: new ( DBSP_STEP_LATENCY . lock ( ) . unwrap ( ) . snapshot ( ) , 1_000_000.0 ) ) ;
752+ & HistogramDiv :: new ( DBSP_STEP_LATENCY_MICROSECONDS . lock ( ) . unwrap ( ) . snapshot ( ) , 1_000_000.0 ) ) ;
752753 metrics. histogram (
753754 "dbsp_operator_checkpoint_latency_seconds" ,
754755 "Latency of individual operator checkpoint operations in seconds. (Because checkpoints run in parallel across workers, these will not add to `feldera_checkpoint_latency_seconds`.)" ,
755756 labels,
756- & HistogramDiv :: new ( DBSP_OPERATOR_COMMIT_LATENCY . snapshot ( ) , 1_000_000.0 ) ,
757+ & HistogramDiv :: new ( DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS . snapshot ( ) , 1_000_000.0 ) ,
757758 ) ;
758759
759760 metrics. histogram (
@@ -779,32 +780,32 @@ impl Controller {
779780 "storage_read_latency_seconds" ,
780781 "Read latency for storage blocks in seconds" ,
781782 labels,
782- & HistogramDiv :: new ( READ_LATENCY . snapshot ( ) , 1_000_000.0 ) ,
783+ & HistogramDiv :: new ( READ_LATENCY_MICROSECONDS . snapshot ( ) , 1_000_000.0 ) ,
783784 ) ;
784785 metrics. histogram (
785786 "storage_write_latency_seconds" ,
786787 "Write latency for storage blocks in seconds" ,
787788 labels,
788- & HistogramDiv :: new ( WRITE_LATENCY . snapshot ( ) , 1_000_000.0 ) ,
789+ & HistogramDiv :: new ( WRITE_LATENCY_MICROSECONDS . snapshot ( ) , 1_000_000.0 ) ,
789790 ) ;
790791 metrics. histogram (
791792 "storage_sync_latency_seconds" ,
792793 "Sync latency in seconds" ,
793794 labels,
794- & HistogramDiv :: new ( SYNC_LATENCY . snapshot ( ) , 1_000_000.0 ) ,
795+ & HistogramDiv :: new ( SYNC_LATENCY_MICROSECONDS . snapshot ( ) , 1_000_000.0 ) ,
795796 ) ;
796797
797798 metrics. histogram (
798799 "storage_read_block_bytes" ,
799800 "Sizes in bytes of blocks read from storage." ,
800801 labels,
801- & READ_BLOCKS . snapshot ( ) ,
802+ & READ_BLOCKS_BYTES . snapshot ( ) ,
802803 ) ;
803804 metrics. histogram (
804805 "storage_write_block_bytes" ,
805806 "Sizes in bytes of blocks written to storage." ,
806807 labels,
807- & WRITE_BLOCKS . snapshot ( ) ,
808+ & WRITE_BLOCKS_BYTES . snapshot ( ) ,
808809 ) ;
809810
810811 fn write_input_metric < F , M > (
@@ -1348,7 +1349,7 @@ impl CircuitThread {
13481349 . status
13491350 . global_metrics
13501351 . num_total_processed_records ( ) ;
1351- let written_before = WRITE_BLOCKS . sum ( ) ;
1352+ let written_before = WRITE_BLOCKS_BYTES . sum ( ) ;
13521353 let checkpoint = CHECKPOINT_LATENCY . record_callback ( || {
13531354 this. circuit
13541355 . commit_with_metadata ( this. step , processed_records)
@@ -1377,7 +1378,7 @@ impl CircuitThread {
13771378 . map_err ( Arc :: new)
13781379 } )
13791380 } ) ?;
1380- let written_after = WRITE_BLOCKS . sum ( ) ;
1381+ let written_after = WRITE_BLOCKS_BYTES . sum ( ) ;
13811382 CHECKPOINT_WRITTEN . record ( ( written_after - written_before) / 1_000_000 ) ;
13821383 CHECKPOINT_PROCESSED_RECORDS . store ( processed_records, Ordering :: Relaxed ) ;
13831384
0 commit comments