Skip to content

Commit 2eb6ce3

Browse files
aehmttwblp
authored andcommitted
Add circuit profile
Signed-off-by: Matei <matei@feldera.com>
1 parent fe811b9 commit 2eb6ce3

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

benchmark/feldera-sql/run.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ def main():
137137
group.add_argument('--csv-metrics', help='File to write pipeline metrics (memory, disk) in .csv format')
138138
group.add_argument('--metrics-interval', help='How often metrics should be sampled, in seconds (default: 1)')
139139
group.add_argument('--include-disabled', action=argparse.BooleanOptionalAction, help='Include queries from the disabled-queries/ directory.')
140+
group.add_argument('--circuit-profile', action=argparse.BooleanOptionalAction, help='If set to true, will save a circuit profile (default: --no-circuit-profile)')
140141

141142
group = parser.add_argument_group("Options for Nexmark benchmark only")
142143
group.add_argument('--lateness', action=argparse.BooleanOptionalAction, help='whether to use lateness for GC to save memory (default: --lateness)')
@@ -148,6 +149,7 @@ def main():
148149
group.add_argument("--poller-threads", required=False, type=int, help="Override number of poller threads to use")
149150
group.add_argument('--input-topic-suffix', help='suffix to apply to input topic names (by default, "")')
150151
parser.set_defaults(lateness=True, storage=False, cores=16, metrics_interval=1, folder='benchmarks/nexmark', events=100000)
152+
151153

152154
global api_url, kafka_options, headers
153155
api_url = parser.parse_args().api_url
@@ -168,6 +170,7 @@ def main():
168170
disabled_folder = os.path.join(FILE_DIR, folder + '/disabled-queries/')
169171
if include_disabled and os.path.exists(disabled_folder):
170172
all_queries |= load_queries(disabled_folder)
173+
profile = parser.parse_args().circuit_profile
171174

172175
queries = sort_queries(parse_queries(all_queries, parser.parse_args().query))
173176
storage = parser.parse_args().storage
@@ -291,7 +294,16 @@ def main():
291294

292295
results += [[when, "feldera", "stream", "sql", pipeline_name, cores, last_processed, elapsed, peak_memory, cpu_msecs]]
293296

294-
# Start pipeline
297+
if profile:
298+
response = requests.get(f"{api_url}/v0/pipelines/{full_name}/circuit_profile", headers=headers)
299+
profile_file_name = full_name + ".zip"
300+
print("\nWriting circuit profile stats to " + profile_file_name + "...")
301+
with open(profile_file_name, 'wb') as f:
302+
for chunk in response.iter_content(1024):
303+
f.write(chunk)
304+
break
305+
306+
# Stop pipeline
295307
elapsed = stop_pipeline(full_name, True)
296308
print(f"Stopped pipeline {full_name} in {elapsed:.1f} s")
297309

0 commit comments

Comments
 (0)