From 4ac60454dc53235a8f2410e1d678b5228b7e5e78 Mon Sep 17 00:00:00 2001
From: "Kozlov, Alexey" <alexey.kozlov@intel.com>
Date: Tue, 17 Aug 2021 21:55:13 +0300
Subject: [PATCH 1/2] Adds separate native impl for Int64Index.reindex

---
 sdc/extensions/indexes/indexes_generic.py | 19 +++---
 sdc/extensions/indexes/int64_index_ext.py | 10 +++-
 sdc/extensions/sdc_hashmap_ext.py         | 55 ++++++++++++++++-
 sdc/native/conc_dict_module.cpp           | 72 +++++++++++++++++++++++
 4 files changed, 143 insertions(+), 13 deletions(-)

diff --git a/sdc/extensions/indexes/indexes_generic.py b/sdc/extensions/indexes/indexes_generic.py
index 02cf77b97..f991f1a34 100644
--- a/sdc/extensions/indexes/indexes_generic.py
+++ b/sdc/extensions/indexes/indexes_generic.py
@@ -43,6 +43,7 @@
 from sdc.hiframes.api import fix_df_index
 from sdc.functions import numpy_like
 from sdc.datatypes.common_functions import _sdc_internal_join
+from sdc.extensions.sdc_hashmap_type import ConcurrentDict
 
 
 def sdc_numeric_indexes_equals(left, right):
