diff --git a/pyproject.toml b/pyproject.toml
index e7c758e..6eb74a4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "type_infer"
-version = "0.0.17"
+version = "0.0.18"
 description = "Automated type inference for Machine Learning pipelines."
 authors = ["MindsDB Inc. "]
 license = "GPL-3.0"
diff --git a/tests/integration_tests/test_type_infer.py b/tests/integration_tests/test_type_infer.py
index 9ea74e1..979f729 100644
--- a/tests/integration_tests/test_type_infer.py
+++ b/tests/integration_tests/test_type_infer.py
@@ -5,7 +5,7 @@
 from datetime import datetime, timedelta

 from type_infer.dtype import dtype
-from type_infer.infer import infer_types
+from type_infer.api import infer_types


 class TestTypeInference(unittest.TestCase):
diff --git a/tests/unit_tests/test_dates.py b/tests/unit_tests/test_dates.py
index 6e8bfe7..a80d68c 100644
--- a/tests/unit_tests/test_dates.py
+++ b/tests/unit_tests/test_dates.py
@@ -1,7 +1,7 @@
 import unittest

 from type_infer.dtype import dtype
-from type_infer.infer import type_check_date
+from type_infer.rule_based.infer import type_check_date


 class TestDates(unittest.TestCase):
diff --git a/tests/unit_tests/test_infer_dtypes.py b/tests/unit_tests/test_infer_dtypes.py
index c93c28e..fc5359e 100644
--- a/tests/unit_tests/test_infer_dtypes.py
+++ b/tests/unit_tests/test_infer_dtypes.py
@@ -2,7 +2,7 @@
 import random

 import pandas as pd
-from type_infer.infer import get_column_data_type
+from type_infer.api import get_column_data_type
 from type_infer.dtype import dtype


diff --git a/tests/unit_tests/test_misc.py b/tests/unit_tests/test_misc.py
index 5c88338..afa579b 100644
--- a/tests/unit_tests/test_misc.py
+++ b/tests/unit_tests/test_misc.py
@@ -3,7 +3,7 @@
 from pathlib import Path

 import type_infer
-from type_infer.helpers import tokenize_text
+from type_infer.rule_based.helpers import tokenize_text


 class TestDates(unittest.TestCase):
diff --git a/type_infer/__init__.py b/type_infer/__init__.py
index fa649d7..33988a4 100644
--- a/type_infer/__init__.py
+++ b/type_infer/__init__.py
@@ -1,10 +1,10 @@
 from type_infer import base
 from type_infer import dtype
-from type_infer import infer
+from type_infer import api
 from type_infer import helpers


-__version__ = '0.0.17'
+__version__ = '0.0.18'


