Skip to content
This repository was archived by the owner on Feb 2, 2024. It is now read-only.

WIP: interface for map-reduce style kernels #284

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion hpat/datatypes/common_functions.py
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@

import numpy

from numba import types
from numba import types, njit, prange
from numba.extending import overload
from numba import numpy_support

@@ -131,3 +131,36 @@ def _append_list_string_array_impl(A, B):
return new_data

return _append_list_string_array_impl


@njit
def _compute_map_chunks(l, n):
    """Split sequence *l* into *n* contiguous chunks of near-equal size.

    The first ``len(l) % n`` chunks are one element longer, so chunk
    lengths differ by at most one and the chunks cover *l* exactly.
    Returns a list of slices of *l*.
    """
    assert n > 0
    small_size = len(l) // n       # length of the shorter chunks
    large_size = small_size + 1    # length of the longer chunks
    large_count = len(l) % n       # how many chunks get the extra element
    chunks = []
    for i in range(n):
        if i < large_count:
            # one of the longer chunks at the front
            start = i * large_size
            stop = start + large_size
        else:
            # shorter chunks follow after all the longer ones
            start = large_count * large_size + (i - large_count) * small_size
            stop = start + small_size
        chunks.append(l[start:stop])
    return chunks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is quite understandable code, isn't it? :-)
Please don't use single-letter variable names.



@njit(parallel=True)
def map_reduce(arg, init_val, map_func, reduce_func):
    """Apply map_func to every element of arg in parallel and fold the
    results into a single value with reduce_func, starting from init_val.

    NOTE: the `res = reduce_func(res, val)` shape inside prange is the
    pattern numba recognizes as a parallel reduction — do not restructure.
    """
    res = init_val
    for i in prange(len(arg)):
        val = map_func(arg[i])
        res = reduce_func(res, val)
    return res


@njit(parallel=True)
def map_reduce_chunked(arg, init_val, map_func, reduce_func):
    """Chunked variant of map_reduce: split arg into chunks, apply
    map_func to each chunk in parallel, and fold the per-chunk results
    into a single value with reduce_func, starting from init_val.
    """
    res = init_val
    # TODO: proper cores/nodes count
    chunks_count = 4
    if chunks_count == 1:
        # Single chunk: still fold the mapped result into init_val so the
        # semantics match the multi-chunk path (the original returned
        # map_func(arg) directly, silently dropping init_val).
        return reduce_func(res, map_func(arg))
    else:
        c = _compute_map_chunks(arg, chunks_count)
        # numba recognizes this accumulation shape as a parallel reduction
        for i in prange(len(c)):
            val = map_func(c[i])
            res = reduce_func(res, val)
        return res
46 changes: 43 additions & 3 deletions hpat/datatypes/hpat_pandas_series_functions.py
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@

from numba.errors import TypingError
from numba.extending import overload, overload_method, overload_attribute
from numba import types
from numba import types, njit

