Skip to content

Commit

Permalink
FIX-#2830: read_csv with OmniSci backend compatibility with Pandas (#…
Browse files Browse the repository at this point in the history
…2905)

Co-authored-by: Dmitry Chigarev <[email protected]>
Co-authored-by: Vasily Litvinov <[email protected]>
Signed-off-by: Alexander Myskov <[email protected]>
  • Loading branch information
3 people authored Jun 25, 2021
1 parent 78381a6 commit e74cde9
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 33 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ jobs:
run: MODIN_BENCHMARK_MODE=True pytest modin/pandas/test/internals/test_benchmark_mode.py
- shell: bash -l {0}
run: pytest modin/experimental/engines/omnisci_on_ray/test/test_dataframe.py
- shell: bash -l {0}
run: pytest modin/pandas/test/test_io.py::TestCsv
- shell: bash -l {0}
run: |
curl -o codecov https://codecov.io/bash
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ jobs:
run: sudo apt update && sudo apt install -y libhdf5-dev
- shell: bash -l {0}
run: pytest modin/experimental/engines/omnisci_on_ray/test/test_dataframe.py
- shell: bash -l {0}
run: pytest modin/pandas/test/test_io.py::TestCsv
- shell: bash -l {0}
run: |
curl -o codecov https://codecov.io/bash
Expand Down
227 changes: 202 additions & 25 deletions modin/experimental/engines/omnisci_on_ray/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,47 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from csv import Dialect
from typing import Union, Sequence, Callable, Dict, Tuple
import inspect
import os

from modin.experimental.backends.omnisci.query_compiler import DFAlgQueryCompiler
from modin.engines.ray.generic.io import RayIO
from modin.experimental.engines.omnisci_on_ray.frame.data import OmnisciOnRayFrame
from modin.error_message import ErrorMessage
from modin.engines.base.io.text.text_file_dispatcher import TextFileDispatcher

from pyarrow.csv import read_csv, ParseOptions, ConvertOptions, ReadOptions
import pyarrow as pa

import pandas
from pandas.io.parsers import _validate_usecols_arg
from pandas._typing import FilePathOrBuffer
from pandas.io.common import is_url

ReadCsvKwargsType = Dict[
str,
Union[
str,
int,
bool,
dict,
object,
Sequence,
Callable,
Dialect,
FilePathOrBuffer,
None,
],
]


class ArrowEngineException(Exception):
"""Exception raised in case of Arrow engine-specific incompatibilities are found."""

class OmnisciOnRayIO(RayIO):

class OmnisciOnRayIO(RayIO, TextFileDispatcher):

frame_cls = OmnisciOnRayFrame
query_compiler_cls = DFAlgQueryCompiler
Expand Down Expand Up @@ -78,6 +106,44 @@ class OmnisciOnRayIO(RayIO):
"low_memory",
"memory_map",
"float_precision",
"storage_options",
]

unsupported_args = [
"decimal",
"thousands",
"index_col",
"prefix",
"converters",
"skipfooter",
"true_values",
"false_values",
"nrows",
"skipinitialspace",
"squeeze",
"mangle_dupe_cols",
"na_values",
"keep_default_na",
"na_filter",
"verbose",
"infer_datetime_format",
"keep_date_col",
"date_parser",
"dayfirst",
"cache_dates",
"iterator",
"chunksize",
"encoding",
"lineterminator",
"dialect",
"quoting",
"comment",
"warn_bad_lines",
"error_bad_lines",
"low_memory",
"memory_map",
"float_precision",
"storage_options",
]