@@ -131,24 +132,20 @@ def pd_indexes_index_reindex_impl(self, target, method=None, level=None, limit=N
         # build a dict of 'self' index values to their positions:
         map_index_to_position = Dict.empty(
             key_type=index_dtype,
-            value_type=types.int32
+            value_type=types.int64
         )
 
-        # TO-DO: needs concurrent hash map
+        # TO-DO: apply ConcurrentDict after it's perf is optimized
         for i, value in enumerate(self):
-            if value in map_index_to_position:
-                raise ValueError("cannot reindex from a duplicate axis")
-            else:
-                map_index_to_position[value] = i
+            map_index_to_position[value] = i
+
+        if len(map_index_to_position) < len(self):
+            raise ValueError("cannot reindex from a duplicate axis")
 
         res_size = len(target)
         indexer = np.empty(res_size, dtype=np.int64)
         for i in numba.prange(res_size):
-            val = target[i]
-            if val in map_index_to_position:
-                indexer[i] = map_index_to_position[val]
-            else:
-                indexer[i] = -1
+            indexer[i] = map_index_to_position.get(target[i], -1)
 
         return target, indexer
 
diff --git a/sdc/extensions/indexes/int64_index_ext.py b/sdc/extensions/indexes/int64_index_ext.py
index 6a22c11e7..5c49cbc2e 100644
--- a/sdc/extensions/indexes/int64_index_ext.py
+++ b/sdc/extensions/indexes/int64_index_ext.py
@@ -52,6 +52,7 @@
 from sdc.hiframes.api import fix_df_index
 from sdc.extensions.indexes.indexes_generic import *
 from sdc.datatypes.common_functions import hpat_arrays_append
+from sdc.extensions.sdc_hashmap_ext import map_and_fill_indexer_int64
 
 
 @intrinsic
@@ -456,7 +457,14 @@ def pd_int64_index_reindex_overload(self, target, method=None, level=None, limit
         Given: self={}, target={}'.format(_func_name, self, target))
 
     def pd_int64_index_reindex_impl(self, target, method=None, level=None, limit=None, tolerance=None):
-        return sdc_indexes_reindex(self, target=target, method=method, level=level, tolerance=tolerance)
+        # for Int64Index case index.data can be passed to native function that can built the map
+        # and fill the resulting indexer more efficiently than generic implementation
+        indexer = np.empty(len(target), dtype=np.int64)
+        ok = map_and_fill_indexer_int64(self.values, target, indexer)
+        if not ok:
+            raise ValueError("cannot reindex from a duplicate axis")
+
+        return target, indexer
 
     return pd_int64_index_reindex_impl
 
diff --git a/sdc/extensions/sdc_hashmap_ext.py b/sdc/extensions/sdc_hashmap_ext.py
index d02840035..f23e1edba 100644
--- a/sdc/extensions/sdc_hashmap_ext.py
+++ b/sdc/extensions/sdc_hashmap_ext.py
@@ -35,7 +35,7 @@
 from glob import glob
 from llvmlite import ir as lir
 from numba import types, cfunc
-from numba.core import cgutils
+from numba.core import cgutils, config
 from numba.extending import (typeof_impl, type_callable, models, register_model, NativeValue,
                              lower_builtin, box, unbox, lower_getattr, intrinsic,
                              overload_method, overload, overload_attribute)
@@ -66,6 +66,19 @@
 from numba.typed.dictobject import _cast
 
 
+## FIXME: need to place this binding into separate module?
+import ctypes as ct
+def bind(sym, sig):
+    # Returns ctypes binding to symbol sym with signature sig
+    addr = getattr(hconc_dict, sym)
+    return ct.cast(addr, sig)
+
+set_threads_count_sig = ct.CFUNCTYPE(None, ct.c_uint64)
+set_threads_count_sym = bind('set_number_of_threads', set_threads_count_sig)
+
+set_threads_count_sym(config.NUMBA_NUM_THREADS)
+
+
 def gen_func_suffixes():
     key_suffixes = ['int32_t', 'int64_t', 'voidptr']
     val_suffixes = ['int32_t', 'int64_t', 'float', 'double', 'voidptr']
@@ -94,6 +107,8 @@ def load_native_func(fname, module, skip_check=None):
 load_native_func('hashmap_getiter', hconc_dict)
 load_native_func('hashmap_iternext', hconc_dict)
 
+ll.add_symbol('native_map_and_fill_indexer_int64', hconc_dict.native_map_and_fill_indexer_int64)
+
 
 supported_numeric_key_types = [
     types.int32,
@@ -1123,3 +1138,41 @@ def impl_iterator_iternext(context, builder, sig, args, result):
         else:
             # unreachable
             raise AssertionError('unknown type: {}'.format(iter_type.iterable))
+
+
+@intrinsic
+def map_and_fill_indexer_int64(typingctx, index_data_type, searched_type, res_type):
+
+    ret_type = types.bool_
+
+    def codegen(context, builder, sig, args):
+        data_val, searched_val, res_val = args
+
+        data_ctinfo = context.make_helper(builder, index_data_type, data_val)
+        searched_ctinfo = context.make_helper(builder, searched_type, searched_val)
+        res_ctinfo = context.make_helper(builder, res_type, res_val)
+        lir_key_type = context.get_value_type(types.int64)
+
+        size_val = context.compile_internal(
+            builder,
+            lambda arr: len(arr),
+            types.int64(searched_type),
+            [searched_val]
+        )
+
+        fnty = lir.FunctionType(lir.IntType(8),
+                                [lir_key_type.as_pointer(),
+                                 lir_key_type.as_pointer(),
+                                 lir.IntType(64),
+                                 lir_key_type.as_pointer(),])
+        fn_hashmap_fill_indexer = builder.module.get_or_insert_function(
+            fnty, name=f"native_map_and_fill_indexer_int64")
+
+        res = builder.call(fn_hashmap_fill_indexer,
+                           [data_ctinfo.data,
+                            searched_ctinfo.data,
+                            size_val,
+                            res_ctinfo.data])
+        return context.cast(builder, res, types.uint8, types.bool_)
+
+    return ret_type(index_data_type, searched_type, res_type), codegen
diff --git a/sdc/native/conc_dict_module.cpp b/sdc/native/conc_dict_module.cpp
index 52d91ba0c..6c464f772 100644
--- a/sdc/native/conc_dict_module.cpp
+++ b/sdc/native/conc_dict_module.cpp
@@ -26,7 +26,20 @@
 
 #include <Python.h>
 #include "hashmap.hpp"
+#include <chrono>
+#include <iostream>
 
+class TrivialTBBHashCompare {
+public:
+    static size_t hash(const int64_t& val) {
+        return (size_t)val;
+    }
+    static bool equal(const int64_t& k1, const int64_t& k2) {
+        return k1==k2;
+    }
+};
+
+using namespace std::chrono;
 
 #define declare_hashmap_create(key_type, val_type, suffix) \
 void hashmap_create_##suffix(NRT_MemInfo** meminfo, \
@@ -210,6 +223,63 @@ declare_hashmap_create_from_data(int64_t, int64_t)
 declare_hashmap_create_from_data(int64_t, float)
 declare_hashmap_create_from_data(int64_t, double)
 
+void set_number_of_threads(uint64_t threads)
+{
+utils::tbb_control::set_threads_num(threads);
+}
+
+uint8_t native_map_and_fill_indexer_int64(int64_t* data, int64_t* searched, int64_t size, int64_t* res)
+{
+    auto t1 = high_resolution_clock::now();
+    auto my_map_ptr = new tbb::concurrent_hash_map<int64_t, int64_t, TrivialTBBHashCompare>(2*size, TrivialTBBHashCompare());
+    auto& my_map = *my_map_ptr;
+
+    utils::tbb_control::get_arena().execute([&]() {
+            tbb::parallel_for(tbb::blocked_range<size_t>(0, size),
+                         [&](const tbb::blocked_range<size_t>& r) {
+                             for(size_t i=r.begin(); i!=r.end(); ++i) {
+                                 my_map.emplace(data[i], i);
+                             }
+                         }
+            );
+    });
+
+    if (my_map.size() < size)
+        return 0;
+
+    auto t2 = high_resolution_clock::now();
+    duration<double, std::ratio<1, 1>> ms_double = t2 - t1;
+    auto ms_int = duration_cast<milliseconds>(t2 - t1);
+    std::cout << "native (TBB) building map: " << ms_int.count() << " ms, (" << ms_double.count() << " sec)" << std::endl;
+
+    auto it_map_end = my_map.end();
+    utils::tbb_control::get_arena().execute([&]() {
+        tbb::parallel_for(tbb::blocked_range<size_t>(0, size),
+                     [&](const tbb::blocked_range<size_t>& r) {
+                         for(size_t i=r.begin(); i!=r.end(); ++i) {
+                             auto it_pair = my_map.equal_range(searched[i]);
+                             if (it_pair.first != my_map.end()) {
+                                 res[i] = it_pair.first->second;
+                             } else {
+                                 res[i] = -1;
+                             }
+                         }
+                     }
+        );
+    });
+
+    auto t3 = high_resolution_clock::now();
+    ms_double = t3 - t2;
+    ms_int = duration_cast<milliseconds>(t3 - t2);
+    std::cout << "native (TBB) filling indexer: " << ms_int.count() << " ms, (" << ms_double.count() << " sec)" << std::endl;
+    ms_double = t3 - t1;
+    ms_int = duration_cast<milliseconds>(t3 - t1);
+    std::cout << "total time: " << ms_int.count() << " ms, (" << ms_double.count() << " sec)" << std::endl;
+
+    return 1;
+}
+
+
 PyMODINIT_FUNC PyInit_hconc_dict()
 {
     static struct PyModuleDef moduledef = {
@@ -257,6 +327,8 @@ PyMODINIT_FUNC PyInit_hconc_dict()
     REGISTER(hashmap_create_from_data_int64_t_to_float)
     REGISTER(hashmap_create_from_data_int64_t_to_double)
 
+    REGISTER(native_map_and_fill_indexer_int64)
+    REGISTER(set_number_of_threads)
     utils::tbb_control::init();
 
     return m;

From 540e288dbe99c180c91c0e0947e89903373921fe Mon Sep 17 00:00:00 2001
From: "Kozlov, Alexey" <alexey.kozlov@intel.com>
Date: Tue, 17 Aug 2021 22:43:20 +0300
Subject: [PATCH 2/2] Fallback to STL impl and more TO-DOs added

---
 sdc/extensions/indexes/int64_index_ext.py |  1 +
 sdc/extensions/sdc_hashmap_ext.py         | 13 ++++-
 sdc/native/conc_dict_module.cpp           | 67 +++++++++++++----------
 3 files changed, 51 insertions(+), 30 deletions(-)

diff --git a/sdc/extensions/indexes/int64_index_ext.py b/sdc/extensions/indexes/int64_index_ext.py
index 5c49cbc2e..1fded0b81 100644
--- a/sdc/extensions/indexes/int64_index_ext.py
+++ b/sdc/extensions/indexes/int64_index_ext.py
@@ -456,6 +456,7 @@ def pd_int64_index_reindex_overload(self, target, method=None, level=None, limit
         raise TypingError('{} Not allowed for non comparable indexes. \
         Given: self={}, target={}'.format(_func_name, self, target))
 
+    # FIXME: handle case when target is not numpy array!
     def pd_int64_index_reindex_impl(self, target, method=None, level=None, limit=None, tolerance=None):
         # for Int64Index case index.data can be passed to native function that can built the map
         # and fill the resulting indexer more efficiently than generic implementation
diff --git a/sdc/extensions/sdc_hashmap_ext.py b/sdc/extensions/sdc_hashmap_ext.py
index f23e1edba..ec8d42781 100644
--- a/sdc/extensions/sdc_hashmap_ext.py
+++ b/sdc/extensions/sdc_hashmap_ext.py
@@ -1153,7 +1153,14 @@ def codegen(context, builder, sig, args):
         res_ctinfo = context.make_helper(builder, res_type, res_val)
         lir_key_type = context.get_value_type(types.int64)
 
-        size_val = context.compile_internal(
+        data_size_val = context.compile_internal(
+            builder,
+            lambda arr: len(arr),
+            types.int64(index_data_type),
+            [data_val]
+        )
+
+        searched_size_val = context.compile_internal(
             builder,
             lambda arr: len(arr),
             types.int64(searched_type),
@@ -1164,6 +1171,7 @@ def codegen(context, builder, sig, args):
                                 [lir_key_type.as_pointer(),
                                  lir_key_type.as_pointer(),
                                  lir.IntType(64),
+                                 lir.IntType(64),
                                  lir_key_type.as_pointer(),])
         fn_hashmap_fill_indexer = builder.module.get_or_insert_function(
             fnty, name=f"native_map_and_fill_indexer_int64")
@@ -1171,7 +1179,8 @@ def codegen(context, builder, sig, args):
         res = builder.call(fn_hashmap_fill_indexer,
                            [data_ctinfo.data,
                             searched_ctinfo.data,
-                            size_val,
+                            data_size_val,
+                            searched_size_val,
                             res_ctinfo.data])
         return context.cast(builder, res, types.uint8, types.bool_)
 
diff --git a/sdc/native/conc_dict_module.cpp b/sdc/native/conc_dict_module.cpp
index 6c464f772..d2a17946b 100644
--- a/sdc/native/conc_dict_module.cpp
+++ b/sdc/native/conc_dict_module.cpp
@@ -25,11 +25,10 @@
 // *****************************************************************************
 
 #include <Python.h>
+#include <unordered_map>
 #include "hashmap.hpp"
-#include <chrono>
-#include <iostream>
 
-class TrivialTBBHashCompare {
+class TrivialInt64TBBHashCompare {
 public:
     static size_t hash(const int64_t& val) {
         return (size_t)val;
@@ -39,7 +38,17 @@ class TrivialTBBHashCompare {
     }
 };
 
-using namespace std::chrono;
+struct TrivialInt64Hash {
+public:
+    TrivialInt64Hash() = default;
+    TrivialInt64Hash(const TrivialInt64Hash&) = default;
+    ~TrivialInt64Hash() = default;
+    size_t operator()(const int64_t& val) const {
+        return (size_t)val;
+    }
+};
+
+using namespace std;
 
 #define declare_hashmap_create(key_type, val_type, suffix) \
 void hashmap_create_##suffix(NRT_MemInfo** meminfo, \
@@ -228,37 +237,31 @@ void set_number_of_threads(uint64_t threads)
 utils::tbb_control::set_threads_num(threads);
 }
 
-uint8_t native_map_and_fill_indexer_int64(int64_t* data, int64_t* searched, int64_t size, int64_t* res)
+uint8_t native_map_and_fill_indexer_int64(int64_t* data, int64_t* searched, int64_t dsize, int64_t ssize, int64_t* res)
 {
-    auto t1 = high_resolution_clock::now();
-    auto my_map_ptr = new tbb::concurrent_hash_map<int64_t, int64_t, TrivialTBBHashCompare>(2*size, TrivialTBBHashCompare());
-    auto& my_map = *my_map_ptr;
-
+#if SUPPORTED_TBB_VERSION
+    // FIXME: we need to store the allocated map somewhere and re-use it later
+    // here it's allocated on the heap (but not freed) to avoid calling dtor (like pandas does the map once built is cached)
+    auto ptr_my_map = new tbb::concurrent_hash_map<int64_t, int64_t, TrivialInt64TBBHashCompare>(2*dsize, TrivialInt64TBBHashCompare());
     utils::tbb_control::get_arena().execute([&]() {
-            tbb::parallel_for(tbb::blocked_range<size_t>(0, size),
+            tbb::parallel_for(tbb::blocked_range<size_t>(0, dsize),
                          [&](const tbb::blocked_range<size_t>& r) {
                              for(size_t i=r.begin(); i!=r.end(); ++i) {
-                                 my_map.emplace(data[i], i);
+                                 ptr_my_map->emplace(data[i], i);
                              }
                          }
             );
     });
 
-    if (my_map.size() < size)
+    if (ptr_my_map->size() < dsize)
         return 0;
 
-    auto t2 = high_resolution_clock::now();
-    duration<double, std::ratio<1, 1>> ms_double = t2 - t1;
-    auto ms_int = duration_cast<milliseconds>(t2 - t1);
-    std::cout << "native (TBB) building map: " << ms_int.count() << " ms, (" << ms_double.count() << " sec)" << std::endl;
-
-    auto it_map_end = my_map.end();
     utils::tbb_control::get_arena().execute([&]() {
-        tbb::parallel_for(tbb::blocked_range<size_t>(0, size),
+        tbb::parallel_for(tbb::blocked_range<size_t>(0, ssize),
                      [&](const tbb::blocked_range<size_t>& r) {
                          for(size_t i=r.begin(); i!=r.end(); ++i) {
-                             auto it_pair = my_map.equal_range(searched[i]);
-                             if (it_pair.first != my_map.end()) {
+                             auto it_pair = ptr_my_map->equal_range(searched[i]);
+                             if (it_pair.first != ptr_my_map->end()) {
                                  res[i] = it_pair.first->second;
                              } else {
                                  res[i] = -1;
@@ -268,15 +271,23 @@ uint8_t native_map_and_fill_indexer_int64(int64_t* data, int64_t* searched, int6
         );
     });
 
-    auto t3 = high_resolution_clock::now();
-    ms_double = t3 - t2;
-    ms_int = duration_cast<milliseconds>(t3 - t2);
-    std::cout << "native (TBB) filling indexer: " << ms_int.count() << " ms, (" << ms_double.count() << " sec)" << std::endl;
-    ms_double = t3 - t1;
-    ms_int = duration_cast<milliseconds>(t3 - t1);
-    std::cout << "total time: " << ms_int.count() << " ms, (" << ms_double.count() << " sec)" << std::endl;
+    return 1;
+#else
+    auto ptr_my_map = new std::unordered_map<int64_t, int64_t, TrivialInt64Hash>(2*dsize, TrivialInt64Hash());
+    for(size_t i=0; i<dsize; ++i) {
+        ptr_my_map->emplace(data[i], i);
+    }
+
+    if (ptr_my_map->size() < dsize)
+        return 0;
+
+    for(size_t i=0; i<ssize; ++i) {
+        auto it = ptr_my_map->find(searched[i]);
+        res[i] = (it != ptr_my_map->end()) ? it->second : -1;
+    }
 
     return 1;
+#endif
 }