generate_statistics_from_pyarrow table or parquet #92

Open
tanguycdls opened this issue Oct 9, 2019 · 22 comments

Comments

@tanguycdls

/type feature
Hi, since TFRecords are already converted to Pyarrow Tables to compute statistics, how hard would it be to add an option to read a Pyarrow file or Parquet file directly?

| 'DecodeData' >> tf_example_decoder.DecodeTFExample(

If my understanding of that code is correct, could we replace beam.io.textio.ReadFromText with beam.io.parquetio.ReadFromParquet? If so, would we need to extract features, or would the Pyarrow schema be enough?

My aim would be to use TFDV to extract data features and visualise them using facets.

Thanks

@gowthamkpr gowthamkpr self-assigned this Oct 9, 2019
@gowthamkpr

gowthamkpr commented Oct 9, 2019

In this issue, it's mentioned that, towards the goal of adding support for computing statistics over structured data (e.g., arbitrary protocol buffers, parquet data), the GenerateStatistics API will take Arrow tables as input instead of Dict[FeatureName, ndarray].

@caveness
Collaborator

caveness commented Oct 9, 2019

One thing to note is that the GenerateStatistics API will only accept Arrow tables whose columns are ListArrays of primitive types (e.g., int8, int16, int32, int64, uint8, uint16, uint32, uint64, float16, float32, float64, binary, string, unicode), so Arrow tables that are not in that format will not work with the API.

@tanguycdls
Author

Hi, thanks for the prompt answer! Sorry, I did not search enough before opening the issue.

About the limitation of the input data to Arrow tables of ListArrays of primitives: does it exclude list-of-list dtypes? If I use those dtypes, what can I do?

When converted to TFRecords, it seemed to work when displaying the facets: I had access to the quantiles of the record lengths.

@jlafaye

jlafaye commented Feb 1, 2020

Many people use pandas or spark in their data preparation stage. Those tools dump parquet files in which every column is an array of a primitive type, not an array of lists of a primitive type.
I noticed that and tried to work around this limitation by casting between the two types, but pyarrow does not support this kind of cast:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 813, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 448, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 928, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "beam_test.py", line 64, in process
    yield table.cast(schema)
  File "pyarrow/table.pxi", line 1334, in itercolumns
  File "pyarrow/table.pxi", line 203, in pyarrow.lib.ChunkedArray.cast
  File "pyarrow/error.pxi", line 86, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: No cast implemented from int32 to list<item: int32>

Just wanted to share my interest in this kind of feature. I would be happy to help if needed.

@tanguycdls
Author

tanguycdls commented Mar 18, 2020

import os
import tempfile
from typing import Optional, Text

import apache_beam as beam
import numpy as np
import pyarrow as pa
import pyarrow.parquet  # needed so that pa.parquet is available below
import tensorflow as tf
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.options.pipeline_options import PipelineOptions
from tensorflow_data_validation import constants
from tensorflow_data_validation import load_statistics
from tensorflow_data_validation.api import stats_api
from tensorflow_data_validation.statistics import stats_options as options
from tensorflow_metadata.proto.v0 import statistics_pb2

def convert_to_list_array(column):
    if isinstance(column.type, pa.lib.ListType):
        return column
    type_ = column.type.to_pandas_dtype()
    if hasattr(type_, 'kind'):
        if type_.kind == 'M':
            # tfdv does not support dates and pyarrow cannot cast from datetime to string natively
            return pa.lib.ListArray.from_arrays(np.arange(len(column)+1), column.to_pandas().astype(str))
    assert column.num_chunks == 1, 'does not work with more than one chunk'
    # Wrap each value in a one-element list
    return pa.lib.ListArray.from_arrays(np.arange(len(column)+1), column.chunks[0])

def transform_2_valid_pyarrow(path_to_file, output_path):
    table = pa.parquet.read_table(path_to_file)
    res = {}
    for col in table.column_names:
        res[col] = convert_to_list_array(table[col])
    tb = pa.table(res)
    pa.parquet.write_table(tb, output_path)
    return output_path

def generate_statistics_from_arrow(
    data_location: Text,
    output_path: Optional[bytes] = None,
    stats_options: options.StatsOptions = options.StatsOptions(),
    pipeline_options: Optional[PipelineOptions] = None,
    compression_type: Text = CompressionTypes.AUTO,
) -> statistics_pb2.DatasetFeatureStatisticsList:
    if output_path is None:
        output_path = os.path.join(tempfile.mkdtemp(), 'data_stats.tfrecord')
    output_dir_path = os.path.dirname(output_path)
    if not tf.io.gfile.exists(output_dir_path):
        tf.io.gfile.makedirs(output_dir_path)

    batch_size = (stats_options.desired_batch_size if stats_options.desired_batch_size
      and stats_options.desired_batch_size > 0 else
      constants.DEFAULT_DESIRED_INPUT_BATCH_SIZE)
    with beam.Pipeline(options=pipeline_options) as p:
        _ = (
            p
            | 'ReadData' >> beam.io.parquetio.ReadFromParquetBatched(
                file_pattern=data_location)
            | 'GenerateStatistics' >> stats_api.GenerateStatistics(stats_options)
            | 'WriteStatsOutput' >> beam.io.WriteToTFRecord(
                output_path,
                shard_name_template='',
                coder=beam.coders.ProtoCoder(
                    statistics_pb2.DatasetFeatureStatisticsList)))
    return load_statistics(output_path)

The code above should work without costing too much memory!

@brills
Contributor

brills commented Mar 18, 2020

one caveat to that solution is that your original columns must not contain null (nil, None, etc).

@tanguycdls
Author

one caveat to that solution is that your original columns must not contain null (nil, None, etc).

Hi, thanks for your answer. Indeed, my dataframe was already null-safe (nulls replaced by 0), so I did not notice it...
So would the easiest solution be to convert to TFRecords and let it handle all the conversions?

If I can access the null mask of the array, it should not be too hard to build the ListArray correctly, but I'm not sure how to get it.

Thanks

@brills
Contributor

brills commented Mar 19, 2020

I think there may be an easy patch to your existing solution so it handles Nulls correctly, but I'm not familiar with pandas APIs.

Basically, we want to translate something like pd.Series([1, 2, None, 3]) to pa.array([[1], [2], None, [3]])

Note that pa.array([[1], [2], None, [3]]) is essentially

pa.ListArray.from_arrays(
  # mask[i] == True <==> list_array[i] is null
  offsets=pa.array([0, 1, 2, 2, 3], mask=np.array([False, False, True, False, False])),
  values=pa.array([1, 2, 3]))

So essentially, you can write a function that:

  1. identifies the indices of all the nulls in a pd.Series
  2. removes all the nulls
  3. assembles a ListArray like the one above.
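
The three steps above can be sketched as follows. This is only an illustration, not code from TFDV or tfx_bsl: `split_nulls` and `to_list_array` are hypothetical names, the input is assumed to be a plain Python sequence of scalars, and pyarrow and numpy are imported lazily so that the offset logic can be tried without them.

```python
from typing import Any, List, Optional, Sequence, Tuple


def split_nulls(
    column: Sequence[Optional[Any]],
) -> Tuple[List[int], List[bool], List[Any]]:
    """Return (offsets, offsets_mask, values) for a column of scalars.

    offsets has len(column) + 1 entries; offsets_mask[i] is True when
    row i is null; values holds the non-null items in their original order.
    """
    offsets: List[int] = []
    mask: List[bool] = []
    values: List[Any] = []
    for item in column:
        offsets.append(len(values))  # position in `values` where row i starts
        mask.append(item is None)    # step 1: remember which rows are null
        if item is not None:
            values.append(item)      # step 2: keep only the non-null items
    offsets.append(len(values))      # closing offset, never null
    mask.append(False)
    return offsets, mask, values


def to_list_array(column: Sequence[Optional[Any]]):
    """Step 3: assemble the ListArray (needs pyarrow and numpy installed)."""
    import numpy as np
    import pyarrow as pa

    offsets, mask, values = split_nulls(column)
    return pa.ListArray.from_arrays(
        pa.array(np.array(offsets, dtype=np.int32),
                 mask=np.array(mask, dtype=bool)),
        pa.array(values))
```

For `[1, 2, None, 3]`, `split_nulls` produces the same offsets, mask and values as the example above. A column that is entirely null leaves `values` empty, so its Arrow type cannot be inferred and would have to be passed explicitly.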

@brills
Contributor

brills commented Mar 26, 2020

@tanguycdls : if you have something workable, do you mind making a contribution?

@tanguycdls
Author

Hi @brills
I'm still struggling with the null handling.
An easy way of doing this would be to convert to pandas and retrieve the null mask, but that would be really costly for large datasets.
I found that script used in Fletcher (backend for arrow for Pandas): https://github.com/xhochy/fletcher/blob/9f50e39e378b7fa5d34616311e3207b395355bc2/fletcher/_algorithms.py#L70

I did not have time yet to go through it and merge all the pieces together, if I do I will share my code !

@tanguycdls
Author

tanguycdls commented Mar 29, 2020

Hi, I worked a bit on the pyarrow side today. Actually, ListArray does not have a mask parameter in the from_arrays function? Are you running with a nightly version? https://github.com/apache/arrow/blob/c49b960d2b697135c8de45222c6377e427ba8aad/python/pyarrow/array.pxi#L1402

To have null you need: offset[j] = None --> arr[j] = None

I did the following:

def get_null_mask(arr):
    return arr.to_pandas().isna().values # thats costly ...
def create_offset(null_mask):
    # to have null in a pyarrow List of List you need:
    # offset[j] = None -> arr[j] = None
    offset = (null_mask == False).cumsum() - 1 # if first value is None it will be replaced so Ok otherwise it will be 0
    offset = np.concatenate([offset, [offset[-1] + 1]]).astype(object)
    offset[np.where(null_mask==True)[0]] = None
    return offset
def get_values(arr, null_mask):
    return arr.take(pa.array(np.where(null_mask==False)[0]))

def transform_null_to_list_list(arr):
    null_mask = get_null_mask(arr)
    offset = create_offset(null_mask)
    values = get_values(arr, null_mask)
    return pa.ListArray.from_arrays(offset, values)
transform_null_to_list_list(pa.array([None, 1, 7, 9, None, 6]))

[None, [1], [7], [9], None, [6]]

def convert_to_list_array(array):
    # `array` is a pyarrow ChunkedArray (a table column); requires `import pandas as pd`.
    if isinstance(array.type, pa.lib.ListType):
        return array
    type_ = array.type.to_pandas_dtype()
    if hasattr(type_, 'kind'):
        if type_.kind == 'M':
            # tfdv does not support dates and pyarrow cannot cast from datetime to string natively
            return pa.lib.ListArray.from_arrays(np.arange(len(array)+1),
                                                pd.to_datetime(array.to_pandas()).dt.strftime('%Y-%m-%d'))
    assert array.num_chunks == 1, 'Function is not compatible with array with more than one chunk'
    if array.null_count == 0:
        return pa.lib.ListArray.from_arrays(np.arange(len(array)+1), array.chunks[0])
    else:
        return transform_null_to_list_list(array.chunks[0])

The to_pandas transformation is costly so still need to figure out how to avoid it!

EDIT i added the full example to use it directly in Beam:
https://gist.github.com/tanguycdls/0a1f7b928a27f9a4c5659f17d315999c

If you're interested i can make a pr out of it, it will be easier to check if all the cases are OK.

@brills
Contributor

brills commented Mar 30, 2020

@tanguycdls

actually List Array does not have a mask parameter in the from_arrays function? are you running w/ a nightly version

It's true that ListArray.from_arrays doesn't accept a mask parameter, but the offsets parameter actually serves two purposes:
if offset[i] == null, then arr[i] is null (but note that offset[i]'s value in the buffer still matters!)
https://github.com/apache/arrow/blob/0facdc77b7d3ddba5c2982a6ea8167ab12336a9a/cpp/src/arrow/array.cc#L241

So you should be able to do something like

offsets = pa.array(range(0, num_values + 1), mask=<figure_out_the_mask>)

def get_null_mask(arr):
    return arr.to_pandas().isna().values # thats costly ...

you can use tfx_bsl.arrow.array_util.GetArrayNullBitmapAsByteArray instead. (note that it's a private API in tfx_bsl, so I would encourage you to make a contribution if it works, to avoid depending on something that has no guarantees.)

@tanguycdls
Author

Thanks for the help @brills !

It's true that ListArray.from_arrays doesn't accept a mask parameter, but the offsets parameter actually serves two purposes:
if offset[i] == null, then arr[i] is null (but note that offset[i]'s value in the buffer still matters!)
https://github.com/apache/arrow/blob/0facdc77b7d3ddba5c2982a6ea8167ab12336a9a/cpp/src/arrow/array.cc#L241

So you should be able to do something like

offsets = pa.array(range(0, num_values + 1), mask=<figure_out_the_mask>)

It simplifies a lot the code:


import numpy as np
import pyarrow as pa
from tfx_bsl.arrow import array_util

def create_offset(null_mask):
    null_mask = np.concatenate([null_mask, [False]])
    return pa.array((null_mask == False).cumsum() - 1, mask=null_mask)

def get_values(arr, null_mask):
    return arr.take(pa.array(np.where(null_mask==False)[0]))

def transform_null_to_list_list(arr):
    null_mask = array_util.GetArrayNullBitmapAsByteArray(arr).to_numpy().astype(bool)
    offset = create_offset(null_mask)
    values = get_values(arr, null_mask)
    return pa.ListArray.from_arrays(offset, values)

example = pa.array([None, 1, 7, 9, None, 6, None])
transform_null_to_list_list(example)

I will try to contribute that code as soon as I have more time on my side and once we're sure everything works well !

@brills
Contributor

brills commented Mar 31, 2020

nice!

If you don't mind, I can add this to tfx_bsl and revise TFDV's generate_statistics_from_pyarrow. We recently found that other libraries could also benefit from this adapter.

@brills
Contributor

brills commented Apr 1, 2020

Sorry, I meant to revise generate_statistics_from_pandas. I could imagine a generate_statistics_from_pyarrow, but that will take longer (contributions are welcomed!)

@tanguycdls
Author

Sorry, I meant to revise generate_statistics_from_pandas. I could imagine a generate_statistics_from_pyarrow, but that will take longer (contributions are welcomed!)

OK, in that case I can try to contribute generate_statistics_from_parquet or from a pyarrow table. We are more interested in that use case, and calling the private method seems to be a bad idea!

@brills
Contributor

brills commented Apr 7, 2020

tensorflow/tfx-bsl@d6cc2b8 added the conversion function to tfx_bsl

@khorshuheng

@brills I created the generate_statistics_from_parquet method, based on the conversion function added to tfx_bsl:
https://gist.github.com/khorshuheng/4e0d305463a0cd5a5f7600706d619660

To verify correctness, I compared the statistics generated by the above method with those from generate_statistics_from_csv. The tfdv version is 0.25.0 and the pyarrow version is 0.17.0. The training dataset comes from the tutorial example (https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/chicago_data.zip).

Most statistics are correct, apart from a slight discrepancy for histogram statistics for trip_start_timestamp.

Does the GitHub gist above decode the parquet file correctly? Or is additional postprocessing required?

@brills
Contributor

brills commented Dec 8, 2020

apart from a slight discrepancy for histogram statistics for trip_start_timestamp.

The histogram is computed using an approximate algorithm and due to the nondeterministic nature of beam, the result may even change across runs. So if the diff is slight it could be expected.
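
Given that, a comparison between two runs is probably better done with a tolerance than with exact equality. A minimal sketch, assuming both histograms use the same bucket boundaries and that a deviation of 5% of the total count is acceptable (the helper name and the threshold are illustrative choices, not part of TFDV):

```python
from typing import Sequence


def histograms_close(counts_a: Sequence[float],
                     counts_b: Sequence[float],
                     rel_tol: float = 0.05) -> bool:
    """Check that two lists of bucket counts agree bucket by bucket.

    A bucket may differ by at most rel_tol times the larger total count.
    """
    if len(counts_a) != len(counts_b):
        return False
    total = max(sum(counts_a), sum(counts_b), 1.0)
    return all(abs(a - b) <= rel_tol * total
               for a, b in zip(counts_a, counts_b))
```

If the bucket boundaries themselves differ between the two runs, the counts would first need to be brought onto a common set of buckets, which this sketch does not attempt.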

@khorshuheng

@brills Thanks for the clarification. In that case, can i submit a pull request based on github gist which i submitted? Or is there more work which is required?

@sethcoast

@brills @khorshuheng Any update on when generate_statistics_from_parquet() will be officially added to tfdv? I could really benefit from it.

@DavidFarago

The generate_statistics_from_arrow() above is not working for me:

pyarrow.lib.ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
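
One possible cause of this error is that the file pattern matches something that is not a Parquet file (for example a CSV, a TFRecord file, or a marker file written next to the data). A Parquet file starts and ends with the 4-byte magic `PAR1` (`PARE` when the footer is encrypted), so a quick check can be sketched as below. `looks_like_parquet` is a hypothetical helper and only inspects the magic bytes on a local path; it does not validate the footer metadata.

```python
import os

# Magic for plain files and for files with an encrypted footer.
PARQUET_MAGIC = (b"PAR1", b"PARE")


def looks_like_parquet(path: str) -> bool:
    """Return True if `path` starts and ends with the same Parquet magic."""
    # Smallest conceivable layout: magic + 4-byte footer length + magic.
    if os.path.getsize(path) < 12:
        return False
    with open(path, "rb") as f:
        head = f.read(4)
        f.seek(-4, os.SEEK_END)
        tail = f.read(4)
    return head in PARQUET_MAGIC and head == tail
```

Running it over every file matched by data_location before starting the pipeline should show which input, if any, triggers the error.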
