@@ -25,23 +25,29 @@ def parse_data(data_samples):
2525 for timestamp , sample in enumerate (data_samples ):
2626 for entry in sample :
2727 key = entry ["key" ]
28- value = entry ["value" ].get ("Gauge" ) or entry ["value" ].get ("Counter" ) # Handle Gauge and Counter values
29- labels = {label [0 ]: label [1 ] for label in entry .get ("labels" , [])} # Convert labels to dict
28+ value = entry ["value" ].get ("Gauge" ) or entry ["value" ].get (
29+ "Counter"
30+ ) # Handle Gauge and Counter values
31+ labels = {
32+ label [0 ]: label [1 ] for label in entry .get ("labels" , [])
33+ } # Convert labels to dict
3034
3135 # Extract necessary information
3236 worker = labels .get ("worker" , "Total" )
3337 level = labels .get ("level" , "Total" )
3438 spine = labels .get ("id" , "Total" )
3539
3640 # Append processed entry
37- records .append ({
38- "timestamp" : timestamp ,
39- "key" : key ,
40- "worker" : worker ,
41- "level" : level ,
42- "value" : value ,
43- "id" : spine
44- })
41+ records .append (
42+ {
43+ "timestamp" : timestamp ,
44+ "key" : key ,
45+ "worker" : worker ,
46+ "level" : level ,
47+ "value" : value ,
48+ "id" : spine ,
49+ }
50+ )
4551
4652 return pd .DataFrame (records )
4753
@@ -52,14 +58,20 @@ def make_plots(data_samples):
5258
5359 # Filter for specific metrics
5460 df_merges = df [df ["key" ] == "spine.ongoing_merges" ]
55- df_merges_summary = df_merges .groupby (["timestamp" , "level" ])["value" ].agg (
56- ["mean" , "max" , "min" ]).reset_index ().melt (
57- id_vars = ["timestamp" , "level" ], var_name = "stat" , value_name = "value" )
61+ df_merges_summary = (
62+ df_merges .groupby (["timestamp" , "level" ])["value" ]
63+ .agg (["mean" , "max" , "min" ])
64+ .reset_index ()
65+ .melt (id_vars = ["timestamp" , "level" ], var_name = "stat" , value_name = "value" )
66+ )
5867
5968 df_batches = df [df ["key" ] == "spine.batches_per_level" ]
60- df_batches_summary = df_batches .groupby (["timestamp" , "level" ])["value" ].agg (
61- ["mean" , "max" , "min" ]).reset_index ().melt (
62- id_vars = ["timestamp" , "level" ], var_name = "stat" , value_name = "value" )
69+ df_batches_summary = (
70+ df_batches .groupby (["timestamp" , "level" ])["value" ]
71+ .agg (["mean" , "max" , "min" ])
72+ .reset_index ()
73+ .melt (id_vars = ["timestamp" , "level" ], var_name = "stat" , value_name = "value" )
74+ )
6375
6476 # Get bytes written at every time
6577 df_disk = df [df ["key" ] == "disk.total_bytes_written" ]
@@ -88,35 +100,45 @@ def make_plots(data_samples):
88100 # Plot function
89101 def create_plot (df , title , filename ):
90102 plot = (
91- ggplot (df , aes (x = "timestamp" , y = "value" , color = "stat" , group = "stat" ))
92- + geom_line (size = 1 )
93- + facet_wrap ("~level" , scales = "free" )
94- + labs (title = title , x = "Time" , y = "Value" )
95- + theme_classic ()
96- + scale_y_continuous (limits = (0 , None ))
103+ ggplot (df , aes (x = "timestamp" , y = "value" , color = "stat" , group = "stat" ))
104+ + geom_line (size = 1 )
105+ + facet_wrap ("~level" , scales = "free" )
106+ + labs (title = title , x = "Time" , y = "Value" )
107+ + theme_classic ()
108+ + scale_y_continuous (limits = (0 , None ))
97109 )
98110 plot .save (filename , width = 12 , height = 6 , dpi = 300 )
99111 print (f"Saved { filename } " )
100112
101113 # Generate plots
102- create_plot (df_merges , "Current #Batches being merged (Min/Avg/Max from all Spines)" , "ongoing_merges.png" )
103- create_plot (df_batches , "Current #Batches not being merged (Min/Avg/Max from all Spine)" , "batches_per_level.png" )
114+ create_plot (
115+ df_merges ,
116+ "Current #Batches being merged (Min/Avg/Max from all Spines)" ,
117+ "ongoing_merges.png" ,
118+ )
119+ create_plot (
120+ df_batches ,
121+ "Current #Batches not being merged (Min/Avg/Max from all Spine)" ,
122+ "batches_per_level.png" ,
123+ )
104124
105125 plot = (
106- ggplot (df_totals , aes (x = "timestamp" , y = "value" , color = "metric" ))
107- + geom_line (size = 1 )
108- + facet_grid ("metric ~ ." , scales = "free_y" ) # Separate plots for MiB/s and Counts
109- + labs (title = "Pipeline Totals" , x = "Time" , y = "Value" )
110- + theme_classic ()
111- + scale_y_continuous (limits = (0 , None ))
126+ ggplot (df_totals , aes (x = "timestamp" , y = "value" , color = "metric" ))
127+ + geom_line (size = 1 )
128+ + facet_grid (
129+ "metric ~ ." , scales = "free_y"
130+ ) # Separate plots for MiB/s and Counts
131+ + labs (title = "Pipeline Totals" , x = "Time" , y = "Value" )
132+ + theme_classic ()
133+ + scale_y_continuous (limits = (0 , None ))
112134 )
113135
114136 # Save the plot
115137 plot .save ("pipeline_totals.png" , width = 12 , height = 8 , dpi = 300 )
116138
117139
118- if __name__ == ' __main__' :
140+ if __name__ == " __main__" :
119141 import sys
120142
121143 samples = get_data_samples (sys .argv [1 ])
122- make_plots (samples )
144+ make_plots (samples )
0 commit comments