From fd005ee6d5f137a16c58da480bc37a3bc695c33a Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Wed, 30 Oct 2019 15:30:07 +0300 Subject: [PATCH 01/11] 1 --- hpat/distributed.py | 137 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 129 insertions(+), 8 deletions(-) diff --git a/hpat/distributed.py b/hpat/distributed.py index fd008686e..6aa64c98d 100644 --- a/hpat/distributed.py +++ b/hpat/distributed.py @@ -42,6 +42,11 @@ import numba from numba import ir, types, typing, config, numpy_support, ir_utils, postproc from numba.ir_utils import ( + next_label, + mk_range_block, + mk_loop_header, + rename_labels, + simplify, mk_unique_var, replace_vars_inner, find_topo_order, @@ -68,7 +73,6 @@ from numba.typing import signature from numba.parfor import ( Parfor, - lower_parfor_sequential, get_parfor_reductions, get_parfor_params, wrap_parfor_blocks, @@ -120,8 +124,14 @@ def __init__(self): pass def run_pass(self, state): - return DistributedPassImpl(state).run_pass() - + print('XXXXXXXXXXXXXXXXXXXXXXXXXXX') + state.func_ir.dump() + print('YYYYYYYYYYYYYYYYYYYYYYYYYYY') + res = DistributedPassImpl(state).run_pass() + print('XXXXXXXXXXXXXXXXXXXXXXXXXXX') + state.func_ir.dump() + print('YYYYYYYYYYYYYYYYYYYYYYYYYYY') + return res class DistributedPassImpl(object): """The summary of the class should be here for example below is the summary line for this class @@ -150,6 +160,8 @@ def __init__(self, state): # size for 1DVar allocs and parfors self.oneDVar_len_vars = {} + self.parfors_to_lower = {} + self.state = state def run_pass(self): @@ -174,7 +186,7 @@ def run_pass(self): remove_dead(self.state.func_ir.blocks, self.state.func_ir.arg_names, self.state.func_ir, self.state.typemap) dprint_func_ir(self.state.func_ir, "after distributed pass") lower_parfor_sequential( - self.state.typingctx, self.state.func_ir, self.state.typemap, self.state.calltypes) + self.state.typingctx, self.state.func_ir, self.state.typemap, self.state.calltypes, self.parfors_to_lower) if hpat.multithread_mode: # parfor params need to be updated for multithread_mode since some # new variables like alloc_start are introduced by distributed pass @@ -212,12 +224,15 @@ def _run_dist_pass(self, blocks): self.state.typemap, self.state.calltypes, self.state.typingctx, self.state.targetctx, self) elif isinstance(inst, Parfor): + print('********** process parfor **********') out_nodes = self._run_parfor(inst, namevar_table) + self.parfors_to_lower[inst] = True # run dist pass recursively - p_blocks = wrap_parfor_blocks(inst) - # build_definitions(p_blocks, self.state.func_ir._definitions) - self._run_dist_pass(p_blocks) - unwrap_parfor_blocks(inst) + # p_blocks = wrap_parfor_blocks(inst) + # # build_definitions(p_blocks, self.state.func_ir._definitions) + # self._run_dist_pass(p_blocks) + # unwrap_parfor_blocks(inst) + print('********** process parfor end **********') elif isinstance(inst, ir.Assign): lhs = inst.target.name rhs = inst.value @@ -1884,7 +1899,12 @@ def _gen_parfor_reductions(self, parfor, namevar_table): _, reductions = get_parfor_reductions( parfor, parfor.params, self.state.calltypes) + print('aaaaaaaaaaaaaaaaaaa') + parfor.dump() + print('aaaaaaaaaaaaaaaaaaa') for reduce_varname, (init_val, reduce_nodes) in reductions.items(): + print(len(reduce_nodes)) + print('\n'.join([str(a) for a in reduce_nodes])) reduce_op = guard(self._get_reduce_op, reduce_nodes) # TODO: initialize reduction vars (arrays) reduce_var = namevar_table[reduce_varname] @@ -2275,3 +2295,104 @@ def _set_getsetitem_index(node, new_ind): def dprint(*s): # pragma: no cover if debug_prints(): print(*s) + + +def lower_parfor_sequential(typingctx, func_ir, typemap, calltypes, whitelist): + ir_utils._max_label = max(ir_utils._max_label, + ir_utils.find_max_label(func_ir.blocks)) + parfor_found = False + new_blocks = {} + for (block_label, block) in func_ir.blocks.items(): + block_label, parfor_found = _lower_parfor_sequential_block( + block_label, block, new_blocks, typemap, calltypes, parfor_found, whitelist) + # old block stays either way + new_blocks[block_label] = block + func_ir.blocks = new_blocks + # rename only if parfor found and replaced (avoid test_flow_control error) + if parfor_found: + func_ir.blocks = rename_labels(func_ir.blocks) + dprint_func_ir(func_ir, "after parfor sequential lowering") + simplify(func_ir, typemap, calltypes) + dprint_func_ir(func_ir, "after parfor sequential simplify") + # add dels since simplify removes dels + post_proc = postproc.PostProcessor(func_ir) + post_proc.run() + return + + +def _lower_parfor_sequential_block( + block_label, + block, + new_blocks, + typemap, + calltypes, + parfor_found, + whitelist): + scope = block.scope + i = _find_first_parfor(block.body, whitelist) + while i != -1: + parfor_found = True + inst = block.body[i] + loc = inst.init_block.loc + # split block across parfor + prev_block = ir.Block(scope, loc) + prev_block.body = block.body[:i] + block.body = block.body[i + 1:] + # previous block jump to parfor init block + init_label = next_label() + prev_block.body.append(ir.Jump(init_label, loc)) + new_blocks[init_label] = inst.init_block + new_blocks[block_label] = prev_block + block_label = next_label() + + ndims = len(inst.loop_nests) + for i in range(ndims): + loopnest = inst.loop_nests[i] + # create range block for loop + range_label = next_label() + header_label = next_label() + range_block = mk_range_block( + typemap, + loopnest.start, + loopnest.stop, + loopnest.step, + calltypes, + scope, + loc) + range_block.body[-1].target = header_label # fix jump target + phi_var = range_block.body[-2].target + new_blocks[range_label] = range_block + header_block = mk_loop_header(typemap, phi_var, calltypes, + scope, loc) + header_block.body[-2].target = loopnest.index_variable + new_blocks[header_label] = header_block + # jump to this new inner loop + if i == 0: + inst.init_block.body.append(ir.Jump(range_label, loc)) + header_block.body[-1].falsebr = block_label + else: + new_blocks[prev_header_label].body[-1].truebr = range_label + header_block.body[-1].falsebr = prev_header_label + prev_header_label = header_label # to set truebr next loop + + # last body block jump to inner most header + body_last_label = max(inst.loop_body.keys()) + inst.loop_body[body_last_label].body.append( + ir.Jump(header_label, loc)) + # inner most header jumps to first body block + body_first_label = min(inst.loop_body.keys()) + header_block.body[-1].truebr = body_first_label + # add parfor body to blocks + for (l, b) in inst.loop_body.items(): + l, parfor_found = _lower_parfor_sequential_block( + l, b, new_blocks, typemap, calltypes, parfor_found, whitelist) + new_blocks[l] = b + i = _find_first_parfor(block.body, whitelist) + return block_label, parfor_found + + +def _find_first_parfor(body, whitelist): + for (i, inst) in enumerate(body): + if isinstance(inst, Parfor) and inst in whitelist: + return i + return -1 From 46af82ad07d1f7a67e009f2405f1ec8c0e09b6ca Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Wed, 30 Oct 2019 17:07:27 +0300 Subject: [PATCH 02/11] 2 --- hpat/distributed.py | 135 ++++++-------------------------------------- 1 file changed, 17 insertions(+), 118 deletions(-) diff --git a/hpat/distributed.py b/hpat/distributed.py index 6aa64c98d..e9bf626e9 100644 --- a/hpat/distributed.py +++ b/hpat/distributed.py @@ -42,11 +42,6 @@ import numba from numba import ir, types, typing, config, numpy_support, ir_utils, postproc from numba.ir_utils import ( - next_label, - mk_range_block, - mk_loop_header, - rename_labels, - simplify, mk_unique_var, replace_vars_inner, find_topo_order, @@ -73,6 +68,7 @@ from numba.typing import signature from numba.parfor import ( Parfor, + lower_parfor_sequential, get_parfor_reductions, get_parfor_params, wrap_parfor_blocks, @@ -160,8 +156,6 @@ def __init__(self, state): # size for 1DVar allocs and parfors self.oneDVar_len_vars = {} - self.parfors_to_lower = {} - self.state = state def run_pass(self): @@ -181,12 +175,12 @@ def run_pass(self): self._gen_dist_inits() self.state.func_ir._definitions = build_definitions(self.state.func_ir.blocks) - self.state.func_ir.blocks = self._run_dist_pass(self.state.func_ir.blocks) + self.state.func_ir.blocks = self._run_dist_pass(self.state.func_ir.blocks, 0) self.state.func_ir.blocks = self._dist_prints(self.state.func_ir.blocks) remove_dead(self.state.func_ir.blocks, self.state.func_ir.arg_names, self.state.func_ir, self.state.typemap) dprint_func_ir(self.state.func_ir, "after distributed pass") lower_parfor_sequential( - self.state.typingctx, self.state.func_ir, self.state.typemap, self.state.calltypes, self.parfors_to_lower) + self.state.typingctx, self.state.func_ir, self.state.typemap, self.state.calltypes) if hpat.multithread_mode: # parfor params need to be updated for multithread_mode since some # new variables like alloc_start are introduced by distributed pass @@ -207,7 +201,7 @@ def run_pass(self): return True - def _run_dist_pass(self, blocks): + def _run_dist_pass(self, blocks, depth): """This function does something""" topo_order = find_topo_order(blocks) namevar_table = get_name_var_table(blocks) @@ -225,13 +219,12 @@ def _run_dist_pass(self, blocks): self.state.targetctx, self) elif isinstance(inst, Parfor): print('********** process parfor **********') - out_nodes = self._run_parfor(inst, namevar_table) - self.parfors_to_lower[inst] = True + out_nodes = self._run_parfor(inst, namevar_table, 0 == depth) # run dist pass recursively - # p_blocks = wrap_parfor_blocks(inst) - # # build_definitions(p_blocks, self.state.func_ir._definitions) - # self._run_dist_pass(p_blocks) - # unwrap_parfor_blocks(inst) + p_blocks = wrap_parfor_blocks(inst) + # build_definitions(p_blocks, self.state.func_ir._definitions) + self._run_dist_pass(p_blocks, depth + 1) + unwrap_parfor_blocks(inst) print('********** process parfor end **********') elif isinstance(inst, ir.Assign): lhs = inst.target.name @@ -1707,10 +1700,14 @@ def f(A, start, step): return out - def _run_parfor(self, parfor, namevar_table): + def _run_parfor(self, parfor, namevar_table, distribute): # stencil_accesses, neighborhood = get_stencil_accesses( # parfor, self.state.typemap) + if not distribute: + parfor.no_sequential_lowering = True + return [parfor] + # Thread and 1D parfors turn to gufunc in multithread mode if (hpat.multithread_mode and self._dist_analysis.parfor_dists[parfor.id] @@ -2109,6 +2106,9 @@ def f(val, op): # pragma: no cover replace_arg_nodes(block, [reduce_var, op_var]) dist_reduce_nodes = [op_assign] + block.body[:-3] dist_reduce_nodes[-1].target = reduce_var + print('*****************************') + print('\n'.join([str(a) for a in dist_reduce_nodes])) + print('*****************************') return dist_reduce_nodes def _get_reduce_op(self, reduce_nodes): @@ -2295,104 +2295,3 @@ def _set_getsetitem_index(node, new_ind): def dprint(*s): # pragma: no cover if debug_prints(): print(*s) - - -def lower_parfor_sequential(typingctx, func_ir, typemap, calltypes, whitelist): - ir_utils._max_label = max(ir_utils._max_label, - ir_utils.find_max_label(func_ir.blocks)) - parfor_found = False - new_blocks = {} - for (block_label, block) in func_ir.blocks.items(): - block_label, parfor_found = _lower_parfor_sequential_block( - block_label, block, new_blocks, typemap, calltypes, parfor_found, whitelist) - # old block stays either way - new_blocks[block_label] = block - func_ir.blocks = new_blocks - # rename only if parfor found and replaced (avoid test_flow_control error) - if parfor_found: - func_ir.blocks = rename_labels(func_ir.blocks) - dprint_func_ir(func_ir, "after parfor sequential lowering") - simplify(func_ir, typemap, calltypes) - dprint_func_ir(func_ir, "after parfor sequential simplify") - # add dels since simplify removes dels - post_proc = postproc.PostProcessor(func_ir) - post_proc.run() - return - - -def _lower_parfor_sequential_block( - block_label, - block, - new_blocks, - typemap, - calltypes, - parfor_found, - whitelist): - scope = block.scope - i = _find_first_parfor(block.body, whitelist) - while i != -1: - parfor_found = True - inst = block.body[i] - loc = inst.init_block.loc - # split block across parfor - prev_block = ir.Block(scope, loc) - prev_block.body = block.body[:i] - block.body = block.body[i + 1:] - # previous block jump to parfor init block - init_label = next_label() - prev_block.body.append(ir.Jump(init_label, loc)) - new_blocks[init_label] = inst.init_block - new_blocks[block_label] = prev_block - block_label = next_label() - - ndims = len(inst.loop_nests) - for i in range(ndims): - loopnest = inst.loop_nests[i] - # create range block for loop - range_label = next_label() - header_label = next_label() - range_block = mk_range_block( - typemap, - loopnest.start, - loopnest.stop, - loopnest.step, - calltypes, - scope, - loc) - range_block.body[-1].target = header_label # fix jump target - phi_var = range_block.body[-2].target - new_blocks[range_label] = range_block - header_block = mk_loop_header(typemap, phi_var, calltypes, - scope, loc) - header_block.body[-2].target = loopnest.index_variable - new_blocks[header_label] = header_block - # jump to this new inner loop - if i == 0: - inst.init_block.body.append(ir.Jump(range_label, loc)) - header_block.body[-1].falsebr = block_label - else: - new_blocks[prev_header_label].body[-1].truebr = range_label - header_block.body[-1].falsebr = prev_header_label - prev_header_label = header_label # to set truebr next loop - - # last body block jump to inner most header - body_last_label = max(inst.loop_body.keys()) - inst.loop_body[body_last_label].body.append( - ir.Jump(header_label, loc)) - # inner most header jumps to first body block - body_first_label = min(inst.loop_body.keys()) - header_block.body[-1].truebr = body_first_label - # add parfor body to blocks - for (l, b) in inst.loop_body.items(): - l, parfor_found = _lower_parfor_sequential_block( - l, b, new_blocks, typemap, calltypes, parfor_found, whitelist) - new_blocks[l] = b - i = _find_first_parfor(block.body, whitelist) - return block_label, parfor_found - - -def _find_first_parfor(body, whitelist): - for (i, inst) in enumerate(body): - if isinstance(inst, Parfor) and inst in whitelist: - return i - return -1 From 0ccc2f928b9877d5ddea8430c5c9b800b61a437a Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Thu, 7 Nov 2019 10:37:18 +0300 Subject: [PATCH 03/11] some work --- hpat/distributed.py | 45 ++++++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/hpat/distributed.py b/hpat/distributed.py index e9bf626e9..4857bc287 100644 --- a/hpat/distributed.py +++ b/hpat/distributed.py @@ -120,13 +120,13 @@ def __init__(self): pass def run_pass(self, state): - print('XXXXXXXXXXXXXXXXXXXXXXXXXXX') - state.func_ir.dump() - print('YYYYYYYYYYYYYYYYYYYYYYYYYYY') + # print('XXXXXXXXXXXXXXXXXXXXXXXXXXX') + # state.func_ir.dump() + # print('YYYYYYYYYYYYYYYYYYYYYYYYYYY') res = DistributedPassImpl(state).run_pass() - print('XXXXXXXXXXXXXXXXXXXXXXXXXXX') - state.func_ir.dump() - print('YYYYYYYYYYYYYYYYYYYYYYYYYYY') + # print('XXXXXXXXXXXXXXXXXXXXXXXXXXX') + # state.func_ir.dump() + # print('YYYYYYYYYYYYYYYYYYYYYYYYYYY') return res class DistributedPassImpl(object): @@ -219,7 +219,7 @@ def _run_dist_pass(self, blocks, depth): self.state.targetctx, self) elif isinstance(inst, Parfor): print('********** process parfor **********') - out_nodes = self._run_parfor(inst, namevar_table, 0 == depth) + out_nodes = self._run_parfor(inst, namevar_table, depth) # run dist pass recursively p_blocks = wrap_parfor_blocks(inst) # build_definitions(p_blocks, self.state.func_ir._definitions) @@ -286,6 +286,9 @@ def _run_dist_pass(self, blocks, depth): if not replaced: blocks[label].body = new_body + # print('******************** new_body') + # print('\n'.join([str(a) for a in new_body])) + # print('********************') return blocks @@ -1700,12 +1703,16 @@ def f(A, start, step): return out - def _run_parfor(self, parfor, namevar_table, distribute): + def _run_parfor(self, parfor, namevar_table, depth): # stencil_accesses, neighborhood = get_stencil_accesses( # parfor, self.state.typemap) - if not distribute: - parfor.no_sequential_lowering = True + dist_depth = 0 + + if depth > dist_depth: + # Do not distribute + if depth == (dist_depth + 1): + parfor.no_sequential_lowering = True return [parfor] # Thread and 1D parfors turn to gufunc in multithread mode @@ -1718,7 +1725,7 @@ def _run_parfor(self, parfor, namevar_table, distribute): return self._run_parfor_1D_Var(parfor, namevar_table) if self._dist_analysis.parfor_dists[parfor.id] != Distribution.OneD: - if debug_prints(): # pragma: no cover + if True or debug_prints(): # pragma: no cover print("parfor " + str(parfor.id) + " not parallelized.") return [parfor] @@ -1896,12 +1903,12 @@ def _gen_parfor_reductions(self, parfor, namevar_table): _, reductions = get_parfor_reductions( parfor, parfor.params, self.state.calltypes) - print('aaaaaaaaaaaaaaaaaaa') - parfor.dump() - print('aaaaaaaaaaaaaaaaaaa') + # print('aaaaaaaaaaaaaaaaaaa') + # parfor.dump() + # print('aaaaaaaaaaaaaaaaaaa') for reduce_varname, (init_val, reduce_nodes) in reductions.items(): - print(len(reduce_nodes)) - print('\n'.join([str(a) for a in reduce_nodes])) + # print(len(reduce_nodes)) + # print('\n'.join([str(a) for a in reduce_nodes])) reduce_op = guard(self._get_reduce_op, reduce_nodes) # TODO: initialize reduction vars (arrays) reduce_var = namevar_table[reduce_varname] @@ -2106,9 +2113,9 @@ def f(val, op): # pragma: no cover replace_arg_nodes(block, [reduce_var, op_var]) dist_reduce_nodes = [op_assign] + block.body[:-3] dist_reduce_nodes[-1].target = reduce_var - print('*****************************') - print('\n'.join([str(a) for a in dist_reduce_nodes])) - print('*****************************') + # print('*****************************') + # print('\n'.join([str(a) for a in dist_reduce_nodes])) + # print('*****************************') return dist_reduce_nodes def _get_reduce_op(self, reduce_nodes): From 07b630fbbca01e0ab927af8f80895bcc0c6ff869 Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Mon, 11 Nov 2019 12:49:38 +0300 Subject: [PATCH 04/11] fix --- hpat/distributed.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hpat/distributed.py b/hpat/distributed.py index 4857bc287..2bd36c0a6 100644 --- a/hpat/distributed.py +++ b/hpat/distributed.py @@ -1709,9 +1709,9 @@ def _run_parfor(self, parfor, namevar_table, depth): dist_depth = 0 - if depth > dist_depth: + if depth >= dist_depth: # Do not distribute - if depth == (dist_depth + 1): + if depth == dist_depth: parfor.no_sequential_lowering = True return [parfor] From 7aa49a53a41ac7e91ffc1ab4dcd233297420c21d Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Tue, 12 Nov 2019 12:24:27 +0300 Subject: [PATCH 05/11] parallel sort example --- hpat/datatypes/common_functions.py | 32 ++++++++++++++- .../datatypes/hpat_pandas_series_functions.py | 39 +++++++++++++++++-- 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/hpat/datatypes/common_functions.py b/hpat/datatypes/common_functions.py index 3b0fb5fdc..bf6f676fc 100644 --- a/hpat/datatypes/common_functions.py +++ b/hpat/datatypes/common_functions.py @@ -32,7 +32,7 @@ import numpy -from numba import types +from numba import types, njit, prange from numba.extending import overload from numba import numpy_support @@ -131,3 +131,33 @@ def _append_list_string_array_impl(A, B): return new_data return _append_list_string_array_impl + +@njit +def _compute_map_chunks(l, n): + assert n > 0 + a = len(l) // n + b = a + 1 + c = len(l) % n + return [l[i * b : i * b + b] if i < c else l[c * b + (i - c) * a : c * b + (i - c) * a + a] for i in range(n)] + +@njit(parallel=True) +def map_reduce(arg, init_val, map_func, reduce_func): + res = init_val + for i in prange(len(arg)): + val = map_func(arg[i]) + res = reduce_func(res, val) + return res + +@njit(parallel=True) +def map_reduce_chunked(arg, init_val, map_func, reduce_func): + res = init_val + # TODO: proper cores/nodes count + chunks_count = 4 + if 1 == chunks_count: + return map_func(arg) + else: + c = _compute_map_chunks(arg, chunks_count) + for i in range(len(c)): + val = map_func(c[i]) + res = reduce_func(res, val) + return res diff --git a/hpat/datatypes/hpat_pandas_series_functions.py b/hpat/datatypes/hpat_pandas_series_functions.py index c77430913..22ebf6717 100644 --- a/hpat/datatypes/hpat_pandas_series_functions.py +++ b/hpat/datatypes/hpat_pandas_series_functions.py @@ -36,7 +36,7 @@ from numba.errors import TypingError from numba.extending import overload, overload_method, overload_attribute -from numba import types +from numba import types, njit import hpat import hpat.datatypes.common_functions as common_functions @@ -2898,7 +2898,6 @@ def hpat_pandas_series_median_impl(self, axis=None, skipna=True, level=None, num return hpat_pandas_series_median_impl - @overload_method(SeriesType, 'argsort') def hpat_pandas_series_argsort(self, axis=0, kind='quicksort', order=None): """ @@ -2979,6 +2978,37 @@ def hpat_pandas_series_argsort_noidx_impl(self, axis=0, kind='quicksort', order= return hpat_pandas_series_argsort_noidx_impl +@njit +def _sort_map_func(list1): + return numpy.sort(list1) + +@njit +def _sort_reduce_func(list1, list2): + # TODO: proper NaNs handling + size_1 = len(list1) + size_2 = len(list2) + res = numpy.empty(size_1 + size_2, list1.dtype) + i, j, k = 0, 0, 0 + while i < size_1 and j < size_2: + if list1[i] < list2[j]: + res[k] = list1[i] + i += 1 + else: + res[k] = list2[j] + j += 1 + k += 1 + + while i < size_1: + res[k] = list1[i] + i += 1 + k += 1 + + while j < size_2: + res[k] = list2[j] + j += 1 + k += 1 + + return res @overload_method(SeriesType, 'sort_values') def hpat_pandas_series_sort_values(self, axis=0, ascending=True, inplace=False, kind='quicksort', na_position='last'): @@ -3062,7 +3092,10 @@ def hpat_pandas_series_sort_values_num_noidx_impl(self, axis=0, ascending=True, na = self.isna().sum() indices = numpy.arange(len(self._data)) index_result = numpy.argsort(self._data, kind='mergesort') - result = numpy.sort(self._data) + + result = common_functions.map_reduce_chunked(self._data, numpy.empty(0, self._data.dtype), _sort_map_func, _sort_reduce_func) + # result = numpy.sort(self._data) + i = len(self._data) - na index_result[i:] = index_result[i:][::-1] if not ascending: From 995955cc63551bec476c381bcc16ae3cfdc78643 Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Tue, 12 Nov 2019 12:28:14 +0300 Subject: [PATCH 06/11] distribution depth --- hpat/distributed.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hpat/distributed.py b/hpat/distributed.py index 2bd36c0a6..6a4c4857d 100644 --- a/hpat/distributed.py +++ b/hpat/distributed.py @@ -106,6 +106,7 @@ dist_analysis = None fir_text = None +_distribution_depth = int(os.getenv('SDC_DISTRIBUTION_DEPTH', '1')) @register_pass(mutates_CFG=True, analysis_only=False) class DistributedPass(FunctionPass): @@ -1707,11 +1708,10 @@ def _run_parfor(self, parfor, namevar_table, depth): # stencil_accesses, neighborhood = get_stencil_accesses( # parfor, self.state.typemap) - dist_depth = 0 - - if depth >= dist_depth: + global _distribution_depth + if depth >= _distribution_depth: # Do not distribute - if depth == dist_depth: + if depth == _distribution_depth: parfor.no_sequential_lowering = True return [parfor] From 995ff3100c8a377fe89a5b06c3bd011f17cd7a63 Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Tue, 12 Nov 2019 12:32:30 +0300 Subject: [PATCH 07/11] remove commented code --- hpat/distributed.py | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/hpat/distributed.py b/hpat/distributed.py index 6a4c4857d..fe4a5921f 100644 --- a/hpat/distributed.py +++ b/hpat/distributed.py @@ -121,14 +121,7 @@ def __init__(self): pass def run_pass(self, state): - # print('XXXXXXXXXXXXXXXXXXXXXXXXXXX') - # state.func_ir.dump() - # print('YYYYYYYYYYYYYYYYYYYYYYYYYYY') - res = DistributedPassImpl(state).run_pass() - # print('XXXXXXXXXXXXXXXXXXXXXXXXXXX') - # state.func_ir.dump() - # print('YYYYYYYYYYYYYYYYYYYYYYYYYYY') - return res + return DistributedPassImpl(state).run_pass() class DistributedPassImpl(object): """The summary of the class should be here for example below is the summary line for this class @@ -219,14 +212,12 @@ def _run_dist_pass(self, blocks, depth): self.state.typemap, self.state.calltypes, self.state.typingctx, self.state.targetctx, self) elif isinstance(inst, Parfor): - print('********** process parfor **********') out_nodes = self._run_parfor(inst, namevar_table, depth) # run dist pass recursively p_blocks = wrap_parfor_blocks(inst) # build_definitions(p_blocks, self.state.func_ir._definitions) self._run_dist_pass(p_blocks, depth + 1) unwrap_parfor_blocks(inst) - print('********** process parfor end **********') elif isinstance(inst, ir.Assign): lhs = inst.target.name rhs = inst.value @@ -287,9 +278,6 @@ def _run_dist_pass(self, blocks, depth): if not replaced: blocks[label].body = new_body - # print('******************** new_body') - # print('\n'.join([str(a) for a in new_body])) - # print('********************') return blocks @@ -1725,7 +1713,7 @@ def _run_parfor(self, parfor, namevar_table, depth): return self._run_parfor_1D_Var(parfor, namevar_table) if self._dist_analysis.parfor_dists[parfor.id] != Distribution.OneD: - if True or debug_prints(): # pragma: no cover + if debug_prints(): # pragma: no cover print("parfor " + str(parfor.id) + " not parallelized.") return [parfor] @@ -1903,9 +1891,6 @@ def _gen_parfor_reductions(self, parfor, namevar_table): _, reductions = get_parfor_reductions( parfor, parfor.params, self.state.calltypes) - # print('aaaaaaaaaaaaaaaaaaa') - # parfor.dump() - # print('aaaaaaaaaaaaaaaaaaa') for reduce_varname, (init_val, reduce_nodes) in reductions.items(): # print(len(reduce_nodes)) # print('\n'.join([str(a) for a in reduce_nodes])) @@ -2113,9 +2098,6 @@ def f(val, op): # pragma: no cover replace_arg_nodes(block, [reduce_var, op_var]) dist_reduce_nodes = [op_assign] + block.body[:-3] dist_reduce_nodes[-1].target = reduce_var - # print('*****************************') - # print('\n'.join([str(a) for a in dist_reduce_nodes])) - # print('*****************************') return dist_reduce_nodes def _get_reduce_op(self, reduce_nodes): From 34ca22d9a828262698f09be5d16dc41f563c2a88 Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Tue, 12 Nov 2019 12:33:32 +0300 Subject: [PATCH 08/11] reduce commented code --- hpat/distributed.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/hpat/distributed.py b/hpat/distributed.py index fe4a5921f..a149c2827 100644 --- a/hpat/distributed.py +++ b/hpat/distributed.py @@ -1892,8 +1892,6 @@ def _gen_parfor_reductions(self, parfor, namevar_table): parfor, parfor.params, self.state.calltypes) for reduce_varname, (init_val, reduce_nodes) in reductions.items(): - # print(len(reduce_nodes)) - # print('\n'.join([str(a) for a in reduce_nodes])) reduce_op = guard(self._get_reduce_op, reduce_nodes) # TODO: initialize reduction vars (arrays) reduce_var = namevar_table[reduce_varname] From 14ebb3d268ba53b42faf38c3dac9712961cc0b03 Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Tue, 12 Nov 2019 13:43:03 +0300 Subject: [PATCH 09/11] style --- hpat/datatypes/common_functions.py | 5 ++++- hpat/datatypes/hpat_pandas_series_functions.py | 9 ++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/hpat/datatypes/common_functions.py b/hpat/datatypes/common_functions.py index bf6f676fc..92f9aea84 100644 --- a/hpat/datatypes/common_functions.py +++ b/hpat/datatypes/common_functions.py @@ -132,13 +132,15 @@ def _append_list_string_array_impl(A, B): return _append_list_string_array_impl + @njit def _compute_map_chunks(l, n): assert n > 0 a = len(l) // n b = a + 1 c = len(l) % n - return [l[i * b : i * b + b] if i < c else l[c * b + (i - c) * a : c * b + (i - c) * a + a] for i in range(n)] + return [l[i * b: i * b + b] if i < c else l[c * b + (i - c) * a: c * b + (i - c) * a + a] for i in range(n)] + @njit(parallel=True) def map_reduce(arg, init_val, map_func, reduce_func): @@ -148,6 +150,7 @@ def map_reduce(arg, init_val, map_func, reduce_func): res = reduce_func(res, val) return res + @njit(parallel=True) def map_reduce_chunked(arg, init_val, map_func, reduce_func): res = init_val diff --git a/hpat/datatypes/hpat_pandas_series_functions.py b/hpat/datatypes/hpat_pandas_series_functions.py index 22ebf6717..da6bbf561 100644 --- a/hpat/datatypes/hpat_pandas_series_functions.py +++ b/hpat/datatypes/hpat_pandas_series_functions.py @@ -2978,10 +2978,12 @@ def hpat_pandas_series_argsort_noidx_impl(self, axis=0, kind='quicksort', order= return hpat_pandas_series_argsort_noidx_impl + @njit def _sort_map_func(list1): return numpy.sort(list1) + @njit def _sort_reduce_func(list1, list2): # TODO: proper NaNs handling @@ -3010,6 +3012,7 @@ def _sort_reduce_func(list1, list2): return res + @overload_method(SeriesType, 'sort_values') def hpat_pandas_series_sort_values(self, axis=0, ascending=True, inplace=False, kind='quicksort', na_position='last'): """ @@ -3093,7 +3096,11 @@ def hpat_pandas_series_sort_values_num_noidx_impl(self, axis=0, ascending=True, indices = numpy.arange(len(self._data)) index_result = numpy.argsort(self._data, kind='mergesort') - result = common_functions.map_reduce_chunked(self._data, numpy.empty(0, self._data.dtype), _sort_map_func, _sort_reduce_func) + result = common_functions.map_reduce_chunked( + self._data, + numpy.empty(0, self._data.dtype), + _sort_map_func, + _sort_reduce_func) # result = numpy.sort(self._data) i = len(self._data) - na From a711ac329dfeada2f0e7e5d0aa23b39c2d67e098 Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Tue, 12 Nov 2019 13:43:41 +0300 Subject: [PATCH 10/11] fix --- hpat/distributed.py | 1 + 1 file changed, 1 insertion(+) diff --git a/hpat/distributed.py b/hpat/distributed.py index a149c2827..0eeeb5452 100644 --- a/hpat/distributed.py +++ b/hpat/distributed.py @@ -38,6 +38,7 @@ import warnings from collections import defaultdict import numpy as np +import os import numba from numba import ir, types, typing, config, numpy_support, ir_utils, postproc From 1c91eac3a5e99c91297aed95d7d07e27ad89a2b3 Mon Sep 17 00:00:00 2001 From: Ivan Butygin Date: Tue, 12 Nov 2019 13:58:29 +0300 Subject: [PATCH 11/11] prange --- hpat/datatypes/common_functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hpat/datatypes/common_functions.py b/hpat/datatypes/common_functions.py index 92f9aea84..b4aeb1dc6 100644 --- a/hpat/datatypes/common_functions.py +++ b/hpat/datatypes/common_functions.py @@ -160,7 +160,7 @@ def map_reduce_chunked(arg, init_val, map_func, reduce_func): return map_func(arg) else: c = _compute_map_chunks(arg, chunks_count) - for i in range(len(c)): + for i in prange(len(c)): val = map_func(c[i]) res = reduce_func(res, val) return res