-__all__ = ['base', 'dtype', 'infer', 'helpers', '__version__']
+__all__ = ['base', 'dtype', 'api', 'helpers', '__version__']
diff --git a/type_infer/api.py b/type_infer/api.py
new file mode 100644
index 0000000..4ba6ea7
--- /dev/null
+++ b/type_infer/api.py
@@ -0,0 +1,175 @@
+import random
+import multiprocessing as mp
+
+from scipy.stats import norm
+import pandas as pd
+
+from type_infer.base import TypeInformation
+from type_infer.dtype import dtype
+from type_infer.helpers import seed, log, get_nr_procs
+
+# inference engine specific imports
+from type_infer.rule_based.infer import get_column_data_type
+from type_infer.rule_based.helpers import get_identifier_description_mp
+
+
+def _calculate_sample_size(
+    population_size,
+    margin_error=.01,
+    confidence_level=.995,
+    sigma=1 / 2
+):
+    """
+    Calculate the minimal sample size to use to achieve a certain
+    margin of error and confidence level for a sample estimate
+    of the population mean.
+    Inputs
+    -------
+    population_size: integer
+        Total size of the population that the sample is to be drawn from.
+    margin_error: number
+        Maximum expected difference between the true population parameter,
+        such as the mean, and the sample estimate.
+    confidence_level: number in the interval (0, 1)
+        If we were to draw a large number of equal-size samples
+        from the population, the true population parameter
+        should lie within this percentage
+        of the intervals (sample_parameter - e, sample_parameter + e)
+        where e is the margin_error.
+    sigma: number
+        The standard deviation of the population. For the case
+        of estimating a parameter in the interval [0, 1], sigma=1/2
+        should be sufficient.
+    """
+    alpha = 1 - confidence_level
+    # dictionary of confidence levels and corresponding z-scores
+    # computed via norm.ppf(1 - (alpha/2)), where norm is
+    # a normal distribution object in scipy.stats.
+    # Here, ppf is the percentile point function.
+    zdict = {
+        .90: 1.645,
+        .91: 1.695,
+        .99: 2.576,
+        .97: 2.17,
+        .94: 1.881,
+        .93: 1.812,
+        .95: 1.96,
+        .98: 2.326,
+        .96: 2.054,
+        .92: 1.751
+    }
+    if confidence_level in zdict:
+        z = zdict[confidence_level]
+    else:
+        # Inf fix
+        if alpha == 0.0:
+            alpha += 0.001
+        z = norm.ppf(1 - (alpha / 2))
+    N = population_size
+    M = margin_error
+    numerator = z**2 * sigma**2 * (N / (N - 1))
+    denom = M**2 + ((z**2 * sigma**2) / (N - 1))
+    return numerator / denom
+
+
+def _sample_data(df: pd.DataFrame) -> pd.DataFrame:
+    population_size = len(df)
+    if population_size <= 50:
+        sample_size = population_size
+    else:
+        sample_size = int(round(_calculate_sample_size(population_size)))
+
+    population_size = len(df)
+    input_data_sample_indexes = random.sample(range(population_size), sample_size)
+    return df.iloc[input_data_sample_indexes]
+
+
+def infer_types(
+    data: pd.DataFrame,
+    #TODO: method: InferenceEngine = Union[InferenceEngine.RuleBased, InferenceEngine.BERT],
+    pct_invalid: float,
+    seed_nr: int = 420,
+    mp_cutoff: int = 1e4,
+) -> TypeInformation:
+    """
+    Infers the data types of each column of the dataset by analyzing a small sample of
+    each column's items.
+
+    Inputs
+    ----------
+    data : pd.DataFrame
+        The input dataset for which we want to infer data type information.
+    pct_invalid : float
+        The percentage, i.e. a float between 0.0 and 100.0, of invalid values that are
+        accepted before failing the type inference for a column.
+    seed_nr : int, optional
+        Seed for the random number generator, by default 420
+    mp_cutoff : int, optional
+        How many elements in the dataframe before switching to parallel processing, by
+        default 1e4.
+    """
+    seed(seed_nr)
+    type_information = TypeInformation()
+    sample_df = _sample_data(data)
+    sample_size = len(sample_df)
+    population_size = len(data)
+    log.info(f'Analyzing a sample of {sample_size}')
+    log.info(
+        f'from a total population of {population_size}, this is equivalent to {round(sample_size*100/population_size, 1)}% of your data.') # noqa
+
+    nr_procs = get_nr_procs(df=sample_df)
+    pool_size = min(nr_procs, len(sample_df.columns.values))
+    if data.size > mp_cutoff and pool_size > 1:
+        log.info(f'Using {pool_size} processes to deduce types.')
+        pool = mp.Pool(processes=pool_size)
+        # column-wise parallelization # TODO: evaluate switching to row-wise split instead
+        # TODO: this would be the call to the inference engine -> column in, type out
+        answer_arr = pool.starmap(get_column_data_type, [
+            (sample_df[x].dropna(), data[x], x, pct_invalid) for x in sample_df.columns.values
+        ])
+        pool.close()
+        pool.join()
+    else:
+        answer_arr = []
+        for x in sample_df.columns:
+            answer_arr.append(get_column_data_type(sample_df[x].dropna(), data, x, pct_invalid))
+
+    for i, col_name in enumerate(sample_df.columns):
+        (data_dtype, data_dtype_dist, additional_info, warn, info) = answer_arr[i]
+
+        for msg in warn:
+            log.warning(msg)
+        for msg in info:
+            log.info(msg)
+
+        if data_dtype is None:
+            data_dtype = dtype.invalid
+
+        type_information.dtypes[col_name] = data_dtype
+        type_information.additional_info[col_name] = {
+            'dtype_dist': data_dtype_dist
+        }
+
+    if data.size > mp_cutoff and pool_size > 1:
+        pool = mp.Pool(processes=pool_size)
+        answer_arr = pool.map(get_identifier_description_mp, [
+            (data[x], x, type_information.dtypes[x])
+            for x in sample_df.columns
+        ])
+        pool.close()
+        pool.join()
+    else:
+        answer_arr = []
+        for x in sample_df.columns:
+            answer = get_identifier_description_mp([data[x], x, type_information.dtypes[x]])
+            answer_arr.append(answer)
+
+    for i, col_name in enumerate(sample_df.columns):
+        # work with the full data
+        if answer_arr[i] is not None:
+            log.warning(f'Column {col_name} is an identifier of type "{answer_arr[i]}"')
+            type_information.identifiers[col_name] = answer_arr[i]
+
+    # @TODO Column removal logic was here, if the column was an identifier, move it elsewhere
+
+    return type_information
diff --git a/type_infer/bert/__init__.py b/type_infer/bert/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/type_infer/bert/infer.py b/type_infer/bert/infer.py
new file mode 100644
index 0000000..7896162
--- /dev/null
+++ b/type_infer/bert/infer.py
@@ -0,0 +1 @@
+STABLE = False
diff --git a/type_infer/dtype.py b/type_infer/dtype.py
index 9f05a2e..e6e5819 100644
--- a/type_infer/dtype.py
+++ b/type_infer/dtype.py
@@ -45,3 +45,5 @@ class dtype:
     # Misc (Unk/NaNs)
     empty = "empty"
     invalid = "invalid"
