diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 65380a4dc36..35bcf9c59f3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -335,6 +335,8 @@ jobs: run: MODIN_BENCHMARK_MODE=True pytest modin/pandas/test/internals/test_benchmark_mode.py - shell: bash -l {0} run: pytest modin/experimental/engines/omnisci_on_ray/test/test_dataframe.py + - shell: bash -l {0} + run: pytest modin/pandas/test/test_io.py::TestCsv - shell: bash -l {0} run: | curl -o codecov https://codecov.io/bash diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index ac898d4c675..7c2599f781f 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -121,6 +121,8 @@ jobs: run: sudo apt update && sudo apt install -y libhdf5-dev - shell: bash -l {0} run: pytest modin/experimental/engines/omnisci_on_ray/test/test_dataframe.py + - shell: bash -l {0} + run: pytest modin/pandas/test/test_io.py::TestCsv - shell: bash -l {0} run: | curl -o codecov https://codecov.io/bash diff --git a/modin/experimental/engines/omnisci_on_ray/io.py b/modin/experimental/engines/omnisci_on_ray/io.py index 5567f7e0002..20d4c3c8d1b 100644 --- a/modin/experimental/engines/omnisci_on_ray/io.py +++ b/modin/experimental/engines/omnisci_on_ray/io.py @@ -11,19 +11,47 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. 
+from csv import Dialect +from typing import Union, Sequence, Callable, Dict, Tuple +import inspect +import os + from modin.experimental.backends.omnisci.query_compiler import DFAlgQueryCompiler from modin.engines.ray.generic.io import RayIO from modin.experimental.engines.omnisci_on_ray.frame.data import OmnisciOnRayFrame from modin.error_message import ErrorMessage +from modin.engines.base.io.text.text_file_dispatcher import TextFileDispatcher from pyarrow.csv import read_csv, ParseOptions, ConvertOptions, ReadOptions import pyarrow as pa import pandas from pandas.io.parsers import _validate_usecols_arg +from pandas._typing import FilePathOrBuffer +from pandas.io.common import is_url + +ReadCsvKwargsType = Dict[ + str, + Union[ + str, + int, + bool, + dict, + object, + Sequence, + Callable, + Dialect, + FilePathOrBuffer, + None, + ], +] + +class ArrowEngineException(Exception): + """Exception raised when Arrow engine-specific incompatibilities are found.""" -class OmnisciOnRayIO(RayIO): + +class OmnisciOnRayIO(RayIO, TextFileDispatcher): frame_cls = OmnisciOnRayFrame query_compiler_cls = DFAlgQueryCompiler @@ -78,6 +106,44 @@ class OmnisciOnRayIO(RayIO): "low_memory", "memory_map", "float_precision", + "storage_options", + ] + + unsupported_args = [ + "decimal", + "thousands", + "index_col", + "prefix", + "converters", + "skipfooter", + "true_values", + "false_values", + "nrows", + "skipinitialspace", + "squeeze", + "mangle_dupe_cols", + "na_values", + "keep_default_na", + "na_filter", + "verbose", + "infer_datetime_format", + "keep_date_col", + "date_parser", + "dayfirst", + "cache_dates", + "iterator", + "chunksize", + "encoding", + "lineterminator", + "dialect", + "quoting", + "comment", + "warn_bad_lines", + "error_bad_lines", + "low_memory", + "memory_map", + "float_precision", + "storage_options", ] @classmethod @@ -116,7 +182,7 @@ def read_csv( chunksize=None, compression="infer", thousands=None, - decimal=b".", + decimal=".", lineterminator=None, 
quotechar='"', quoting=0, @@ -140,7 +206,11 @@ def read_csv( try: if eng in ["pandas", "c"]: return cls._read(**mykwargs) - + use_modin_impl, error_message = cls._read_csv_check_support( + mykwargs, + ) + if not use_modin_impl: + raise ArrowEngineException(error_message) if isinstance(dtype, dict): column_types = {c: cls._dtype_to_arrow(t) for c, t in dtype.items()} else: @@ -150,31 +220,12 @@ def read_csv( for c in parse_dates: column_types[c] = pa.timestamp("s") - if names: - if header == 0: - skiprows = skiprows + 1 if skiprows is not None else 1 - elif header is None or header == "infer": - pass - else: - raise NotImplementedError( - "read_csv with 'arrow' engine and provided 'names' parameter supports only 0, None and 'infer' header values" - ) - else: - if header == 0 or header == "infer": - pass - else: - raise NotImplementedError( - "read_csv with 'arrow' engine without 'names' parameter provided supports only 0 and 'infer' header values" - ) + if names and header == 0: + skiprows = skiprows + 1 if skiprows is not None else 1 if delimiter is None: delimiter = sep - if delim_whitespace and delimiter != ",": - raise ValueError( - "Specified a delimiter and delim_whitespace=True; you can only specify one." 
- ) - usecols_md = cls._prepare_pyarrow_usecols(mykwargs) po = ParseOptions( @@ -216,7 +267,12 @@ def read_csv( ) return cls.from_arrow(at) - except (pa.ArrowNotImplementedError, NotImplementedError): + except ( + pa.ArrowNotImplementedError, + pa.ArrowInvalid, + NotImplementedError, + ArrowEngineException, + ): if eng in ["arrow"]: raise @@ -288,3 +344,124 @@ def _prepare_pyarrow_usecols(cls, read_csv_kwargs): raise NotImplementedError("unsupported `usecols` parameter") return usecols_md + + read_csv_unsup_defaults = {} + for k, v in inspect.signature(read_csv.__func__).parameters.items(): + if v.default is not inspect.Parameter.empty and k in unsupported_args: + read_csv_unsup_defaults[k] = v.default + + @classmethod + def _read_csv_check_support( + cls, + read_csv_kwargs: ReadCsvKwargsType, + ) -> Tuple[bool, str]: + """ + Check if passed parameters are supported by current ``modin.pandas.read_csv`` implementation. + + Parameters + ---------- + read_csv_kwargs : dict + Parameters of read_csv function. + + Returns + ------- + bool + Whether passed parameters are supported or not. + str + Error message that should be raised if user explicitly set `engine="arrow"`. 
+ """ + filepath_or_buffer = read_csv_kwargs.get("filepath_or_buffer", None) + header = read_csv_kwargs.get("header", "infer") + names = read_csv_kwargs.get("names", None) + engine = read_csv_kwargs.get("engine", None) + skiprows = read_csv_kwargs.get("skiprows", None) + delimiter = read_csv_kwargs.get("delimiter", None) + + if read_csv_kwargs.get("compression", "infer") != "infer": + return ( + False, + "read_csv with 'arrow' engine doesn't support explicit compression parameter, compression" + " must be inferred automatically (supported compression types are gzip and bz2)", + ) + + if isinstance(filepath_or_buffer, str): + if not os.path.exists(filepath_or_buffer): + if cls.file_exists(filepath_or_buffer) or is_url(filepath_or_buffer): + return ( + False, + "read_csv with 'arrow' engine supports only local files", + ) + else: + raise FileNotFoundError("No such file or directory") + elif not cls.pathlib_or_pypath(filepath_or_buffer): + if hasattr(filepath_or_buffer, "read"): + return ( + False, + "read_csv with 'arrow' engine doesn't support file-like objects", + ) + else: + raise ValueError( + f"Invalid file path or buffer object type: {type(filepath_or_buffer)}" + ) + + for arg, def_value in cls.read_csv_unsup_defaults.items(): + if read_csv_kwargs[arg] != def_value: + return ( + False, + f"read_csv with 'arrow' engine doesn't support {arg} parameter", + ) + if delimiter is not None and read_csv_kwargs.get("delim_whitespace", False): + raise ValueError( + "Specified a delimiter with both sep and delim_whitespace=True; you can only specify one." 
+ ) + + if names: + if header not in [None, 0, "infer"]: + return ( + False, + "read_csv with 'arrow' engine and provided 'names' parameter supports only 0, None and " + "'infer' header values", + ) + + empty_pandas_df = pandas.read_csv( + **dict( + read_csv_kwargs, + nrows=0, + skiprows=None, + skipfooter=0, + usecols=None, + index_col=None, + names=None, + engine=None if engine == "arrow" else engine, + ), + ) + columns_number = len(empty_pandas_df.columns) + if columns_number != len(names): + return ( + False, + "read_csv with 'arrow' engine doesn't support names parameter, which length doesn't match " + "with actual number of columns", + ) + else: + if header not in [0, "infer"]: + return ( + False, + "read_csv with 'arrow' engine without 'names' parameter provided supports only 0 and 'infer' " + "header values", + ) + + if not read_csv_kwargs.get("skip_blank_lines", True): + # in some corner cases empty lines are handled as '', + # while pandas handles it as NaNs - issue #3084 + return ( + False, + "read_csv with 'arrow' engine doesn't support skip_blank_lines = False parameter", + ) + + if skiprows is not None and not isinstance(skiprows, int): + return ( + False, + "read_csv with 'arrow' engine doesn't support non-integer skiprows parameter", + ) + + return True, None diff --git a/modin/pandas/test/test_io.py b/modin/pandas/test/test_io.py index 0cdd5b965f1..55d639013c4 100644 --- a/modin/pandas/test/test_io.py +++ b/modin/pandas/test/test_io.py @@ -16,7 +16,7 @@ import pandas from pandas.errors import ParserWarning from collections import OrderedDict -from modin.config import TestDatasetSize +from modin.config import TestDatasetSize, Engine, Backend, IsExperimental from modin.utils import to_pandas from modin.pandas.utils import from_arrow import pyarrow as pa @@ -44,8 +44,6 @@ generate_dataframe, ) -from modin.config import Engine, Backend, IsExperimental - if Backend.get() == "Pandas": import modin.pandas as pd else: @@ -253,7 +251,7 @@ def 
test_read_csv_delimiters( # Column and Index Locations and Names tests @pytest.mark.skipif( - Engine.get() != "Python", + Engine.get() != "Python" and Backend.get() != "Omnisci", reason="many parameters combiantions fails: issue #2312, #2307", ) @pytest.mark.parametrize("header", ["infer", None, 0]) @@ -364,8 +362,26 @@ def test_read_csv_parsing_2( nrows, names, ): - if false_values or true_values and Engine.get() != "Python": + xfail_case_1 = ( + (false_values or true_values) + and Engine.get() != "Python" + and Backend.get() != "Omnisci" + ) + xfail_case_2 = ( + Backend.get() == "Omnisci" + and isinstance(skiprows, int) + and names is None + and skipfooter == 0 + and nrows is None + and not true_values + and not false_values + ) + if xfail_case_1: pytest.xfail("modin and pandas dataframes differs - issue #2446") + if xfail_case_2: + pytest.xfail( + "read_csv fails because of duplicated columns names - issue #3080" + ) if request.config.getoption("--simulate-cloud").lower() != "off": pytest.xfail( reason="The reason of tests fail in `cloud` mode is unknown for now - issue #2340" @@ -437,6 +453,10 @@ def test_read_csv_squeeze(self, request, test_case): ) def test_read_csv_mangle_dupe_cols(self): + if Backend.get() == "Omnisci": + pytest.xfail( + "processing of duplicated columns in OmniSci backend is not supported yet - issue #3080" + ) unique_filename = get_unique_filename() str_non_unique_cols = "col,col,col,col\n5, 6, 7, 8\n9, 10, 11, 12\n" eval_io_from_str(str_non_unique_cols, unique_filename, mangle_dupe_cols=True) @@ -492,6 +512,15 @@ def test_read_csv_datetime( reason="The reason of tests fail in `cloud` mode is unknown for now - issue #2340" ) + if ( + Backend.get() == "Omnisci" + and isinstance(parse_dates, list) + and ("col4" in parse_dates or 3 in parse_dates) + ): + pytest.xfail( + "In some cases read_csv with `parse_dates` with OmniSci backend outputs incorrect result - issue #3081" + ) + raising_exceptions = io_ops_bad_exc # default value if 
isinstance(parse_dates, dict) and callable(date_parser): # In this case raised TypeError: () takes 1 positional argument but 2 were given @@ -697,7 +726,7 @@ def test_read_csv_quoting( # Error Handling parameters tests @pytest.mark.xfail( - Engine.get() != "Python", + Engine.get() != "Python" and Backend.get() != "Omnisci", reason="read_csv with Ray engine doen't raise `bad lines` exceptions - issue #2500", ) @pytest.mark.parametrize("warn_bad_lines", [True, False]) @@ -863,7 +892,13 @@ def test_read_csv_s3(self): @pytest.mark.parametrize("names", [list("XYZ"), None]) @pytest.mark.parametrize("skiprows", [1, 2, 3, 4, None]) def test_read_csv_skiprows_names(self, names, skiprows): - + if Backend.get() == "Omnisci" and names is None and skiprows in [1, None]: + # If these conditions are satisfied, column names will be inferred + # from the first row, which will contain duplicated values; that is + # not supported by the `Omnisci` backend yet. + pytest.xfail( + "processing of duplicated columns in OmniSci backend is not supported yet - issue #3080" + ) eval_io( fn_name="read_csv", # read_csv kwargs @@ -913,7 +948,7 @@ def test_read_csv_newlines_in_quotes(self, nrows, skiprows): filepath_or_buffer="modin/pandas/test/data/newlines.csv", nrows=nrows, skiprows=skiprows, - cast_to_str=True, + cast_to_str=Backend.get() != "Omnisci", ) def test_read_csv_sep_none(self, request): @@ -961,6 +996,21 @@ def test_read_csv_names_neq_num_cols(self, request, kwargs): **kwargs, ) + def test_read_csv_wrong_path(self): + + raising_exceptions = [e for e in io_ops_bad_exc if e != FileNotFoundError] + + eval_io( + fn_name="read_csv", + raising_exceptions=raising_exceptions, + # read_csv kwargs + filepath_or_buffer="/some/wrong/path.csv", + ) + + @pytest.mark.skipif( + Backend.get() == "Omnisci", + reason="to_csv is not implemented with OmniSci backend yet - issue #3082", + ) @pytest.mark.parametrize("header", [False, True]) @pytest.mark.parametrize("mode", ["w", "wb+"]) def test_to_csv(self, 
request, header, mode): @@ -981,6 +1031,10 @@ def test_to_csv(self, request, header, mode): mode=mode, ) + @pytest.mark.skipif( + Backend.get() == "Omnisci", + reason="to_csv is not implemented with OmniSci backend yet - issue #3082", + ) def test_dataframe_to_csv(self, request): if request.config.getoption("--simulate-cloud").lower() != "off": pytest.xfail( @@ -992,6 +1046,10 @@ def test_dataframe_to_csv(self, request): modin_obj=modin_df, pandas_obj=pandas_df, fn="to_csv", extension="csv" ) + @pytest.mark.skipif( + Backend.get() == "Omnisci", + reason="to_csv is not implemented with OmniSci backend yet - issue #3082", + ) def test_series_to_csv(self, request): if request.config.getoption("--simulate-cloud").lower() != "off": pytest.xfail( diff --git a/requirements/env_omnisci.yml b/requirements/env_omnisci.yml index 575586d7057..5b4ed14f71f 100644 --- a/requirements/env_omnisci.yml +++ b/requirements/env_omnisci.yml @@ -19,3 +19,4 @@ dependencies: - ray-core >=1.0.0,<1.2.0 - openpyxl - xlrd + - sqlalchemy