Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix parallel MC example to run on Windows
On Windows one needs to use multiprocessing.Manager to create
the barrier shared across workers; it is then passed to the worker
initialization routine as an argument.

Also passed batch_size to the routines as a keyword argument, and in
parallel_mc.py passed mc_compute_func to the worker initialization
routine as a keyword argument, which assigns it to a process-global
variable for reuse by subsequent MC computation tasks.
  • Loading branch information
oleksandr-pavlyk committed Mar 22, 2023
commit 11aeef259c3e45b3d4dea60589b7f7eaec830465
28 changes: 15 additions & 13 deletions examples/parallel_mc.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import multiprocessing as mp
from functools import partial

__all__ = ['parallel_mc_run']

def worker_compute(w_id):
    """Run one Monte-Carlo batch on this worker process.

    Relies on the process-global random state ``_local_rs`` and compute
    function ``_worker_mc_compute_func`` installed by ``init_worker`` when
    the pool was initialized.

    :param w_id: batch index supplied by ``Pool.map``; unused by the
        computation itself.
    :return: result of calling the installed compute function with the
        worker-local random state.
    """
    # `global` is declarative here (reads only) but documents the contract
    # that init_worker must have populated these names first.
    global _local_rs, _worker_mc_compute_func
    return _worker_mc_compute_func(_local_rs)


def init_worker(w_rs, mc_compute_func=None, barrier=None):
    """Assign process local random state variable `rs` the given value.

    Pool initializer executed once per worker: stores the worker's random
    state and the Monte-Carlo compute function in process-global variables
    for later use by ``worker_compute``.

    :param w_rs: random-state instance dedicated to this worker.
    :param mc_compute_func: callable invoked later as
        ``mc_compute_func(w_rs)`` for each batch.
    :param barrier: barrier shared by all workers; on Windows this must be
        created via ``multiprocessing.Manager().Barrier`` so it can be
        pickled into spawned processes.
    """
    assert not '_local_rs' in globals(), "Here comes trouble. Process is not expected to have global variable `_local_rs`"

    global _local_rs, _worker_mc_compute_func
    _local_rs = w_rs
    _worker_mc_compute_func = mc_compute_func
    # wait to ensure that the assignment takes place for each worker
    barrier.wait()

def parallel_mc_run(random_states, n_workers, n_batches, mc_func):
    """Run `n_batches` Monte-Carlo batches of `mc_func` on `n_workers` processes.

    Each worker is first handed its own entry of `random_states`; `mc_func`
    is called as ``mc_func(rs)`` on the worker
    and has access to worker-local global variable `rs`, containing worker's random states.

    :param random_states: sequence of per-worker random-state instances,
        one per worker.
    :param n_workers: number of pool processes.
    :param n_batches: total number of batches to compute.
    :param mc_func: callable executed per batch with the worker's random state.
    :return: list of `n_batches` per-batch results.
    """
    # Manager-backed Barrier is picklable, which is required on Windows
    # where workers are spawned rather than forked.
    with mp.Manager() as manager:
        # use of Barrier ensures that every worker gets one random state
        b = manager.Barrier(n_workers)

        with mp.Pool(processes=n_workers) as pool:
            # 1. map over every worker once to distribute RandomState instances
            pool.map(partial(init_worker, mc_compute_func=mc_func, barrier=b), random_states, chunksize=1)
            # 2. Perform computations on workers
            r = pool.map(worker_compute, range(n_batches), chunksize=1)

    return r

Expand Down
16 changes: 9 additions & 7 deletions examples/stick_tetrahedron.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from sticky_math import mc_six_piece_stick_tetrahedron_prob
from arg_parsing import parse_arguments

def mc_runner(rs, batch_size=None):
    """Run one Monte-Carlo batch of `batch_size` samples using random state `rs`.

    `batch_size` is a keyword argument so callers can pre-bind it with
    ``functools.partial`` before handing this function to the worker pool.
    """
    return mc_six_piece_stick_tetrahedron_prob(rs, batch_size)

def aggregate_mc_counts(counts, n_batches, batch_size):
Expand Down Expand Up @@ -40,6 +40,7 @@ def print_result(p_est, p_std, mc_size):
from itertools import repeat
from timeit import default_timer as timer
import sys
from functools import partial