+
+# TODO: introduce "modifiers"?
diff --git a/type_infer/helpers.py b/type_infer/helpers.py index 3ae6bc1..2f20ea3 100644 --- a/type_infer/helpers.py +++ b/type_infer/helpers.py @@ -1,34 +1,50 @@ import os -import re -import nltk import psutil import random -import string import logging import colorlog import multiprocessing as mp +from typing import Iterable import numpy as np -import scipy.stats as st -from langid.langid import LanguageIdentifier -from langid.langid import model as langid_model +from type_infer.dtype import dtype -from typing import Iterable -from collections import Counter, defaultdict -from type_infer.dtype import dtype +def initialize_log(): + pid = os.getpid() + + handler = colorlog.StreamHandler() + handler.setFormatter(colorlog.ColoredFormatter()) + logging.basicConfig(handlers=[handler]) + log = logging.getLogger(f'type_infer-{pid}') + log_level = os.environ.get('TYPE_INFER_LOG', 'DEBUG') + log.setLevel(log_level) + return log -try: - nltk.data.find('tokenizers/punkt') -except LookupError: - nltk.download('punkt') -try: - from nltk.corpus import stopwords - stopwords.words('english') -except LookupError: - nltk.download('stopwords', quiet=True) +log = initialize_log() + + +def get_nr_procs(df=None): + if 'MINDSDB_N_WORKERS' in os.environ: + try: + n = int(os.environ['MINDSDB_N_WORKERS']) + except ValueError: + n = 1 + return n + elif os.name == 'nt': + return 1 + else: + available_mem = psutil.virtual_memory().available + if df is not None: + max_per_proc_usage = df.size + else: + max_per_proc_usage = 0.2 * pow(10, 9) # multiplier * 1GB + + proc_count = int(min(mp.cpu_count() - 1, available_mem // max_per_proc_usage)) + + return max(proc_count, 1) def seed(seed_nr: int) -> None: @@ -58,103 +74,6 @@ def is_nan_numeric(value: object) -> bool: return isnan -def initialize_log(): - pid = os.getpid() - - handler = colorlog.StreamHandler() - handler.setFormatter(colorlog.ColoredFormatter()) - - logging.basicConfig(handlers=[handler]) - log = logging.getLogger(f'type_infer-{pid}') - log_level = os.environ.get('TYPE_INFER_LOG', 'DEBUG') - log.setLevel(log_level) - return log - - -log = initialize_log() - - -def get_identifier_description_mp(arg_tup): - data, column_name, data_dtype = arg_tup - return get_identifier_description(data, column_name, data_dtype) - - -def get_identifier_description(data: Iterable, column_name: str, data_dtype: dtype): - data = list(data) - if isinstance(data[0], list): - nr_unique = len(set(tuple(x) for x in data)) - elif isinstance(data[0], dict): - nr_unique = len(set(str(x) for x in data)) - else: - nr_unique = len(set(data)) - - if nr_unique == 1: - return 'No Information' - - unique_pct = nr_unique / len(data) - - spaces = [len(str(x).split(' ')) - 1 for x in data] - mean_spaces = np.mean(spaces) if len(spaces) > 0 else 0.0 - - # Detect hash - all_same_length = all(len(str(data[0])) == len(str(x)) for x in data) - uuid_charset = set('0123456789abcdefABCDEF-') - all_uuid_charset = all(set(str(x)).issubset(uuid_charset) for x in data) - is_uuid = all_uuid_charset and all_same_length - - if all_same_length and len(data) == nr_unique and data_dtype not in (dtype.integer, dtype.float): - str_data = [str(x) for x in data] - randomness_per_index = [] - for i, _ in enumerate(str_data[0]): - N = len(set(x[i] for x in str_data)) - S = st.entropy([*Counter(x[i] for x in str_data).values()]) - if S == 0: - randomness_per_index.append(0.0) - else: - randomness_per_index.append(S / np.log(N)) - - mean_randomness = np.mean(randomness_per_index) if len(randomness_per_index) > 0 else 0 - 
if mean_randomness > 0.95: - return 'Hash-like identifier' - - # Detect foreign key - if data_dtype == dtype.integer: - if _is_foreign_key_name(column_name): - return 'Foreign key' - - if _is_identifier_name(column_name) or data_dtype in (dtype.categorical, dtype.binary): - if unique_pct > 0.98: - if is_uuid: - return 'UUID' - else: - return 'Unknown identifier' - - # Everything is unique and it's too short to be rich text - if data_dtype in (dtype.categorical, dtype.binary, dtype.short_text, dtype.rich_text) and \ - unique_pct > 0.99999 and mean_spaces < 1: - return 'Unknown identifier' - - return None - - -def _is_foreign_key_name(name): - for endings in ['id', 'ID', 'Id']: - for add in ['-', '_', ' ']: - if name.endswith(add + endings): - return True - for endings in ['ID', 'Id']: - if name.endswith(endings): - return True - return False - - -def _is_identifier_name(name): - for keyword in ['account', 'uuid', 'identifier', 'user']: - if keyword in name: - return True - return False - - def cast_string_to_python_type(string): """ Returns None, an integer, float or a string from a string""" if string is None or string == '': @@ -173,7 +92,6 @@ def cast_string_to_python_type(string): return string -# TODO: Should this be here? def clean_float(val): if isinstance(val, (int, float)): return float(val) @@ -192,102 +110,3 @@ def clean_float(val): return float(val) except Exception: return None - - -def get_language_dist(data): - lang_dist = defaultdict(lambda: 0) - lang_dist['Unknown'] = 0 - lang_probs_cache = dict() - identifier = LanguageIdentifier.from_modelstring(langid_model, norm_probs=True) - for text in data: - text = str(text) - text = text.translate(str.maketrans('', '', string.punctuation)) - if text not in lang_probs_cache: - try: - lang_probs = identifier.classify(text) - except Exception: - lang_probs = [] - lang_probs_cache[text] = lang_probs - - lang_probs = lang_probs_cache[text] - if len(lang_probs) > 0 and lang_probs[1] > 10 * (1 / len(identifier.nb_classes)): - lang_dist[lang_probs[0]] += 1 - else: - lang_dist['Unknown'] += 1 - - return dict(lang_dist) - - -def analyze_sentences(data): - nr_words = 0 - word_dist = defaultdict(int) - nr_words_dist = defaultdict(int) - stop_words = set(stopwords.words('english')) - for text in map(str, data): - text = text.lower() - text_dist = defaultdict(int) - tokens = tokenize_text(text) - tokens_no_stop = (x for x in tokens if x not in stop_words) - for tok in tokens_no_stop: - text_dist[tok] += 1 - - n_tokens = len(text_dist) - nr_words_dist[n_tokens] += 1 - nr_words += n_tokens - - # merge text_dist into word_dist - for k, v in text_dist.items(): - word_dist[k] += v - - return nr_words, dict(word_dist), dict(nr_words_dist) - - -# @TODO: eventually move these into .helpers.text -def tokenize_text(text): - """ Generator instead of list comprehension for optimal memory usage & runtime """ - return (t.lower() for t in nltk.word_tokenize(decontracted(text)) if contains_alnum(t)) - - -def decontracted(phrase): - # specific - phrase = re.sub(r"won\'t", "will not", phrase) - phrase = re.sub(r"can\'t", "can not", phrase) - - # general - phrase = re.sub(r"n\'t", " not", phrase) - phrase = re.sub(r"\'re", " are", phrase) - phrase = re.sub(r"\'s", " is", phrase) - phrase = re.sub(r"\'d", " would", phrase) - phrase = re.sub(r"\'ll", " will", phrase) - phrase = re.sub(r"\'t", " not", phrase) - phrase = re.sub(r"\'ve", " have", phrase) - phrase = re.sub(r"\'m", " am", phrase) - return phrase - - -def contains_alnum(text): - for c in text: - if 
c.isalnum(): - return True - return False - - -def get_nr_procs(df=None): - if 'MINDSDB_N_WORKERS' in os.environ: - try: - n = int(os.environ['MINDSDB_N_WORKERS']) - except ValueError: - n = 1 - return n - elif os.name == 'nt': - return 1 - else: - available_mem = psutil.virtual_memory().available - if df is not None: - max_per_proc_usage = df.size - else: - max_per_proc_usage = 0.2 * pow(10, 9) # multiplier * 1GB - - proc_count = int(min(mp.cpu_count() - 1, available_mem // max_per_proc_usage)) - - return max(proc_count, 1) diff --git a/type_infer/rule_based/__init__.py b/type_infer/rule_based/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/type_infer/rule_based/helpers.py b/type_infer/rule_based/helpers.py new file mode 100644 index 0000000..5c5e673 --- /dev/null +++ b/type_infer/rule_based/helpers.py @@ -0,0 +1,184 @@ +import re +import nltk +import string + +import numpy as np +import scipy.stats as st +from langid.langid import LanguageIdentifier +from langid.langid import model as langid_model + +from typing import Iterable +from collections import Counter, defaultdict + +from type_infer.dtype import dtype + + +try: + nltk.data.find('tokenizers/punkt') +except LookupError: + nltk.download('punkt') + +try: + from nltk.corpus import stopwords + stopwords.words('english') +except LookupError: + nltk.download('stopwords', quiet=True) + + + +def get_identifier_description_mp(arg_tup): + data, column_name, data_dtype = arg_tup + return get_identifier_description(data, column_name, data_dtype) + + +def get_identifier_description(data: Iterable, column_name: str, data_dtype: dtype): + data = list(data) + if isinstance(data[0], list): + nr_unique = len(set(tuple(x) for x in data)) + elif isinstance(data[0], dict): + nr_unique = len(set(str(x) for x in data)) + else: + nr_unique = len(set(data)) + + if nr_unique == 1: + return 'No Information' + + unique_pct = nr_unique / len(data) + + spaces = [len(str(x).split(' ')) - 1 for x in data] + mean_spaces = np.mean(spaces) if len(spaces) > 0 else 0.0 + + # Detect hash + all_same_length = all(len(str(data[0])) == len(str(x)) for x in data) + uuid_charset = set('0123456789abcdefABCDEF-') + all_uuid_charset = all(set(str(x)).issubset(uuid_charset) for x in data) + is_uuid = all_uuid_charset and all_same_length + + if all_same_length and len(data) == nr_unique and data_dtype not in (dtype.integer, dtype.float): + str_data = [str(x) for x in data] + randomness_per_index = [] + for i, _ in enumerate(str_data[0]): + N = len(set(x[i] for x in str_data)) + S = st.entropy([*Counter(x[i] for x in str_data).values()]) + if S == 0: + randomness_per_index.append(0.0) + else: + randomness_per_index.append(S / np.log(N)) + + mean_randomness = np.mean(randomness_per_index) if len(randomness_per_index) > 0 else 0 + if mean_randomness > 0.95: + return 'Hash-like identifier' + + # Detect foreign key + if data_dtype == dtype.integer: + if _is_foreign_key_name(column_name): + return 'Foreign key' + + if _is_identifier_name(column_name) or data_dtype in (dtype.categorical, dtype.binary): + if unique_pct > 0.98: + if is_uuid: + return 'UUID' + else: + return 'Unknown identifier' + + # Everything is unique and it's too short to be rich text + if data_dtype in (dtype.categorical, dtype.binary, dtype.short_text, dtype.rich_text) and \ + unique_pct > 0.99999 and mean_spaces < 1: + return 'Unknown identifier' + + return None + + +def _is_foreign_key_name(name): + for endings in ['id', 'ID', 'Id']: + for add in ['-', '_', ' ']: + if name.endswith(add + 
endings): + return True + for endings in ['ID', 'Id']: + if name.endswith(endings): + return True + return False + + +def _is_identifier_name(name): + for keyword in ['account', 'uuid', 'identifier', 'user']: + if keyword in name: + return True + return False + + +def get_language_dist(data): + lang_dist = defaultdict(lambda: 0) + lang_dist['Unknown'] = 0 + lang_probs_cache = dict() + identifier = LanguageIdentifier.from_modelstring(langid_model, norm_probs=True) + for text in data: + text = str(text) + text = text.translate(str.maketrans('', '', string.punctuation)) + if text not in lang_probs_cache: + try: + lang_probs = identifier.classify(text) + except Exception: + lang_probs = [] + lang_probs_cache[text] = lang_probs + + lang_probs = lang_probs_cache[text] + if len(lang_probs) > 0 and lang_probs[1] > 10 * (1 / len(identifier.nb_classes)): + lang_dist[lang_probs[0]] += 1 + else: + lang_dist['Unknown'] += 1 + + return dict(lang_dist) + + +def analyze_sentences(data): + nr_words = 0 + word_dist = defaultdict(int) + nr_words_dist = defaultdict(int) + stop_words = set(stopwords.words('english')) + for text in map(str, data): + text = text.lower() + text_dist = defaultdict(int) + tokens = tokenize_text(text) + tokens_no_stop = (x for x in tokens if x not in stop_words) + for tok in tokens_no_stop: + text_dist[tok] += 1 + + n_tokens = len(text_dist) + nr_words_dist[n_tokens] += 1 + nr_words += n_tokens + + # merge text_dist into word_dist + for k, v in text_dist.items(): + word_dist[k] += v + + return nr_words, dict(word_dist), dict(nr_words_dist) + + +def contains_alnum(text): + for c in text: + if c.isalnum(): + return True + return False + + +def tokenize_text(text): + """ Generator instead of list comprehension for optimal memory usage & runtime """ + return (t.lower() for t in nltk.word_tokenize(decontracted(text)) if contains_alnum(t)) + + +def decontracted(phrase): + # specific + phrase = re.sub(r"won\'t", "will not", phrase) + phrase = re.sub(r"can\'t", "can not", phrase) + + # general + phrase = re.sub(r"n\'t", " not", phrase) + phrase = re.sub(r"\'re", " are", phrase) + phrase = re.sub(r"\'s", " is", phrase) + phrase = re.sub(r"\'d", " would", phrase) + phrase = re.sub(r"\'ll", " will", phrase) + phrase = re.sub(r"\'t", " not", phrase) + phrase = re.sub(r"\'ve", " have", phrase) + phrase = re.sub(r"\'m", " am", phrase) + return phrase diff --git a/type_infer/infer.py b/type_infer/rule_based/infer.py similarity index 66% rename from type_infer/infer.py rename to type_infer/rule_based/infer.py index dd329f3..2df47af 100644 --- a/type_infer/infer.py +++ b/type_infer/rule_based/infer.py @@ -1,21 +1,19 @@ import re -import random import imghdr import sndhdr -import multiprocessing as mp from collections import Counter from typing import List, Union -from scipy.stats import norm import pandas as pd import numpy as np -from type_infer.base import TypeInformation from type_infer.dtype import dtype -from type_infer.helpers import seed, log # TODO: move somewhere else? -from type_infer.helpers import get_nr_procs -from type_infer.helpers import is_nan_numeric, get_identifier_description_mp, cast_string_to_python_type, \ - get_language_dist, analyze_sentences +from type_infer.helpers import log +from type_infer.rule_based.helpers import get_language_dist, analyze_sentences +from type_infer.helpers import is_nan_numeric, cast_string_to_python_type + + +STABLE = True # TODO: change engine into a class, add as attribute # @TODO: hardcode for distance, time, subunits of currency (e.g. 
cents) and other common units @@ -329,162 +327,3 @@ def get_column_data_type(data: Union[np.ndarray, list], full_data: pd.DataFrame, log.info(f'Column {col_name} has data type {curr_dtype}') return curr_dtype, known_dtype_dist, additional_info, warn, info - -def calculate_sample_size( - population_size, - margin_error=.01, - confidence_level=.995, - sigma=1 / 2 -): - """ - Calculate the minimal sample size to use to achieve a certain - margin of error and confidence level for a sample estimate - of the population mean. - Inputs - ------- - population_size: integer - Total size of the population that the sample is to be drawn from. - margin_error: number - Maximum expected difference between the true population parameter, - such as the mean, and the sample estimate. - confidence_level: number in the interval (0, 1) - If we were to draw a large number of equal-size samples - from the population, the true population parameter - should lie within this percentage - of the intervals (sample_parameter - e, sample_parameter + e) - where e is the margin_error. - sigma: number - The standard deviation of the population. For the case - of estimating a parameter in the interval [0, 1], sigma=1/2 - should be sufficient. - """ - alpha = 1 - (confidence_level) - # dictionary of confidence levels and corresponding z-scores - # computed via norm.ppf(1 - (alpha/2)), where norm is - # a normal distribution object in scipy.stats. - # Here, ppf is the percentile point function. - zdict = { - .90: 1.645, - .91: 1.695, - .99: 2.576, - .97: 2.17, - .94: 1.881, - .93: 1.812, - .95: 1.96, - .98: 2.326, - .96: 2.054, - .92: 1.751 - } - if confidence_level in zdict: - z = zdict[confidence_level] - else: - # Inf fix - if alpha == 0.0: - alpha += 0.001 - z = norm.ppf(1 - (alpha / 2)) - N = population_size - M = margin_error - numerator = z**2 * sigma**2 * (N / (N - 1)) - denom = M**2 + ((z**2 * sigma**2) / (N - 1)) - return numerator / denom - - -def sample_data(df: pd.DataFrame) -> pd.DataFrame: - population_size = len(df) - if population_size <= 50: - sample_size = population_size - else: - sample_size = int(round(calculate_sample_size(population_size))) - - population_size = len(df) - input_data_sample_indexes = random.sample(range(population_size), sample_size) - return df.iloc[input_data_sample_indexes] - - -def infer_types( - data: pd.DataFrame, - pct_invalid: float, - seed_nr: int = 420, - mp_cutoff: int = 1e4, -) -> TypeInformation: - """ - Infers the data types of each column of the dataset by analyzing a small sample of - each column's items. - - Inputs - ---------- - data : pd.DataFrame - The input dataset for which we want to infer data type information. - pct_invalid : float - The percentage, i.e. a float between 0.0 and 100.0, of invalid values that are - accepted before failing the type inference for a column. - seed_nr : int, optional - Seed for the random number generator, by default 420 - mp_cutoff : int, optional - How many elements in the dataframe before switching to parallel processing, by - default 1e4. 
- """ - seed(seed_nr) - type_information = TypeInformation() - sample_df = sample_data(data) - sample_size = len(sample_df) - population_size = len(data) - log.info(f'Analyzing a sample of {sample_size}') - log.info( - f'from a total population of {population_size}, this is equivalent to {round(sample_size*100/population_size, 1)}% of your data.') # noqa - - nr_procs = get_nr_procs(df=sample_df) - pool_size = min(nr_procs, len(sample_df.columns.values)) - if data.size > mp_cutoff and pool_size > 1: - log.info(f'Using {pool_size} processes to deduct types.') - pool = mp.Pool(processes=pool_size) - # column-wise parallelization # TODO: evaluate switching to row-wise split instead - answer_arr = pool.starmap(get_column_data_type, [ - (sample_df[x].dropna(), data[x], x, pct_invalid) for x in sample_df.columns.values - ]) - pool.close() - pool.join() - else: - answer_arr = [] - for x in sample_df.columns: - answer_arr.append(get_column_data_type(sample_df[x].dropna(), data, x, pct_invalid)) - - for i, col_name in enumerate(sample_df.columns): - (data_dtype, data_dtype_dist, additional_info, warn, info) = answer_arr[i] - - for msg in warn: - log.warning(msg) - for msg in info: - log.info(msg) - - if data_dtype is None: - data_dtype = dtype.invalid - - type_information.dtypes[col_name] = data_dtype - type_information.additional_info[col_name] = { - 'dtype_dist': data_dtype_dist - } - - if data.size > mp_cutoff and pool_size > 1: - pool = mp.Pool(processes=pool_size) - answer_arr = pool.map(get_identifier_description_mp, [ - (data[x], x, type_information.dtypes[x]) - for x in sample_df.columns - ]) - pool.close() - pool.join() - else: - answer_arr = [] - for x in sample_df.columns: - answer = get_identifier_description_mp([data[x], x, type_information.dtypes[x]]) - answer_arr.append(answer) - - for i, col_name in enumerate(sample_df.columns): - # work with the full data - if answer_arr[i] is not None: - log.warning(f'Column {col_name} is an identifier of type "{answer_arr[i]}"') - type_information.identifiers[col_name] = answer_arr[i] - - # @TODO Column removal logic was here, if the column was an identifier, move it elsewhere - - return type_information