diff --git a/hpat/datatypes/common_functions.py b/hpat/datatypes/common_functions.py
index 3b0fb5fdc..b4aeb1dc6 100644
--- a/hpat/datatypes/common_functions.py
+++ b/hpat/datatypes/common_functions.py
@@ -32,7 +32,7 @@
 
 import numpy
 
-from numba import types
+from numba import types, njit, prange
 from numba.extending import overload
 from numba import numpy_support
 
@@ -131,3 +131,54 @@ def _append_list_string_array_impl(A, B):
         return new_data
 
     return _append_list_string_array_impl
+
+
+@njit
+def _compute_map_chunks(l, n):
+    """Split ``l`` into ``n`` contiguous slices whose lengths differ by at most one.
+
+    The first ``len(l) % n`` slices carry one extra element; ``n`` must be positive.
+    """
+    assert n > 0
+    base = len(l) // n
+    extra = len(l) % n
+    # Slice i starts after i base-sized runs plus one element for each of the
+    # min(i, extra) longer slices before it, and ends where slice i + 1 starts.
+    return [l[i * base + min(i, extra):(i + 1) * base + min(i + 1, extra)] for i in range(n)]
+
+
+@njit(parallel=True)
+def map_reduce(arg, init_val, map_func, reduce_func):
+    """Fold ``reduce_func`` over ``map_func(arg[i])`` for every index of ``arg``.
+
+    Accumulation starts from ``init_val``; the accumulated value is returned.
+    """
+    res = init_val
+    # NOTE(review): this accumulation runs under prange, so reduce_func presumably has to be
+    # order-independent with init_val as its identity -- confirm against Numba's reduction rules.
+    for i in prange(len(arg)):
+        val = map_func(arg[i])
+        res = reduce_func(res, val)
+    return res
+
+
+@njit(parallel=True)
+def map_reduce_chunked(arg, init_val, map_func, reduce_func):
+    """Apply ``map_func`` to contiguous chunks of ``arg`` and fold the results with ``reduce_func``.
+
+    Accumulation starts from ``init_val``; the accumulated value is returned.
+    """
+    # TODO: proper cores/nodes count
+    chunks_count = 4
+    if chunks_count == 1:
+        # Fold through init_val here as well so both paths produce the same result.
+        return reduce_func(init_val, map_func(arg))
+
+    res = init_val
+    chunks = _compute_map_chunks(arg, chunks_count)
+    # NOTE(review): this accumulation runs under prange, so reduce_func presumably has to be
+    # order-independent with init_val as its identity -- confirm against Numba's reduction rules.
+    for i in prange(len(chunks)):
+        val = map_func(chunks[i])
+        res = reduce_func(res, val)
+    return res
diff --git a/hpat/datatypes/hpat_pandas_series_functions.py b/hpat/datatypes/hpat_pandas_series_functions.py
index c77430913..da6bbf561 100644
--- a/hpat/datatypes/hpat_pandas_series_functions.py
+++ b/hpat/datatypes/hpat_pandas_series_functions.py
@@ -36,7 +36,7 @@
 
 from numba.errors import TypingError
 from numba.extending import overload, overload_method, overload_attribute
-from numba import types
+from numba import types, njit
 
 import hpat
 import hpat.datatypes.common_functions as common_functions
