Skip to content
This repository was archived by the owner on Feb 2, 2024. It is now read-only.
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
1
  • Loading branch information
Ivan Butygin committed Nov 11, 2019
commit fd005ee6d5f137a16c58da480bc37a3bc695c33a
137 changes: 129 additions & 8 deletions hpat/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@
import numba
from numba import ir, types, typing, config, numpy_support, ir_utils, postproc
from numba.ir_utils import (
next_label,
mk_range_block,
mk_loop_header,
rename_labels,
simplify,
mk_unique_var,
replace_vars_inner,
find_topo_order,
Expand All @@ -68,7 +73,6 @@
from numba.typing import signature
from numba.parfor import (
Parfor,
lower_parfor_sequential,
get_parfor_reductions,
get_parfor_params,
wrap_parfor_blocks,
Expand Down Expand Up @@ -120,8 +124,14 @@ def __init__(self):
pass

def run_pass(self, state):
return DistributedPassImpl(state).run_pass()

print('XXXXXXXXXXXXXXXXXXXXXXXXXXX')
state.func_ir.dump()
print('YYYYYYYYYYYYYYYYYYYYYYYYYYY')
res = DistributedPassImpl(state).run_pass()
print('XXXXXXXXXXXXXXXXXXXXXXXXXXX')
state.func_ir.dump()
print('YYYYYYYYYYYYYYYYYYYYYYYYYYY')
return res

class DistributedPassImpl(object):
"""The summary of the class should be here for example below is the summary line for this class
Expand Down Expand Up @@ -150,6 +160,8 @@ def __init__(self, state):
# size for 1DVar allocs and parfors
self.oneDVar_len_vars = {}

self.parfors_to_lower = {}

self.state = state

def run_pass(self):
Expand All @@ -174,7 +186,7 @@ def run_pass(self):
remove_dead(self.state.func_ir.blocks, self.state.func_ir.arg_names, self.state.func_ir, self.state.typemap)
dprint_func_ir(self.state.func_ir, "after distributed pass")
lower_parfor_sequential(
self.state.typingctx, self.state.func_ir, self.state.typemap, self.state.calltypes)
self.state.typingctx, self.state.func_ir, self.state.typemap, self.state.calltypes, self.parfors_to_lower)
if hpat.multithread_mode:
# parfor params need to be updated for multithread_mode since some
# new variables like alloc_start are introduced by distributed pass
Expand Down Expand Up @@ -212,12 +224,15 @@ def _run_dist_pass(self, blocks):
self.state.typemap, self.state.calltypes, self.state.typingctx,
self.state.targetctx, self)
elif isinstance(inst, Parfor):
print('********** process parfor **********')
out_nodes = self._run_parfor(inst, namevar_table)
self.parfors_to_lower[inst] = True
# run dist pass recursively
p_blocks = wrap_parfor_blocks(inst)
# build_definitions(p_blocks, self.state.func_ir._definitions)
self._run_dist_pass(p_blocks)
unwrap_parfor_blocks(inst)
# p_blocks = wrap_parfor_blocks(inst)
# # build_definitions(p_blocks, self.state.func_ir._definitions)
# self._run_dist_pass(p_blocks)
# unwrap_parfor_blocks(inst)
print('********** process parfor end **********')
elif isinstance(inst, ir.Assign):
lhs = inst.target.name
rhs = inst.value
Expand Down Expand Up @@ -1884,7 +1899,12 @@ def _gen_parfor_reductions(self, parfor, namevar_table):
_, reductions = get_parfor_reductions(
parfor, parfor.params, self.state.calltypes)

print('aaaaaaaaaaaaaaaaaaa')
parfor.dump()
print('aaaaaaaaaaaaaaaaaaa')
for reduce_varname, (init_val, reduce_nodes) in reductions.items():
print(len(reduce_nodes))
print('\n'.join([str(a) for a in reduce_nodes]))
reduce_op = guard(self._get_reduce_op, reduce_nodes)
# TODO: initialize reduction vars (arrays)
reduce_var = namevar_table[reduce_varname]
Expand Down Expand Up @@ -2275,3 +2295,104 @@ def _set_getsetitem_index(node, new_ind):
def dprint(*s): # pragma: no cover
if debug_prints():
print(*s)


def lower_parfor_sequential(typingctx, func_ir, typemap, calltypes, whitelist):
ir_utils._max_label = max(ir_utils._max_label,
ir_utils.find_max_label(func_ir.blocks))
parfor_found = False
new_blocks = {}
for (block_label, block) in func_ir.blocks.items():
block_label, parfor_found = _lower_parfor_sequential_block(
block_label, block, new_blocks, typemap, calltypes, parfor_found, whitelist)
# old block stays either way
new_blocks[block_label] = block
func_ir.blocks = new_blocks
# rename only if parfor found and replaced (avoid test_flow_control error)
if parfor_found:
func_ir.blocks = rename_labels(func_ir.blocks)
dprint_func_ir(func_ir, "after parfor sequential lowering")
simplify(func_ir, typemap, calltypes)
dprint_func_ir(func_ir, "after parfor sequential simplify")
# add dels since simplify removes dels
post_proc = postproc.PostProcessor(func_ir)
post_proc.run()
return


def _lower_parfor_sequential_block(
block_label,
block,
new_blocks,
typemap,
calltypes,
parfor_found,
whitelist):
scope = block.scope
i = _find_first_parfor(block.body, whitelist)
while i != -1:
parfor_found = True
inst = block.body[i]
loc = inst.init_block.loc
# split block across parfor
prev_block = ir.Block(scope, loc)
prev_block.body = block.body[:i]
block.body = block.body[i + 1:]
# previous block jump to parfor init block
init_label = next_label()
prev_block.body.append(ir.Jump(init_label, loc))
new_blocks[init_label] = inst.init_block
new_blocks[block_label] = prev_block
block_label = next_label()

ndims = len(inst.loop_nests)
for i in range(ndims):
loopnest = inst.loop_nests[i]
# create range block for loop
range_label = next_label()
header_label = next_label()
range_block = mk_range_block(
typemap,
loopnest.start,
loopnest.stop,
loopnest.step,
calltypes,
scope,
loc)
range_block.body[-1].target = header_label # fix jump target
phi_var = range_block.body[-2].target
new_blocks[range_label] = range_block
header_block = mk_loop_header(typemap, phi_var, calltypes,
scope, loc)
header_block.body[-2].target = loopnest.index_variable
new_blocks[header_label] = header_block
# jump to this new inner loop
if i == 0:
inst.init_block.body.append(ir.Jump(range_label, loc))
header_block.body[-1].falsebr = block_label
else:
new_blocks[prev_header_label].body[-1].truebr = range_label
header_block.body[-1].falsebr = prev_header_label
prev_header_label = header_label # to set truebr next loop

# last body block jump to inner most header
body_last_label = max(inst.loop_body.keys())
inst.loop_body[body_last_label].body.append(
ir.Jump(header_label, loc))
# inner most header jumps to first body block
body_first_label = min(inst.loop_body.keys())
header_block.body[-1].truebr = body_first_label
# add parfor body to blocks
for (l, b) in inst.loop_body.items():
l, parfor_found = _lower_parfor_sequential_block(
l, b, new_blocks, typemap, calltypes, parfor_found, whitelist)
new_blocks[l] = b
i = _find_first_parfor(block.body, whitelist)
return block_label, parfor_found


def _find_first_parfor(body, whitelist):
for (i, inst) in enumerate(body):
if isinstance(inst, Parfor) and inst in whitelist:
return i
return -1