Skip to content

Commit ea35ba3

Browse files
authored
fix: init mp pool & grpc client once, use os.sched_setaffinity (#1751)
fix: init mp pool & grpc client once, use os.sched_setaffinity - mp pool is initialized once per benchmark run, otherwise time to init pool is affecting throughput calculations. - grpc client should be initialized once per process in an event loop, otherwise processes get stuck. - os.sched_setaffinity helps in pinning the process to a set of cores to avoid hard irq from nic.
1 parent 8d38be4 commit ea35ba3

File tree

1 file changed

+43
-25
lines changed
  • packages/google-cloud-storage/tests/perf/microbenchmarks/writes

1 file changed

+43
-25
lines changed

packages/google-cloud-storage/tests/perf/microbenchmarks/writes/test_writes.py

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@
3838
from tests.perf.microbenchmarks._utils import (
3939
publish_benchmark_extra_info,
4040
RandomBytesIO,
41+
get_irq_affinity,
4142
)
4243
from tests.perf.microbenchmarks.conftest import publish_resource_metrics
4344
import tests.perf.microbenchmarks.writes.config as config
44-
from google.cloud import storage
4545

4646
# Get write parameters
4747
all_params = config.get_write_params()
@@ -318,10 +318,34 @@ def target_wrapper(*args, **kwargs):
318318
)
319319

320320

321+
# --- Global Variables for Worker Process ---
322+
worker_loop = None
323+
worker_client = None
324+
worker_json_client = None
325+
326+
327+
def _worker_init(bucket_type):
328+
"""Initializes a persistent event loop and client for each worker process."""
329+
cpu_affinity = get_irq_affinity()
330+
if cpu_affinity:
331+
os.sched_setaffinity(
332+
0, {i for i in range(1, os.cpu_count()) if i not in cpu_affinity}
333+
)
334+
global worker_loop, worker_client, worker_json_client
335+
if bucket_type == "zonal":
336+
worker_loop = asyncio.new_event_loop()
337+
asyncio.set_event_loop(worker_loop)
338+
worker_client = worker_loop.run_until_complete(create_client())
339+
else: # regional
340+
from google.cloud import storage
341+
342+
worker_json_client = storage.Client()
343+
344+
321345
def _upload_files_worker(files_to_upload, other_params, bucket_type):
322346
"""A worker function for multi-processing uploads.
323347
324-
Initializes a client and calls the appropriate multi-coroutine upload function.
348+
Calls the appropriate multi-coroutine upload function using the global client.
325349
This function is intended to be called in a separate process.
326350
327351
Args:
@@ -333,41 +357,28 @@ def _upload_files_worker(files_to_upload, other_params, bucket_type):
333357
float: The maximum latency from the uploads performed by this worker.
334358
"""
335359
if bucket_type == "zonal":
336-
loop = asyncio.new_event_loop()
337-
asyncio.set_event_loop(loop)
338-
client = loop.run_until_complete(create_client())
339-
try:
340-
result = upload_files_using_grpc_multi_coro(
341-
loop, client, files_to_upload, other_params
342-
)
343-
finally:
344-
# cleanup loop
345-
tasks = asyncio.all_tasks(loop=loop)
346-
for task in tasks:
347-
task.cancel()
348-
loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
349-
loop.close()
350-
return result
360+
return upload_files_using_grpc_multi_coro(
361+
worker_loop, worker_client, files_to_upload, other_params
362+
)
351363
else: # regional
352-
json_client = storage.Client()
353364
return upload_files_using_json_multi_threaded(
354-
None, json_client, files_to_upload, other_params
365+
None, worker_json_client, files_to_upload, other_params
355366
)
356367

357368

358-
def upload_files_mp_mc_wrapper(files_names, params):
369+
def upload_files_mp_mc_wrapper(pool, files_names, params):
359370
"""Wrapper for multi-process, multi-coroutine uploads.
360371
361372
Distributes files among a pool of processes and calls the worker function.
362373
363374
Args:
375+
pool: The multiprocessing pool.
364376
files_names (list): The full list of filenames to upload.
365-
params: An object containing benchmark parameters (num_processes, num_coros).
377+
params: An object containing benchmark parameters (num_coros).
366378
367379
Returns:
368380
float: The maximum latency observed across all processes.
369381
"""
370-
num_processes = params.num_processes
371382
num_coros = params.num_coros
372383

373384
filenames_per_process = [
@@ -383,9 +394,7 @@ def upload_files_mp_mc_wrapper(files_names, params):
383394
for filenames in filenames_per_process
384395
]
385396

386-
ctx = multiprocessing.get_context("spawn")
387-
with ctx.Pool(processes=num_processes) as pool:
388-
results = pool.starmap(_upload_files_worker, args)
397+
results = pool.starmap(_upload_files_worker, args)
389398

390399
return max(results)
391400

@@ -414,18 +423,27 @@ def target_wrapper(*args, **kwargs):
414423
output_times.append(result)
415424
return output_times
416425

426+
ctx = multiprocessing.get_context("spawn")
427+
pool = ctx.Pool(
428+
processes=params.num_processes,
429+
initializer=_worker_init,
430+
initargs=(params.bucket_type,),
431+
)
417432
try:
418433
with monitor() as m:
419434
output_times = benchmark.pedantic(
420435
target=target_wrapper,
421436
iterations=1,
422437
rounds=params.rounds,
423438
args=(
439+
pool,
424440
files_names,
425441
params,
426442
),
427443
)
428444
finally:
445+
pool.close()
446+
pool.join()
429447
publish_benchmark_extra_info(
430448
benchmark, params, benchmark_group="write", true_times=output_times
431449
)

0 commit comments

Comments
 (0)