
Merge pull request #47 from mindsdb/columns_pool
Run as many processes as columns count
paxcema authored Jul 12, 2023
2 parents 9d7a09d + 36f866e commit a2acea7
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions type_infer/infer.py
@@ -392,9 +392,10 @@ def infer_types(
              f'from a total population of {population_size}, this is equivalent to {round(sample_size*100/population_size, 1)}% of your data.')  # noqa
 
     nr_procs = get_nr_procs(df=sample_df)
-    if data.size > mp_cutoff and nr_procs > 1:
-        log.info(f'Using {nr_procs} processes to deduct types.')
-        pool = mp.Pool(processes=nr_procs)
+    pool_size = min(nr_procs, len(sample_df.columns.values))
+    if data.size > mp_cutoff and pool_size > 1:
+        log.info(f'Using {pool_size} processes to deduct types.')
+        pool = mp.Pool(processes=pool_size)
         # column-wise parallelization  # TODO: evaluate switching to row-wise split instead
         answer_arr = pool.starmap(get_column_data_type, [
             (sample_df[x].dropna(), data[x], x, pct_invalid) for x in sample_df.columns.values

@@ -422,8 +423,8 @@ def infer_types(
         'dtype_dist': data_dtype_dist
     }
 
-    if data.size > mp_cutoff and nr_procs > 1:
-        pool = mp.Pool(processes=nr_procs)
+    if data.size > mp_cutoff and pool_size > 1:
+        pool = mp.Pool(processes=pool_size)
         answer_arr = pool.map(get_identifier_description_mp, [
             (data[x], x, type_information.dtypes[x])
             for x in sample_df.columns
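For context, the change caps the multiprocessing pool at the number of columns being inferred, since each worker handles exactly one column and extra processes would sit idle. Below is a minimal, self-contained sketch of that pattern, not the library's actual implementation: infer_column() is a hypothetical stand-in for get_column_data_type(), and os.cpu_count() is used in place of type_infer's get_nr_procs().

# Minimal sketch of column-wise pooling capped at the column count.
# Assumptions: pandas DataFrame input, a stand-in infer_column() worker,
# os.cpu_count() instead of get_nr_procs().
import multiprocessing as mp
import os

import pandas as pd


def infer_column(series: pd.Series, name: str) -> str:
    # Stand-in for get_column_data_type(): classify a single column.
    return 'integer' if pd.api.types.is_integer_dtype(series) else 'other'


def infer_all(df: pd.DataFrame) -> dict:
    nr_procs = os.cpu_count() or 1
    # Never spawn more workers than there are columns to process.
    pool_size = min(nr_procs, len(df.columns))
    if pool_size > 1:
        with mp.Pool(processes=pool_size) as pool:
            answers = pool.starmap(infer_column, [(df[c], c) for c in df.columns])
    else:
        answers = [infer_column(df[c], c) for c in df.columns]
    return dict(zip(df.columns, answers))


if __name__ == '__main__':
    df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
    print(infer_all(df))

With only two columns this sketch would start at most two workers regardless of core count, which is the same effect the commit has on wide-but-small DataFrames.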
