Skip to content

Commit

Permalink
update to use new method from pandas
Browse files Browse the repository at this point in the history
  • Loading branch information
jorisvandenbossche committed Apr 10, 2024
1 parent eb3d211 commit 6dee245
Showing 1 changed file with 31 additions and 138 deletions.
169 changes: 31 additions & 138 deletions python/pyarrow/pandas_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ def get_datetimetz_type(values, dtype, type_):
# Converting pyarrow.Table efficiently to pandas.DataFrame


def _reconstruct_block(item, columns=None, extension_columns=None):
def _reconstruct_block(item, columns=None, extension_columns=None, return_block=True):
"""
Construct a pandas Block from the `item` dictionary coming from pyarrow's
serialization or returned by arrow::python::ConvertTableToPandas.
Expand Down Expand Up @@ -709,114 +709,23 @@ def _reconstruct_block(item, columns=None, extension_columns=None):
block_arr = item.get('block', None)
placement = item['placement']
if 'dictionary' in item:
cat = _pandas_api.categorical_type.from_codes(
arr = _pandas_api.categorical_type.from_codes(
block_arr, categories=item['dictionary'],
ordered=item['ordered'])
block = _int.make_block(cat, placement=placement)
elif 'timezone' in item:
unit, _ = np.datetime_data(block_arr.dtype)
dtype = make_datetimetz(unit, item['timezone'])
if _pandas_api.is_ge_v21():
pd_arr = _pandas_api.pd.array(
arr = _pandas_api.pd.array(
block_arr.view("int64"), dtype=dtype, copy=False
)
block = _int.make_block(pd_arr, placement=placement)
else:
block = _int.make_block(block_arr, placement=placement,
klass=_int.DatetimeTZBlock,
dtype=dtype)
elif 'py_array' in item:
# create ExtensionBlock
arr = item['py_array']
assert len(placement) == 1
name = columns[placement[0]]
pandas_dtype = extension_columns[name]
if not hasattr(pandas_dtype, '__from_arrow__'):
raise ValueError("This column does not support to be converted "
"to a pandas ExtensionArray")
pd_ext_arr = pandas_dtype.__from_arrow__(arr)
block = _int.make_block(pd_ext_arr, placement=placement)
else:
block = _int.make_block(block_arr, placement=placement)

return block


def _reconstruct_sub_dataframe(item, columns=None, extension_columns=None):
    """
    Build a pandas DataFrame from one `item` dictionary coming from pyarrow's
    serialization or returned by arrow::python::ConvertTableToPandas.

    Dictionary-encoded items are turned into a pandas categorical, items
    carrying a timezone into a timezone-aware datetime array, and 'py_array'
    items are converted through the ``__from_arrow__`` hook of the matching
    pandas dtype. Any other item is handled as a plain 2D block.

    Parameters
    ----------
    item : dict
        For basic types, this is a dictionary in the form of
        {'block': np.ndarray of values, 'placement': pandas block placement}.
        Additional keys are present for other types (dictionary, timezone,
        py_array).
    columns :
        Column names of the table being constructed, used for extension types
        (indexed with ``placement[0]`` to find the column name).
    extension_columns : dict
        Dictionary of {column_name: pandas_dtype} that includes all columns
        and corresponding dtypes that will be converted to a pandas
        extension array.

    Returns
    -------
    df : pandas.DataFrame
        DataFrame wrapping the values of this item, constructed with
        ``copy=False``.
    placement
        ``item['placement']``, returned unchanged.

    Raises
    ------
    ValueError
        If the pandas dtype registered for a 'py_array' item does not
        implement ``__from_arrow__``.
    """
    from pandas import DataFrame

    values = item.get('block', None)
    placement = item['placement']

    if 'dictionary' in item:
        # The block holds the codes; the categories travel separately.
        values = _pandas_api.categorical_type.from_codes(
            values, categories=item['dictionary'],
            ordered=item['ordered'])
    elif 'timezone' in item:
        # Reinterpret the datetime64 block as int64 and rebuild it with a
        # timezone-aware dtype of the same unit.
        unit, _ = np.datetime_data(values.dtype)
        dtype = make_datetimetz(unit, item['timezone'])
        values = _pandas_api.pd.array(
            values.view("int64"), dtype=dtype, copy=False
        )
    elif 'py_array' in item:
        # Extension column: let the pandas dtype convert the arrow array.
        arr = item['py_array']
        assert len(placement) == 1
        name = columns[placement[0]]
        pandas_dtype = extension_columns[name]
        if not hasattr(pandas_dtype, '__from_arrow__'):
            raise ValueError("This column does not support to be converted "
                             "to a pandas ExtensionArray")
        values = pandas_dtype.__from_arrow__(arr)
    else:
        # 2d block: transposed before being wrapped (presumably stored as
        # (n_columns, n_rows) -- confirm against the C++ converter).
        return DataFrame(values.T, copy=False, dtype=values.dtype), placement

    return DataFrame(values, copy=False), placement


def _reconstruct_arrays(item, columns=None, extension_columns=None):

block_arr = item.get('block', None)
placement = item['placement']
if 'dictionary' in item:
arr = _pandas_api.categorical_type.from_codes(
block_arr, categories=item['dictionary'],
ordered=item['ordered'])
elif 'timezone' in item:
unit, _ = np.datetime_data(block_arr.dtype)
dtype = make_datetimetz(unit, item['timezone'])
arr = _pandas_api.pd.array(
block_arr.view("int64"), dtype=dtype, copy=False
)
arr = block_arr
if return_block:
block = _int.make_block(block_arr, placement=placement,
klass=_int.DatetimeTZBlock,
dtype=dtype)
return block
elif 'py_array' in item:
# create ExtensionBlock
arr = item['py_array']
Expand All @@ -828,10 +737,12 @@ def _reconstruct_arrays(item, columns=None, extension_columns=None):
"to a pandas ExtensionArray")
arr = pandas_dtype.__from_arrow__(arr)
else:
# 2d block
arr = block_arr[0]
arr = block_arr

return arr, placement
if return_block:
return _int.make_block(arr, placement=placement)
else:
return arr, placement


def make_datetimetz(unit, tz):
Expand All @@ -844,9 +755,6 @@ def make_datetimetz(unit, tz):
def table_to_dataframe(
options, table, categories=None, ignore_metadata=False, types_mapper=None
):
from pandas.core.internals import BlockManager
from pandas import DataFrame, concat

all_columns = []
column_indexes = []
pandas_metadata = table.schema.pandas_metadata
Expand All @@ -867,9 +775,17 @@ def table_to_dataframe(
_check_data_column_metadata_consistency(all_columns)
columns = _deserialize_column_index(table, all_columns, column_indexes)

column_names = table.column_names
result = pa.lib.table_to_blocks(options, table, categories,
list(ext_columns_dtypes.keys()))
if options["use_blocks"]:
blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
from pandas.core.internals import BlockManager
from pandas import DataFrame

blocks = [
_reconstruct_block(item, column_names, ext_columns_dtypes)
for item in result
]
axes = [columns, index]
mgr = BlockManager(blocks, axes)
if _pandas_api.is_ge_v21():
Expand All @@ -878,27 +794,14 @@ def table_to_dataframe(
df = DataFrame(mgr)
return df
else:
table_columns = table.column_names
extension_columns = list(ext_columns_dtypes.keys())
result = pa.lib.table_to_blocks(options, table, categories, extension_columns)
if not result:
return DataFrame(index=index, columns=columns)

# if options["split_blocks"]:
# arrays, placements = zip(*[
# _reconstruct_arrays(item, table_columns, ext_columns_dtypes)
# for item in result])
# return DataFrame._from_arrays(arrays, index=index, columns=columns)

dfs, placements = zip(*[
_reconstruct_sub_dataframe(item, table_columns, ext_columns_dtypes)
for item in result])

df = concat(dfs, axis=1, copy=False, ignore_index=True)
indexer = np.concatenate(placements).argsort()
df = df.take(indexer, axis=1)
df.index = index
df.columns = columns
from pandas.api.internals import create_dataframe_from_blocks

blocks = [
_reconstruct_block(
item, column_names, ext_columns_dtypes, return_block=False)
for item in result
]
df = create_dataframe_from_blocks(blocks, index=index, columns=columns)
return df


Expand Down Expand Up @@ -1216,16 +1119,6 @@ def _reconstruct_columns_from_metadata(columns, column_indexes):
return pd.Index(new_levels[0], dtype=new_levels[0].dtype, name=columns.name)


def _table_to_blocks(options, block_table, categories, extension_columns):
    """
    Convert an arrow table into a list of blocks via `_reconstruct_block`.

    Parameters
    ----------
    options : dict
        Conversion options, forwarded to ``pa.lib.table_to_blocks``.
    block_table : pyarrow.Table
        Table to convert; its column names are passed to
        `_reconstruct_block`.
    categories :
        Forwarded unchanged to ``pa.lib.table_to_blocks``.
    extension_columns : dict
        Dictionary of {column_name: pandas_dtype}; the keys are handed to
        ``pa.lib.table_to_blocks`` and the full mapping to
        `_reconstruct_block`.

    Returns
    -------
    list
        One `_reconstruct_block` result per item returned by
        ``pa.lib.table_to_blocks``.
    """
    column_names = block_table.column_names
    extension_names = list(extension_columns.keys())
    raw_items = pa.lib.table_to_blocks(
        options, block_table, categories, extension_names)

    blocks = []
    for raw_item in raw_items:
        blocks.append(
            _reconstruct_block(raw_item, column_names, extension_columns))
    return blocks


def _add_any_metadata(table, pandas_metadata):
modified_columns = {}
modified_fields = {}
Expand Down

0 comments on commit 6dee245

Please sign in to comment.