@@ -2898,7 +2898,6 @@ def hpat_pandas_series_median_impl(self, axis=None, skipna=True, level=None, num
     return hpat_pandas_series_median_impl
 
 
-
 @overload_method(SeriesType, 'argsort')
 def hpat_pandas_series_argsort(self, axis=0, kind='quicksort', 
order=None):
     """
@@ -2980,6 +2979,43 @@ def hpat_pandas_series_argsort_noidx_impl(self, axis=0, kind='quicksort', order=
     return hpat_pandas_series_argsort_noidx_impl
 
 
+@njit
+def _sort_map_func(list1):
+    """Return a sorted copy of ``list1`` (map step of the chunked sort)."""
+    return numpy.sort(list1)
+
+
+@njit
+def _sort_reduce_func(list1, list2):
+    """Merge two ascending arrays into one ascending array (reduce step of the chunked sort).
+
+    NaNs are kept at the tail of the result, matching ``numpy.sort``, provided each input
+    already has its NaNs last. The result uses the dtype of ``list1``.
+    """
+    size_1 = len(list1)
+    size_2 = len(list2)
+    res = numpy.empty(size_1 + size_2, list1.dtype)
+    i, j, k = 0, 0, 0
+    while i < size_1 and j < size_2:
+        # ``x != x`` is true only for NaN. Comparisons with NaN are always false, so without
+        # this check a NaN heading list2 would be emitted ahead of the remaining list1 values.
+        if list1[i] < list2[j] or list2[j] != list2[j]:
+            res[k] = list1[i]
+            i += 1
+        else:
+            res[k] = list2[j]
+            j += 1
+        k += 1
+
+    # At most one input still has elements left; copy that tail with one slice assignment.
+    if i < size_1:
+        res[k:] = list1[i:]
+    else:
+        res[k:] = list2[j:]
+
+    return res
+
+
 @overload_method(SeriesType, 'sort_values')
 def hpat_pandas_series_sort_values(self, axis=0, ascending=True, inplace=False, kind='quicksort', na_position='last'):
     """
@@ -3062,7 +3098,14 @@ def hpat_pandas_series_sort_values_num_noidx_impl(self, axis=0, ascending=True,
             na = self.isna().sum()
             indices = numpy.arange(len(self._data))
             index_result = numpy.argsort(self._data, kind='mergesort')
-            result = numpy.sort(self._data)
+
+            result = common_functions.map_reduce_chunked(
+                self._data,
+                numpy.empty(0, self._data.dtype),
+                _sort_map_func,
+                _sort_reduce_func)
+            # result = numpy.sort(self._data)
+
             i = len(self._data) - na
             index_result[i:] = index_result[i:][::-1]
             if not ascending:
diff --git a/hpat/distributed.py b/hpat/distributed.py
index fd008686e..0eeeb5452 100644
--- a/hpat/distributed.py
+++ b/hpat/distributed.py
@@ -38,6 +38,7 @@
 import warnings
 from collections import defaultdict
 import numpy as np
+import os
 import numba
 from numba import ir, types, typing, config, numpy_support, ir_utils, postproc
 
@@ -106,6 +107,7 @@
 dist_analysis = None
 fir_text = None
 
+_distribution_depth = int(os.getenv('SDC_DISTRIBUTION_DEPTH', '1'))
 
 @register_pass(mutates_CFG=True, analysis_only=False)
 class DistributedPass(FunctionPass):
@@ -122,7 +124,6 @@ def __init__(self):
 
     def run_pass(self, 
state): return DistributedPassImpl(state).run_pass() - class DistributedPassImpl(object): """The summary of the class should be here for example below is the summary line for this class @@ -169,7 +170,7 @@ def run_pass(self): self._gen_dist_inits() self.state.func_ir._definitions = build_definitions(self.state.func_ir.blocks) - self.state.func_ir.blocks = self._run_dist_pass(self.state.func_ir.blocks) + self.state.func_ir.blocks = self._run_dist_pass(self.state.func_ir.blocks, 0) self.state.func_ir.blocks = self._dist_prints(self.state.func_ir.blocks) remove_dead(self.state.func_ir.blocks, self.state.func_ir.arg_names, self.state.func_ir, self.state.typemap) dprint_func_ir(self.state.func_ir, "after distributed pass") @@ -195,7 +196,7 @@ def run_pass(self): return True - def _run_dist_pass(self, blocks): + def _run_dist_pass(self, blocks, depth): """This function does something""" topo_order = find_topo_order(blocks) namevar_table = get_name_var_table(blocks) @@ -212,11 +213,11 @@ def _run_dist_pass(self, blocks): self.state.typemap, self.state.calltypes, self.state.typingctx, self.state.targetctx, self) elif isinstance(inst, Parfor): - out_nodes = self._run_parfor(inst, namevar_table) + out_nodes = self._run_parfor(inst, namevar_table, depth) # run dist pass recursively p_blocks = wrap_parfor_blocks(inst) # build_definitions(p_blocks, self.state.func_ir._definitions) - self._run_dist_pass(p_blocks) + self._run_dist_pass(p_blocks, depth + 1) unwrap_parfor_blocks(inst) elif isinstance(inst, ir.Assign): lhs = inst.target.name @@ -1692,10 +1693,17 @@ def f(A, start, step): return out - def _run_parfor(self, parfor, namevar_table): + def _run_parfor(self, parfor, namevar_table, depth): # stencil_accesses, neighborhood = get_stencil_accesses( # parfor, self.state.typemap) + global _distribution_depth + if depth >= _distribution_depth: + # Do not distribute + if depth == _distribution_depth: + parfor.no_sequential_lowering = True + return [parfor] + # Thread and 1D 
parfors turn to gufunc in multithread mode if (hpat.multithread_mode and self._dist_analysis.parfor_dists[parfor.id]