Skip to content
This repository was archived by the owner on Feb 2, 2024. It is now read-only.
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
2
  • Loading branch information
Ivan Butygin committed Nov 11, 2019
commit 46af82ad07d1f7a67e009f2405f1ec8c0e09b6ca
135 changes: 17 additions & 118 deletions hpat/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@
import numba
from numba import ir, types, typing, config, numpy_support, ir_utils, postproc
from numba.ir_utils import (
next_label,
mk_range_block,
mk_loop_header,
rename_labels,
simplify,
mk_unique_var,
replace_vars_inner,
find_topo_order,
Expand All @@ -73,6 +68,7 @@
from numba.typing import signature
from numba.parfor import (
Parfor,
lower_parfor_sequential,
get_parfor_reductions,
get_parfor_params,
wrap_parfor_blocks,
Expand Down Expand Up @@ -160,8 +156,6 @@ def __init__(self, state):
# size for 1DVar allocs and parfors
self.oneDVar_len_vars = {}

self.parfors_to_lower = {}

self.state = state

def run_pass(self):
Expand All @@ -181,12 +175,12 @@ def run_pass(self):

self._gen_dist_inits()
self.state.func_ir._definitions = build_definitions(self.state.func_ir.blocks)
self.state.func_ir.blocks = self._run_dist_pass(self.state.func_ir.blocks)
self.state.func_ir.blocks = self._run_dist_pass(self.state.func_ir.blocks, 0)
self.state.func_ir.blocks = self._dist_prints(self.state.func_ir.blocks)
remove_dead(self.state.func_ir.blocks, self.state.func_ir.arg_names, self.state.func_ir, self.state.typemap)
dprint_func_ir(self.state.func_ir, "after distributed pass")
lower_parfor_sequential(
self.state.typingctx, self.state.func_ir, self.state.typemap, self.state.calltypes, self.parfors_to_lower)
self.state.typingctx, self.state.func_ir, self.state.typemap, self.state.calltypes)
if hpat.multithread_mode:
# parfor params need to be updated for multithread_mode since some
# new variables like alloc_start are introduced by distributed pass
Expand All @@ -207,7 +201,7 @@ def run_pass(self):

return True

def _run_dist_pass(self, blocks):
def _run_dist_pass(self, blocks, depth):
"""This function does something"""
topo_order = find_topo_order(blocks)
namevar_table = get_name_var_table(blocks)
Expand All @@ -225,13 +219,12 @@ def _run_dist_pass(self, blocks):
self.state.targetctx, self)
elif isinstance(inst, Parfor):
print('********** process parfor **********')
out_nodes = self._run_parfor(inst, namevar_table)
self.parfors_to_lower[inst] = True
out_nodes = self._run_parfor(inst, namevar_table, 0 == depth)
# run dist pass recursively
# p_blocks = wrap_parfor_blocks(inst)
# # build_definitions(p_blocks, self.state.func_ir._definitions)
# self._run_dist_pass(p_blocks)
# unwrap_parfor_blocks(inst)
p_blocks = wrap_parfor_blocks(inst)
# build_definitions(p_blocks, self.state.func_ir._definitions)
self._run_dist_pass(p_blocks, depth + 1)
unwrap_parfor_blocks(inst)
print('********** process parfor end **********')
elif isinstance(inst, ir.Assign):
lhs = inst.target.name
Expand Down Expand Up @@ -1707,10 +1700,14 @@ def f(A, start, step):

return out

def _run_parfor(self, parfor, namevar_table):
def _run_parfor(self, parfor, namevar_table, distribute):
# stencil_accesses, neighborhood = get_stencil_accesses(
# parfor, self.state.typemap)

if not distribute:
parfor.no_sequential_lowering = True
return [parfor]

