3838from tests .perf .microbenchmarks ._utils import (
3939 publish_benchmark_extra_info ,
4040 RandomBytesIO ,
41+ get_irq_affinity ,
4142)
4243from tests .perf .microbenchmarks .conftest import publish_resource_metrics
4344import tests .perf .microbenchmarks .writes .config as config
44- from google .cloud import storage
4545
4646# Get write parameters
4747all_params = config .get_write_params ()
@@ -318,10 +318,34 @@ def target_wrapper(*args, **kwargs):
318318 )
319319
320320
# --- Global Variables for Worker Process ---
# Populated once per worker by _worker_init (multiprocessing Pool initializer);
# read by _upload_files_worker on every task.
worker_loop = None  # asyncio event loop — set only for "zonal" buckets
worker_client = None  # gRPC storage client — set only for "zonal" buckets
worker_json_client = None  # JSON storage.Client — set only for "regional" buckets


def _worker_init(bucket_type):
    """Initializes a persistent event loop and client for each worker process.

    Runs once in every pool worker process. Pins the worker off CPU 0 and the
    IRQ-handling CPUs so benchmark work does not contend with interrupt
    handling, then builds the long-lived client matching ``bucket_type`` so
    client-construction cost stays out of the measured upload times.

    Args:
        bucket_type: "zonal" selects the gRPC/asyncio client; any other value
            is treated as regional and gets a JSON ``storage.Client``.
    """
    cpu_affinity = get_irq_affinity()
    if cpu_affinity:
        # Leave CPU 0 and the IRQ-servicing CPUs free for the kernel.
        # NOTE: os.sched_setaffinity is Linux-only — matches the benchmark
        # environment this file targets.
        allowed = {i for i in range(1, os.cpu_count()) if i not in cpu_affinity}
        # Guard: sched_setaffinity raises on an empty mask. If the IRQ set
        # covers every CPU >= 1, keep the default affinity instead of
        # crashing every worker at pool startup.
        if allowed:
            os.sched_setaffinity(0, allowed)
    global worker_loop, worker_client, worker_json_client
    if bucket_type == "zonal":
        worker_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(worker_loop)
        worker_client = worker_loop.run_until_complete(create_client())
    else:  # regional
        # Imported lazily: the JSON client is only needed on this path, and
        # the import is re-executed per spawned worker anyway.
        from google.cloud import storage

        worker_json_client = storage.Client()
344+
def _upload_files_worker(files_to_upload, other_params, bucket_type):
    """A worker function for multi-processing uploads.

    Calls the appropriate multi-coroutine upload function using the global
    client created by ``_worker_init``. This function is intended to be
    called in a separate process.

    Args:
        files_to_upload: The filenames this worker is responsible for.
        other_params: Benchmark parameters forwarded to the upload helper.
        bucket_type: "zonal" for the gRPC/asyncio path; anything else uses
            the regional JSON path.

    Returns:
        float: The maximum latency from the uploads performed by this worker.

    Raises:
        RuntimeError: If the pool was created without ``_worker_init`` as its
            initializer, so the per-process client/loop globals are unset.
    """
    if bucket_type == "zonal":
        if worker_loop is None or worker_client is None:
            # Fail fast with a clear message instead of an obscure
            # AttributeError deep inside the upload helper.
            raise RuntimeError(
                "worker not initialized: create the pool with "
                "initializer=_worker_init"
            )
        return upload_files_using_grpc_multi_coro(
            worker_loop, worker_client, files_to_upload, other_params
        )
    else:  # regional
        if worker_json_client is None:
            raise RuntimeError(
                "worker not initialized: create the pool with "
                "initializer=_worker_init"
            )
        return upload_files_using_json_multi_threaded(
            None, worker_json_client, files_to_upload, other_params
        )
357368
358- def upload_files_mp_mc_wrapper (files_names , params ):
369+ def upload_files_mp_mc_wrapper (pool , files_names , params ):
359370 """Wrapper for multi-process, multi-coroutine uploads.
360371
361372 Distributes files among a pool of processes and calls the worker function.
362373
363374 Args:
375+ pool: The multiprocessing pool.
364376 files_names (list): The full list of filenames to upload.
365- params: An object containing benchmark parameters (num_processes, num_coros).
377+ params: An object containing benchmark parameters (num_coros).
366378
367379 Returns:
368380 float: The maximum latency observed across all processes.
369381 """
370- num_processes = params .num_processes
371382 num_coros = params .num_coros
372383
373384 filenames_per_process = [
@@ -383,9 +394,7 @@ def upload_files_mp_mc_wrapper(files_names, params):
383394 for filenames in filenames_per_process
384395 ]
385396
386- ctx = multiprocessing .get_context ("spawn" )
387- with ctx .Pool (processes = num_processes ) as pool :
388- results = pool .starmap (_upload_files_worker , args )
397+ results = pool .starmap (_upload_files_worker , args )
389398
390399 return max (results )
391400
@@ -414,18 +423,27 @@ def target_wrapper(*args, **kwargs):
414423 output_times .append (result )
415424 return output_times
416425
    # One spawn-context pool reused across all benchmark rounds. Each worker
    # runs _worker_init once (CPU-affinity pinning + persistent client), so
    # per-round client setup cost stays out of the measured times.
    ctx = multiprocessing.get_context("spawn")
    pool = ctx.Pool(
        processes=params.num_processes,
        initializer=_worker_init,
        initargs=(params.bucket_type,),
    )
    try:
        with monitor() as m:
            output_times = benchmark.pedantic(
                target=target_wrapper,
                iterations=1,
                rounds=params.rounds,
                args=(
                    pool,
                    files_names,
                    params,
                ),
            )
    finally:
        # Tear down the worker processes before publishing results:
        # close() stops new task submission, join() waits for workers to
        # drain and exit cleanly.
        pool.close()
        pool.join()
        publish_benchmark_extra_info(
            benchmark, params, benchmark_group="write", true_times=output_times
        )
0 commit comments