Skip to content
This repository was archived by the owner on Feb 2, 2024. It is now read-only.

[WIP]: Tweak Int64Index.reindex() performance #984

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions sdc/extensions/indexes/indexes_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from sdc.hiframes.api import fix_df_index
from sdc.functions import numpy_like
from sdc.datatypes.common_functions import _sdc_internal_join
from sdc.extensions.sdc_hashmap_type import ConcurrentDict


def sdc_numeric_indexes_equals(left, right):
Expand Down Expand Up @@ -131,24 +132,20 @@ def pd_indexes_index_reindex_impl(self, target, method=None, level=None, limit=N
# build a dict of 'self' index values to their positions:
map_index_to_position = Dict.empty(
key_type=index_dtype,
value_type=types.int32
value_type=types.int64
)

# TO-DO: needs concurrent hash map
# TO-DO: apply ConcurrentDict after its perf is optimized
for i, value in enumerate(self):
if value in map_index_to_position:
raise ValueError("cannot reindex from a duplicate axis")
else:
map_index_to_position[value] = i
map_index_to_position[value] = i

if len(map_index_to_position) < len(self):
raise ValueError("cannot reindex from a duplicate axis")

res_size = len(target)
indexer = np.empty(res_size, dtype=np.int64)
for i in numba.prange(res_size):
val = target[i]
if val in map_index_to_position:
indexer[i] = map_index_to_position[val]
else:
indexer[i] = -1
indexer[i] = map_index_to_position.get(target[i], -1)

return target, indexer

Expand Down
11 changes: 10 additions & 1 deletion sdc/extensions/indexes/int64_index_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from sdc.hiframes.api import fix_df_index
from sdc.extensions.indexes.indexes_generic import *
from sdc.datatypes.common_functions import hpat_arrays_append
from sdc.extensions.sdc_hashmap_ext import map_and_fill_indexer_int64


@intrinsic
Expand Down Expand Up @@ -455,8 +456,16 @@ def pd_int64_index_reindex_overload(self, target, method=None, level=None, limit
raise TypingError('{} Not allowed for non comparable indexes. \
Given: self={}, target={}'.format(_func_name, self, target))

# FIXME: handle case when target is not numpy array!
def pd_int64_index_reindex_impl(self, target, method=None, level=None, limit=None, tolerance=None):
return sdc_indexes_reindex(self, target=target, method=method, level=level, tolerance=tolerance)
# for the Int64Index case index.data can be passed to a native function that can build the map
# and fill the resulting indexer more efficiently than the generic implementation
indexer = np.empty(len(target), dtype=np.int64)
ok = map_and_fill_indexer_int64(self.values, target, indexer)
if not ok:
raise ValueError("cannot reindex from a duplicate axis")

return target, indexer

return pd_int64_index_reindex_impl

Expand Down
64 changes: 63 additions & 1 deletion sdc/extensions/sdc_hashmap_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from glob import glob
from llvmlite import ir as lir
from numba import types, cfunc
from numba.core import cgutils
from numba.core import cgutils, config
from numba.extending import (typeof_impl, type_callable, models, register_model, NativeValue,
lower_builtin, box, unbox, lower_getattr, intrinsic,
overload_method, overload, overload_attribute)
Expand Down Expand Up @@ -66,6 +66,19 @@
from numba.typed.dictobject import _cast


## FIXME: need to place this binding into separate module?
import ctypes as ct
def bind(sym, sig):
    """Return a ctypes binding of type ``sig`` for the attribute ``sym`` of ``hconc_dict``.

    The attribute value is looked up by name on the ``hconc_dict`` module and
    cast to the ctypes type given by ``sig``.
    """
    return ct.cast(getattr(hconc_dict, sym), sig)

# ctypes prototype for the native setter: no return value, one unsigned 64-bit argument
set_threads_count_sig = ct.CFUNCTYPE(None, ct.c_uint64)
# callable bound to the 'set_number_of_threads' symbol exported by hconc_dict
set_threads_count_sym = bind('set_number_of_threads', set_threads_count_sig)

# runs at import time: passes Numba's configured thread count (config.NUMBA_NUM_THREADS)
# to the native module
set_threads_count_sym(config.NUMBA_NUM_THREADS)


def gen_func_suffixes():
key_suffixes = ['int32_t', 'int64_t', 'voidptr']
val_suffixes = ['int32_t', 'int64_t', 'float', 'double', 'voidptr']
Expand Down Expand Up @@ -94,6 +107,8 @@ def load_native_func(fname, module, skip_check=None):
load_native_func('hashmap_getiter', hconc_dict)
load_native_func('hashmap_iternext', hconc_dict)

ll.add_symbol('native_map_and_fill_indexer_int64', hconc_dict.native_map_and_fill_indexer_int64)


supported_numeric_key_types = [
types.int32,
Expand Down Expand Up @@ -1123,3 +1138,50 @@ def impl_iterator_iternext(context, builder, sig, args, result):
else:
# unreachable
raise AssertionError('unknown type: {}'.format(iter_type.iterable))


@intrinsic
def map_and_fill_indexer_int64(typingctx, index_data_type, searched_type, res_type):
    """Lower a call to the native ``native_map_and_fill_indexer_int64`` function.

    The generated code passes the data pointers of the three arguments, plus
    the lengths of the first two, to the native symbol and converts its
    uint8 result to a boolean.

    NOTE(review): the native function is declared here as taking int64
    pointers, so all three arguments are presumably expected to be int64
    arrays - this intrinsic does not check that itself; confirm at call sites.
    """

    def codegen(context, builder, sig, args):
        index_val, target_val, out_val = args

        def emit_len(arr_type, arr_val):
            # lowers len(arr) for the given array value, producing an int64
            return context.compile_internal(
                builder,
                lambda arr: len(arr),
                types.int64(arr_type),
                [arr_val]
            )

        # struct proxies giving access to the underlying data pointers
        index_struct = context.make_helper(builder, index_data_type, index_val)
        target_struct = context.make_helper(builder, searched_type, target_val)
        out_struct = context.make_helper(builder, res_type, out_val)

        index_len = emit_len(index_data_type, index_val)
        target_len = emit_len(searched_type, target_val)

        # uint8_t fn(int64_t*, int64_t*, int64_t, int64_t, int64_t*)
        int64_ptr = context.get_value_type(types.int64).as_pointer()
        size_type = lir.IntType(64)
        native_fnty = lir.FunctionType(
            lir.IntType(8),
            [int64_ptr, int64_ptr, size_type, size_type, int64_ptr]
        )
        native_fn = builder.module.get_or_insert_function(
            native_fnty, name="native_map_and_fill_indexer_int64")

        call_args = [
            index_struct.data,
            target_struct.data,
            index_len,
            target_len,
            out_struct.data,
        ]
        status = builder.call(native_fn, call_args)
        return context.cast(builder, status, types.uint8, types.bool_)

    signature = types.bool_(index_data_type, searched_type, res_type)
    return signature, codegen
83 changes: 83 additions & 0 deletions sdc/native/conc_dict_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,30 @@
// *****************************************************************************

#include <Python.h>
#include <unordered_map>
#include "hashmap.hpp"

// Hash/equality policy passed as the HashCompare parameter of
// tbb::concurrent_hash_map for int64_t keys: the key value itself,
// converted to size_t, serves as the hash.
class TrivialInt64TBBHashCompare {
public:
    static bool equal(const int64_t& lhs, const int64_t& rhs) {
        return lhs == rhs;
    }

    static size_t hash(const int64_t& key) {
        return static_cast<size_t>(key);
    }
};

// Hash functor used with std::unordered_map for int64_t keys:
// returns the key value converted to size_t.
struct TrivialInt64Hash {
    size_t operator()(const int64_t& key) const {
        return static_cast<size_t>(key);
    }

    TrivialInt64Hash() = default;
    TrivialInt64Hash(const TrivialInt64Hash&) = default;
    ~TrivialInt64Hash() = default;
};

using namespace std;

#define declare_hashmap_create(key_type, val_type, suffix) \
void hashmap_create_##suffix(NRT_MemInfo** meminfo, \
Expand Down Expand Up @@ -210,6 +232,65 @@ declare_hashmap_create_from_data(int64_t, int64_t)
declare_hashmap_create_from_data(int64_t, float)
declare_hashmap_create_from_data(int64_t, double)

// Forwards the requested number of threads to utils::tbb_control::set_threads_num().
// Registered in PyInit_hconc_dict so that it can be called from Python by address.
void set_number_of_threads(uint64_t threads)
{
utils::tbb_control::set_threads_num(threads);
}

uint8_t native_map_and_fill_indexer_int64(int64_t* data, int64_t* searched, int64_t dsize, int64_t ssize, int64_t* res)
{
#if SUPPORTED_TBB_VERSION
// FIXME: we need to store the allocated map somewhere and re-use it later
// here it's allocated on the heap (but not freed) to avoid calling dtor (like pandas does the map once built is cached)
auto ptr_my_map = new tbb::concurrent_hash_map<int64_t, int64_t, TrivialInt64TBBHashCompare>(2*dsize, TrivialInt64TBBHashCompare());
utils::tbb_control::get_arena().execute([&]() {
tbb::parallel_for(tbb::blocked_range<size_t>(0, dsize),
[&](const tbb::blocked_range<size_t>& r) {
for(size_t i=r.begin(); i!=r.end(); ++i) {
ptr_my_map->emplace(data[i], i);
}
}
);
});

if (ptr_my_map->size() < dsize)
return 0;

utils::tbb_control::get_arena().execute([&]() {
tbb::parallel_for(tbb::blocked_range<size_t>(0, ssize),
[&](const tbb::blocked_range<size_t>& r) {
for(size_t i=r.begin(); i!=r.end(); ++i) {
auto it_pair = ptr_my_map->equal_range(searched[i]);
if (it_pair.first != ptr_my_map->end()) {
res[i] = it_pair.first->second;
} else {
res[i] = -1;
}
}
}
);
});

return 1;
#else
auto ptr_my_map = new std::unordered_map<int64_t, int64_t, TrivialInt64Hash>(2*dsize, TrivialInt64Hash());
for(size_t i=0; i<dsize; ++i) {
ptr_my_map->emplace(data[i], i);
}

if (ptr_my_map->size() < dsize)
return 0;

for(size_t i=0; i<ssize; ++i) {
auto it = ptr_my_map->find(searched[i]);
res[i] = (it != ptr_my_map->end()) ? it->second : -1;
}

return 1;
#endif
}


PyMODINIT_FUNC PyInit_hconc_dict()
{
static struct PyModuleDef moduledef = {
Expand Down Expand Up @@ -257,6 +338,8 @@ PyMODINIT_FUNC PyInit_hconc_dict()
REGISTER(hashmap_create_from_data_int64_t_to_float)
REGISTER(hashmap_create_from_data_int64_t_to_double)

REGISTER(native_map_and_fill_indexer_int64)
REGISTER(set_number_of_threads)
utils::tbb_control::init();

return m;
Expand Down