# Thread and 1D parfors turn to gufunc in multithread mode
if (hpat.multithread_mode
and self._dist_analysis.parfor_dists[parfor.id]
Expand Down Expand Up @@ -2109,6 +2106,9 @@ def f(val, op): # pragma: no cover
replace_arg_nodes(block, [reduce_var, op_var])
dist_reduce_nodes = [op_assign] + block.body[:-3]
dist_reduce_nodes[-1].target = reduce_var
print('*****************************')
print('\n'.join([str(a) for a in dist_reduce_nodes]))
print('*****************************')
return dist_reduce_nodes

def _get_reduce_op(self, reduce_nodes):
Expand Down Expand Up @@ -2295,104 +2295,3 @@ def _set_getsetitem_index(node, new_ind):
def dprint(*s): # pragma: no cover
if debug_prints():
print(*s)


def lower_parfor_sequential(typingctx, func_ir, typemap, calltypes, whitelist):
ir_utils._max_label = max(ir_utils._max_label,
ir_utils.find_max_label(func_ir.blocks))
parfor_found = False
new_blocks = {}
for (block_label, block) in func_ir.blocks.items():
block_label, parfor_found = _lower_parfor_sequential_block(
block_label, block, new_blocks, typemap, calltypes, parfor_found, whitelist)
# old block stays either way
new_blocks[block_label] = block
func_ir.blocks = new_blocks
# rename only if parfor found and replaced (avoid test_flow_control error)
if parfor_found:
func_ir.blocks = rename_labels(func_ir.blocks)
dprint_func_ir(func_ir, "after parfor sequential lowering")
simplify(func_ir, typemap, calltypes)
dprint_func_ir(func_ir, "after parfor sequential simplify")
# add dels since simplify removes dels
post_proc = postproc.PostProcessor(func_ir)
post_proc.run()
return


def _lower_parfor_sequential_block(
block_label,
block,
new_blocks,
typemap,
calltypes,
parfor_found,
whitelist):
scope = block.scope
i = _find_first_parfor(block.body, whitelist)
while i != -1:
parfor_found = True
inst = block.body[i]
loc = inst.init_block.loc
# split block across parfor
prev_block = ir.Block(scope, loc)
prev_block.body = block.body[:i]
block.body = block.body[i + 1:]
# previous block jump to parfor init block
init_label = next_label()
prev_block.body.append(ir.Jump(init_label, loc))
new_blocks[init_label] = inst.init_block
new_blocks[block_label] = prev_block
block_label = next_label()

ndims = len(inst.loop_nests)
for i in range(ndims):
loopnest = inst.loop_nests[i]
# create range block for loop
range_label = next_label()
header_label = next_label()
range_block = mk_range_block(
typemap,
loopnest.start,
loopnest.stop,
loopnest.step,
calltypes,
scope,
loc)
range_block.body[-1].target = header_label # fix jump target
phi_var = range_block.body[-2].target
new_blocks[range_label] = range_block
header_block = mk_loop_header(typemap, phi_var, calltypes,
scope, loc)
header_block.body[-2].target = loopnest.index_variable
new_blocks[header_label] = header_block
# jump to this new inner loop
if i == 0:
inst.init_block.body.append(ir.Jump(range_label, loc))
header_block.body[-1].falsebr = block_label
else:
new_blocks[prev_header_label].body[-1].truebr = range_label
header_block.body[-1].falsebr = prev_header_label
prev_header_label = header_label # to set truebr next loop

# last body block jump to inner most header
body_last_label = max(inst.loop_body.keys())
inst.loop_body[body_last_label].body.append(
ir.Jump(header_label, loc))
# inner most header jumps to first body block
body_first_label = min(inst.loop_body.keys())
header_block.body[-1].truebr = body_first_label
# add parfor body to blocks
for (l, b) in inst.loop_body.items():
l, parfor_found = _lower_parfor_sequential_block(
l, b, new_blocks, typemap, calltypes, parfor_found, whitelist)
new_blocks[l] = b
i = _find_first_parfor(block.body, whitelist)
return block_label, parfor_found


def _find_first_parfor(body, whitelist):
for (i, inst) in enumerate(body):
if isinstance(inst, Parfor) and inst in whitelist:
return i
return -1