Skip to content

Commit 5f753c3

Browse files
committed
Formatting python.
Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent 2c62b0b commit 5f753c3

File tree

4 files changed

+72
-44
lines changed

4 files changed

+72
-44
lines changed

demo/all-packaged/run.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ def main():
3838
pipeline.start()
3939
time.sleep(2)
4040
status = pipeline.status()
41-
assert (
42-
status == PipelineStatus.RUNNING
43-
), f"FAIL: demo {demo['name']}: expected pipeline to be RUNNING but instead is {status}"
41+
assert status == PipelineStatus.RUNNING, (
42+
f"FAIL: demo {demo['name']}: expected pipeline to be RUNNING but instead is {status}"
43+
)
4444
pipeline.shutdown()
4545
print(f"PASS: demo {demo['name']}")
4646

python/tests/test_pipeline_builder.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -492,14 +492,20 @@ def test_avro_format(self):
492492
"format": {{
493493
"name": "avro",
494494
"config": {{
495-
"schema": {json.dumps(json.dumps({
496-
"type": "record",
497-
"name": "items",
498-
"fields": [
499-
{"name": "id", "type": ["null", "int"]},
500-
{"name": "name", "type": ["null", "string"]}
501-
]
502-
}))}
495+
"schema": {
496+
json.dumps(
497+
json.dumps(
498+
{
499+
"type": "record",
500+
"name": "items",
501+
"fields": [
502+
{"name": "id", "type": ["null", "int"]},
503+
{"name": "name", "type": ["null", "string"]},
504+
],
505+
}
506+
)
507+
)
508+
}
503509
}}
504510
}}
505511
}}

scripts/compilation_speed_benchmark.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def main():
6565
program = ""
6666
times = []
6767
for i in range(0, len(queries)):
68-
print(f"Compiling program with {i+1} queries")
68+
print(f"Compiling program with {i + 1} queries")
6969
program += queries[i]
7070
program += ";"
7171
# print(f"program: {program}")

scripts/plot_metrics.py

Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,29 @@ def parse_data(data_samples):
2525
for timestamp, sample in enumerate(data_samples):
2626
for entry in sample:
2727
key = entry["key"]
28-
value = entry["value"].get("Gauge") or entry["value"].get("Counter") # Handle Gauge and Counter values
29-
labels = {label[0]: label[1] for label in entry.get("labels", [])} # Convert labels to dict
28+
value = entry["value"].get("Gauge") or entry["value"].get(
29+
"Counter"
30+
) # Handle Gauge and Counter values
31+
labels = {
32+
label[0]: label[1] for label in entry.get("labels", [])
33+
} # Convert labels to dict
3034

3135
# Extract necessary information
3236
worker = labels.get("worker", "Total")
3337
level = labels.get("level", "Total")
3438
spine = labels.get("id", "Total")
3539

3640
# Append processed entry
37-
records.append({
38-
"timestamp": timestamp,
39-
"key": key,
40-
"worker": worker,
41-
"level": level,
42-
"value": value,
43-
"id": spine
44-
})
41+
records.append(
42+
{
43+
"timestamp": timestamp,
44+
"key": key,
45+
"worker": worker,
46+
"level": level,
47+
"value": value,
48+
"id": spine,
49+
}
50+
)
4551

4652
return pd.DataFrame(records)
4753

@@ -52,14 +58,20 @@ def make_plots(data_samples):
5258

5359
# Filter for specific metrics
5460
df_merges = df[df["key"] == "spine.ongoing_merges"]
55-
df_merges_summary = df_merges.groupby(["timestamp", "level"])["value"].agg(
56-
["mean", "max", "min"]).reset_index().melt(
57-
id_vars=["timestamp", "level"], var_name="stat", value_name="value")
61+
df_merges_summary = (
62+
df_merges.groupby(["timestamp", "level"])["value"]
63+
.agg(["mean", "max", "min"])
64+
.reset_index()
65+
.melt(id_vars=["timestamp", "level"], var_name="stat", value_name="value")
66+
)
5867

5968
df_batches = df[df["key"] == "spine.batches_per_level"]
60-
df_batches_summary = df_batches.groupby(["timestamp", "level"])["value"].agg(
61-
["mean", "max", "min"]).reset_index().melt(
62-
id_vars=["timestamp", "level"], var_name="stat", value_name="value")
69+
df_batches_summary = (
70+
df_batches.groupby(["timestamp", "level"])["value"]
71+
.agg(["mean", "max", "min"])
72+
.reset_index()
73+
.melt(id_vars=["timestamp", "level"], var_name="stat", value_name="value")
74+
)
6375

6476
# Get bytes written at every time
6577
df_disk = df[df["key"] == "disk.total_bytes_written"]
@@ -88,35 +100,45 @@ def make_plots(data_samples):
88100
# Plot function
89101
def create_plot(df, title, filename):
90102
plot = (
91-
ggplot(df, aes(x="timestamp", y="value", color="stat", group="stat"))
92-
+ geom_line(size=1)
93-
+ facet_wrap("~level", scales="free")
94-
+ labs(title=title, x="Time", y="Value")
95-
+ theme_classic()
96-
+ scale_y_continuous(limits=(0, None))
103+
ggplot(df, aes(x="timestamp", y="value", color="stat", group="stat"))
104+
+ geom_line(size=1)
105+
+ facet_wrap("~level", scales="free")
106+
+ labs(title=title, x="Time", y="Value")
107+
+ theme_classic()
108+
+ scale_y_continuous(limits=(0, None))
97109
)
98110
plot.save(filename, width=12, height=6, dpi=300)
99111
print(f"Saved {filename}")
100112

101113
# Generate plots
102-
create_plot(df_merges, "Current #Batches being merged (Min/Avg/Max from all Spines)", "ongoing_merges.png")
103-
create_plot(df_batches, "Current #Batches not being merged (Min/Avg/Max from all Spine)", "batches_per_level.png")
114+
create_plot(
115+
df_merges,
116+
"Current #Batches being merged (Min/Avg/Max from all Spines)",
117+
"ongoing_merges.png",
118+
)
119+
create_plot(
120+
df_batches,
121+
"Current #Batches not being merged (Min/Avg/Max from all Spine)",
122+
"batches_per_level.png",
123+
)
104124

105125
plot = (
106-
ggplot(df_totals, aes(x="timestamp", y="value", color="metric"))
107-
+ geom_line(size=1)
108-
+ facet_grid("metric ~ .", scales="free_y") # Separate plots for MiB/s and Counts
109-
+ labs(title="Pipeline Totals", x="Time", y="Value")
110-
+ theme_classic()
111-
+ scale_y_continuous(limits=(0, None))
126+
ggplot(df_totals, aes(x="timestamp", y="value", color="metric"))
127+
+ geom_line(size=1)
128+
+ facet_grid(
129+
"metric ~ .", scales="free_y"
130+
) # Separate plots for MiB/s and Counts
131+
+ labs(title="Pipeline Totals", x="Time", y="Value")
132+
+ theme_classic()
133+
+ scale_y_continuous(limits=(0, None))
112134
)
113135

114136
# Save the plot
115137
plot.save("pipeline_totals.png", width=12, height=8, dpi=300)
116138

117139

118-
if __name__ == '__main__':
140+
if __name__ == "__main__":
119141
import sys
120142

121143
samples = get_data_samples(sys.argv[1])
122-
make_plots(samples)
144+
make_plots(samples)

0 commit comments

Comments
 (0)