import hpat
import hpat.datatypes.common_functions as common_functions
@@ -2898,7 +2898,6 @@ def hpat_pandas_series_median_impl(self, axis=None, skipna=True, level=None, num

return hpat_pandas_series_median_impl


@overload_method(SeriesType, 'argsort')
def hpat_pandas_series_argsort(self, axis=0, kind='quicksort', order=None):
"""
@@ -2980,6 +2979,40 @@ def hpat_pandas_series_argsort_noidx_impl(self, axis=0, kind='quicksort', order=
return hpat_pandas_series_argsort_noidx_impl


@njit
def _sort_map_func(list1):
    """Map step for chunked sorting: return a sorted copy of the chunk."""
    sorted_chunk = numpy.sort(list1)
    return sorted_chunk


@njit
def _sort_reduce_func(list1, list2):
    """Reduce step for chunked sorting: merge two sorted arrays into one.

    On ties the element from list2 is emitted first, matching the
    original merge order.
    """
    # TODO: proper NaNs handling
    len1 = len(list1)
    len2 = len(list2)
    merged = numpy.empty(len1 + len2, list1.dtype)

    pos1 = 0
    pos2 = 0
    for out in range(len1 + len2):
        # Take from list1 only when it still has elements AND either
        # list2 is exhausted or list1's head is strictly smaller.
        if pos1 < len1 and (pos2 >= len2 or list1[pos1] < list2[pos2]):
            merged[out] = list1[pos1]
            pos1 += 1
        else:
            merged[out] = list2[pos2]
            pos2 += 1

    return merged


@overload_method(SeriesType, 'sort_values')
def hpat_pandas_series_sort_values(self, axis=0, ascending=True, inplace=False, kind='quicksort', na_position='last'):
"""
@@ -3062,7 +3095,14 @@ def hpat_pandas_series_sort_values_num_noidx_impl(self, axis=0, ascending=True,
na = self.isna().sum()
indices = numpy.arange(len(self._data))
index_result = numpy.argsort(self._data, kind='mergesort')
result = numpy.sort(self._data)

result = common_functions.map_reduce_chunked(
self._data,
numpy.empty(0, self._data.dtype),
_sort_map_func,
_sort_reduce_func)
# result = numpy.sort(self._data)

i = len(self._data) - na
index_result[i:] = index_result[i:][::-1]
if not ascending:
20 changes: 14 additions & 6 deletions hpat/distributed.py
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
import warnings
from collections import defaultdict
import numpy as np
import os

import numba
from numba import ir, types, typing, config, numpy_support, ir_utils, postproc
@@ -106,6 +107,7 @@
dist_analysis = None
fir_text = None

_distribution_depth = int(os.getenv('SDC_DISTRIBUTION_DEPTH', '1'))

@register_pass(mutates_CFG=True, analysis_only=False)
class DistributedPass(FunctionPass):
@@ -122,7 +124,6 @@ def __init__(self):
def run_pass(self, state):
return DistributedPassImpl(state).run_pass()


class DistributedPassImpl(object):
"""The summary of the class should be here for example below is the summary line for this class

@@ -169,7 +170,7 @@ def run_pass(self):

self._gen_dist_inits()
self.state.func_ir._definitions = build_definitions(self.state.func_ir.blocks)
self.state.func_ir.blocks = self._run_dist_pass(self.state.func_ir.blocks)
self.state.func_ir.blocks = self._run_dist_pass(self.state.func_ir.blocks, 0)
self.state.func_ir.blocks = self._dist_prints(self.state.func_ir.blocks)
remove_dead(self.state.func_ir.blocks, self.state.func_ir.arg_names, self.state.func_ir, self.state.typemap)
dprint_func_ir(self.state.func_ir, "after distributed pass")
@@ -195,7 +196,7 @@ def run_pass(self):

return True

def _run_dist_pass(self, blocks):
def _run_dist_pass(self, blocks, depth):
"""This function does something"""
topo_order = find_topo_order(blocks)
namevar_table = get_name_var_table(blocks)
@@ -212,11 +213,11 @@ def _run_dist_pass(self, blocks):
self.state.typemap, self.state.calltypes, self.state.typingctx,
self.state.targetctx, self)
elif isinstance(inst, Parfor):
out_nodes = self._run_parfor(inst, namevar_table)
out_nodes = self._run_parfor(inst, namevar_table, depth)
# run dist pass recursively
p_blocks = wrap_parfor_blocks(inst)
# build_definitions(p_blocks, self.state.func_ir._definitions)
self._run_dist_pass(p_blocks)
self._run_dist_pass(p_blocks, depth + 1)
unwrap_parfor_blocks(inst)
elif isinstance(inst, ir.Assign):
lhs = inst.target.name
@@ -1692,10 +1693,17 @@ def f(A, start, step):

return out

def _run_parfor(self, parfor, namevar_table):
def _run_parfor(self, parfor, namevar_table, depth):
# stencil_accesses, neighborhood = get_stencil_accesses(
# parfor, self.state.typemap)

global _distribution_depth
if depth >= _distribution_depth:
# Do not distribute
if depth == _distribution_depth:
parfor.no_sequential_lowering = True
return [parfor]

# Thread and 1D parfors turn to gufunc in multithread mode
if (hpat.multithread_mode
and self._dist_analysis.parfor_dists[parfor.id]