args = parse_arguments()

Expand All @@ -51,23 +52,24 @@ def print_result(p_est, p_std, mc_size):
batch_size = args.batch_size
batches = args.batch_count
id0 = args.id_offset
print("Parallel Monte-Carlo estimation of stick tetrahedron probability")
print("Input parameters: -s {seed} -b {batchSize} -n {numBatches} -p {processes} -d {idOffset}".format(
seed=args.seed, batchSize=args.batch_size, numBatches=args.batch_count, processes=n_workers, idOffset=args.id_offset))
print("")

t0 = timer()

rss = build_MT2203_random_states(seed, id0, n_workers)
r = parallel_mc_run(rss, n_workers, batches, mc_runner)
# r = sequential_mc_run(rss, n_workers, batches, mc_runner)

r = parallel_mc_run(rss, n_workers, batches, partial(mc_runner, batch_size=batch_size))
# r = sequential_mc_run(rss, n_workers, batches, partial(mc_runner, batch_size=batch_size))

# retrieve values of estimates into numpy array
counts = np.fromiter(r, dtype=np.double)
p_est, p_std, event_count, nonevent_count = aggregate_mc_counts(counts, batches, batch_size)

t1 = timer()


print("Input parameters: -s {seed} -b {batchSize} -n {numBatches} -p {processes} -d {idOffset}".format(
seed=args.seed, batchSize=args.batch_size, numBatches=args.batch_count, processes=n_workers, idOffset=args.id_offset))
print("")
print_result(p_est, p_std, batches * batch_size)
print("")
print("Bayesian posterior beta distribution parameters: ({0}, {1})".format(event_count, nonevent_count))
Expand Down
27 changes: 16 additions & 11 deletions examples/fancy.py → examples/stick_triangle.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,41 +39,46 @@ def mc_dist(rs, n):
return mc_prob


def init_worker(w_rs, barrier=None):
    """Assign process local random state variable `rs` the given value.

    :param w_rs: random-state instance dedicated to this worker.
    :param barrier: barrier shared by all workers; on Windows this must be
        created via ``multiprocessing.Manager().Barrier`` so it can be
        pickled into spawned processes.
    """
    assert not 'rs' in globals(), "Here comes trouble. Process is not expected to have global variable `rs`"

    global rs
    rs = w_rs
    # wait to ensure that the assignment takes place for each worker
    barrier.wait()


def worker_compute(w_id, batch_size=None):
    """Compute one batch of `batch_size` Monte-Carlo samples.

    Uses the process-global random state `rs` installed by ``init_worker``.
    `batch_size` is a keyword argument so it can be pre-bound with
    ``functools.partial`` before the call is mapped over the pool.

    :param w_id: batch index supplied by ``Pool.map``; unused.
    """
    return mc_dist(rs, batch_size)


if __name__ == '__main__':
import multiprocessing as mp
from itertools import repeat
from timeit import default_timer as timer
from functools import partial

seed = 77777
n_workers = 12
batch_size = 1024 * 256
batches = 10000
print("Parallel Monte-Carlo estimation of stick triangle probability")
print(f"Parameters: n_workers={n_workers}, batch_size={batch_size}, n_batches={batches}, seed={seed}")
print("")

t0 = timer()
# Create instances of RandomState for each worker process from MT2203 family of generators
rss = [ rnd.RandomState(seed, brng=('MT2203', idx)) for idx in range(n_workers) ]
# use of Barrier ensures that every worker gets one
b = mp.Barrier(n_workers)

with mp.Pool(processes=n_workers) as pool:
# map over every worker once to distribute RandomState instances
pool.map(assign_worker_rs, rss, chunksize=1)
# Perform computations on workers
r = pool.map(worker_compute, range(batches), chunksize=1)
with mp.Manager() as manager:
# use of Barrier ensures that every worker gets one
b = manager.Barrier(n_workers)

with mp.Pool(processes=n_workers) as pool:
# map over every worker once to distribute RandomState instances
pool.map(partial(init_worker, barrier=b), rss, chunksize=1)
# Perform computations on workers
r = pool.map(partial(worker_compute, batch_size=batch_size), range(batches), chunksize=1)

# retrieve values of estimates into numpy array
ps = np.fromiter(r, dtype=np.double)
Expand Down