@classmethod
Expand Down Expand Up @@ -116,7 +182,7 @@ def read_csv(
chunksize=None,
compression="infer",
thousands=None,
decimal=b".",
decimal=".",
lineterminator=None,
quotechar='"',
quoting=0,
Expand All @@ -140,7 +206,11 @@ def read_csv(
try:
if eng in ["pandas", "c"]:
return cls._read(**mykwargs)

use_modin_impl, error_message = cls._read_csv_check_support(
mykwargs,
)
if not use_modin_impl:
raise ArrowEngineException(error_message)
if isinstance(dtype, dict):
column_types = {c: cls._dtype_to_arrow(t) for c, t in dtype.items()}
else:
Expand All @@ -150,31 +220,12 @@ def read_csv(
for c in parse_dates:
column_types[c] = pa.timestamp("s")

if names:
if header == 0:
skiprows = skiprows + 1 if skiprows is not None else 1
elif header is None or header == "infer":
pass
else:
raise NotImplementedError(
"read_csv with 'arrow' engine and provided 'names' parameter supports only 0, None and 'infer' header values"
)
else:
if header == 0 or header == "infer":
pass
else:
raise NotImplementedError(
"read_csv with 'arrow' engine without 'names' parameter provided supports only 0 and 'infer' header values"
)
if names and header == 0:
skiprows = skiprows + 1 if skiprows is not None else 1

if delimiter is None:
delimiter = sep

if delim_whitespace and delimiter != ",":
raise ValueError(
"Specified a delimiter and delim_whitespace=True; you can only specify one."
)

usecols_md = cls._prepare_pyarrow_usecols(mykwargs)

po = ParseOptions(
Expand Down Expand Up @@ -216,7 +267,12 @@ def read_csv(
)

return cls.from_arrow(at)
except (pa.ArrowNotImplementedError, NotImplementedError):
except (
pa.ArrowNotImplementedError,
pa.ArrowInvalid,
NotImplementedError,
ArrowEngineException,
):
if eng in ["arrow"]:
raise

Expand Down Expand Up @@ -288,3 +344,124 @@ def _prepare_pyarrow_usecols(cls, read_csv_kwargs):
raise NotImplementedError("unsupported `usecols` parameter")

return usecols_md

read_csv_unsup_defaults = {}
for k, v in inspect.signature(read_csv.__func__).parameters.items():
if v.default is not inspect.Parameter.empty and k in unsupported_args:
read_csv_unsup_defaults[k] = v.default

@classmethod
def _read_csv_check_support(
cls,
read_csv_kwargs: ReadCsvKwargsType,
) -> Tuple[bool, str]:
"""
Check if passed parameters are supported by current ``modin.pandas.read_csv`` implementation.
Parameters
----------
read_csv_kwargs : dict
Parameters of read_csv function.
Returns
-------
bool
Whether passed parameters are supported or not.
str
Error message that should be raised if user explicitly set `engine="arrow"`.
"""
filepath_or_buffer = read_csv_kwargs.get("filepath_or_buffer", None)
header = read_csv_kwargs.get("header", "infer")
names = read_csv_kwargs.get("names", None)
engine = read_csv_kwargs.get("engine", None)
skiprows = read_csv_kwargs.get("skiprows", None)
delimiter = read_csv_kwargs.get("delimiter", None)

if read_csv_kwargs.get("compression", "infer") != "infer":
return (
False,
"read_csv with 'arrow' engine doesn't support explicit compression parameter, compression"
" must be inferred automatically (supported compression types are gzip and bz2)",
)

if isinstance(filepath_or_buffer, str):
if not os.path.exists(filepath_or_buffer):
if cls.file_exists(filepath_or_buffer) or is_url(filepath_or_buffer):
return (
False,
"read_csv with 'arrow' engine supports only local files",
)
else:
raise FileNotFoundError("No such file or directory")
elif not cls.pathlib_or_pypath(filepath_or_buffer):
if hasattr(filepath_or_buffer, "read"):
return (
False,
"read_csv with 'arrow' engine doesn't support file-like objects",
)
else:
raise ValueError(
f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
)

for arg, def_value in cls.read_csv_unsup_defaults.items():
if read_csv_kwargs[arg] != def_value:
return (
False,
f"read_csv with 'arrow' engine doesn't support {arg} parameter",
)
if delimiter is not None and read_csv_kwargs.get("delim_whitespace", False):
raise ValueError(
"Specified a delimiter with both sep and delim_whitespace=True; you can only specify one."
)

if names:
if header not in [None, 0, "infer"]:
return (
False,
"read_csv with 'arrow' engine and provided 'names' parameter supports only 0, None and "
"'infer' header values",
)

empty_pandas_df = pandas.read_csv(
**dict(
read_csv_kwargs,
nrows=0,
skiprows=None,
skipfooter=0,
usecols=None,
index_col=None,
names=None,
engine=None if engine == "arrow" else engine,
),
)
columns_number = len(empty_pandas_df.columns)
if columns_number != len(names):
return (
False,
"read_csv with 'arrow' engine doesn't support names parameter, which length doesn't match "
"with actual number of columns",
)
else:
if header not in [0, "infer"]:
return (
False,
"read_csv with 'arrow' engine without 'names' parameter provided supports only 0 and 'infer' "
"header values",
)

if not read_csv_kwargs.get("skip_blank_lines", True):
# in some corner cases empty lines are handled as '',
# while pandas handles it as NaNs - issue #3084
return (
False,
"read_csv with 'arrow' engine doesn't support skip_blank_lines = False parameter",
)

if skiprows is not None and not isinstance(skiprows, int):
return (
False,
"read_csv with 'arrow' engine doesn't support non-integer skiprows parameter",
)

return True, None
Loading

0 comments on commit e74cde9

Please sign in to comment.