From aca8db5cb3bc3caf2bfd8a5f157886c8e730ec44 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Mon, 23 May 2022 12:30:59 -0700 Subject: [PATCH 01/16] FIX-#4479: Prevent users from using a local filepath when performing a distributed write Signed-off-by: Rehan Durrani --- docs/release_notes/release_notes-0.15.0.rst | 1 + .../implementations/pandas_on_ray/io/io.py | 12 +++++++++ .../execution/ray/implementations/utils.py | 25 +++++++++++++++++++ 3 files changed, 38 insertions(+) create mode 100644 modin/core/execution/ray/implementations/utils.py diff --git a/docs/release_notes/release_notes-0.15.0.rst b/docs/release_notes/release_notes-0.15.0.rst index 67d4718a2d4..606759add12 100644 --- a/docs/release_notes/release_notes-0.15.0.rst +++ b/docs/release_notes/release_notes-0.15.0.rst @@ -35,6 +35,7 @@ Key Features and Updates * * Developer API enhancements * FEAT-#4359: Add __dataframe__ method to the protocol dataframe (#4360) + * FIX-#4479: Prevent users from using a local filepath when performing a distributed write (#4484) * Update testing suite * TEST-#4363: Use Ray from pypi in CI (#4364) * FIX-#4422: get rid of case sensitivity for `warns_that_defaulting_to_pandas` (#4423) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index bcba828f42b..6286c28d49e 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -42,6 +42,7 @@ from modin.core.execution.ray.common import RayTask, SignalActor from ..dataframe import PandasOnRayDataframe from ..partitioning import PandasOnRayDataframePartition +from modin.core.execution.ray.implementations.utils import is_local_path class PandasOnRayIO(RayIO): @@ -165,6 +166,12 @@ def to_csv(cls, qc, **kwargs): if not cls._to_csv_check_support(kwargs): return RayIO.to_csv(qc, **kwargs) + if len(ray.nodes()) > 1: + path = kwargs["path_or_buf"] + if is_local_path(path): + raise ValueError("`path_or_buf` must point to a networked file or buffer when in cluster mode.") + + signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1) def func(df, **kw): @@ -276,6 +283,11 @@ def to_parquet(cls, qc, **kwargs): """ if not cls._to_parquet_check_support(kwargs): return RayIO.to_parquet(qc, **kwargs) + + if len(ray.nodes()) > 1: + path = kwargs["path_or_buf"] + if is_local_path(path): + raise ValueError("`path_or_buf` must point to a networked file or buffer when in cluster mode.") def func(df, **kw): """ diff --git a/modin/core/execution/ray/implementations/utils.py b/modin/core/execution/ray/implementations/utils.py new file mode 100644 index 00000000000..798db3d92f4 --- /dev/null +++ b/modin/core/execution/ray/implementations/utils.py @@ -0,0 +1,25 @@ +import os +import pathlib +import re + +S3_ADDRESS_REGEX = re.compile("[sS]3://(.*?)/(.*)") + +def is_local_path(path_or_buf) -> bool: + """ + Return True if the specified path_or_buf is a local path, False otherwise. + + Parameters + ---------- + path_or_buf : str, path object or file-like object + The path or buffer to check. + + Returns + ------- + Whether the `path_or_buf` points to a local file. + """ + if isinstance(path_or_buf, str): + if S3_ADDRESS_REGEX.match(path_or_buf) is not None or "://" in path_or_buf: + return False # S3 or network path. + if isinstance(path_or_buf, str) or isinstance(path_or_buf, pathlib.PurePath): + return os.path.exists(path_or_buf) + return False From e622fcfafeeed66899a9deaff395bf934e79ab6d Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Mon, 23 May 2022 12:32:51 -0700 Subject: [PATCH 02/16] lint Signed-off-by: Rehan Durrani --- .../ray/implementations/pandas_on_ray/io/io.py | 11 +++++++---- modin/core/execution/ray/implementations/utils.py | 5 +++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 6286c28d49e..896b69f08f3 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -169,8 +169,9 @@ def to_csv(cls, qc, **kwargs): if len(ray.nodes()) > 1: path = kwargs["path_or_buf"] if is_local_path(path): - raise ValueError("`path_or_buf` must point to a networked file or buffer when in cluster mode.") - + raise ValueError( + "`path_or_buf` must point to a networked file or buffer when in cluster mode." + ) signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1) @@ -283,11 +284,13 @@ def to_parquet(cls, qc, **kwargs): """ if not cls._to_parquet_check_support(kwargs): return RayIO.to_parquet(qc, **kwargs) - + if len(ray.nodes()) > 1: path = kwargs["path_or_buf"] if is_local_path(path): - raise ValueError("`path_or_buf` must point to a networked file or buffer when in cluster mode.") + raise ValueError( + "`path_or_buf` must point to a networked file or buffer when in cluster mode." + ) def func(df, **kw): """ diff --git a/modin/core/execution/ray/implementations/utils.py b/modin/core/execution/ray/implementations/utils.py index 798db3d92f4..ba020b2943d 100644 --- a/modin/core/execution/ray/implementations/utils.py +++ b/modin/core/execution/ray/implementations/utils.py @@ -4,6 +4,7 @@ S3_ADDRESS_REGEX = re.compile("[sS]3://(.*?)/(.*)") + def is_local_path(path_or_buf) -> bool: """ Return True if the specified path_or_buf is a local path, False otherwise. @@ -12,14 +13,14 @@ def is_local_path(path_or_buf) -> bool: ---------- path_or_buf : str, path object or file-like object The path or buffer to check. - + Returns ------- Whether the `path_or_buf` points to a local file. """ if isinstance(path_or_buf, str): if S3_ADDRESS_REGEX.match(path_or_buf) is not None or "://" in path_or_buf: - return False # S3 or network path. + return False # S3 or network path. if isinstance(path_or_buf, str) or isinstance(path_or_buf, pathlib.PurePath): return os.path.exists(path_or_buf) return False From be74c8ed053e203fc1d6ccc69b940640ea457bce Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Mon, 23 May 2022 12:54:11 -0700 Subject: [PATCH 03/16] Fix docstring Signed-off-by: Rehan Durrani --- modin/core/execution/ray/implementations/utils.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/modin/core/execution/ray/implementations/utils.py b/modin/core/execution/ray/implementations/utils.py index ba020b2943d..20a3ad962ac 100644 --- a/modin/core/execution/ray/implementations/utils.py +++ b/modin/core/execution/ray/implementations/utils.py @@ -1,3 +1,18 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. + +"""Collection of utility functions for distributed io.""" + import os import pathlib import re From 7b1dff643749602c1f23365fe0aef51146c2ccd4 Mon Sep 17 00:00:00 2001 From: Rehan Sohail Durrani Date: Wed, 25 May 2022 20:25:01 -0700 Subject: [PATCH 04/16] Update modin/core/execution/ray/implementations/utils.py Co-authored-by: Yaroslav Igoshev --- modin/core/execution/ray/implementations/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/core/execution/ray/implementations/utils.py b/modin/core/execution/ray/implementations/utils.py index 20a3ad962ac..6210b7f2689 100644 --- a/modin/core/execution/ray/implementations/utils.py +++ b/modin/core/execution/ray/implementations/utils.py @@ -22,7 +22,7 @@ def is_local_path(path_or_buf) -> bool: """ - Return True if the specified path_or_buf is a local path, False otherwise. + Return ``True`` if the specified `path_or_buf` is a local path, ``False`` otherwise. Parameters ---------- From a313a79e8cf06608b434dd970af03d8b69582ef1 Mon Sep 17 00:00:00 2001 From: Rehan Sohail Durrani Date: Wed, 25 May 2022 20:25:12 -0700 Subject: [PATCH 05/16] Update modin/core/execution/ray/implementations/utils.py Co-authored-by: Mahesh Vashishtha --- modin/core/execution/ray/implementations/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/core/execution/ray/implementations/utils.py b/modin/core/execution/ray/implementations/utils.py index 6210b7f2689..0ece205fa98 100644 --- a/modin/core/execution/ray/implementations/utils.py +++ b/modin/core/execution/ray/implementations/utils.py @@ -36,6 +36,6 @@ def is_local_path(path_or_buf) -> bool: if isinstance(path_or_buf, str): if S3_ADDRESS_REGEX.match(path_or_buf) is not None or "://" in path_or_buf: return False # S3 or network path. - if isinstance(path_or_buf, str) or isinstance(path_or_buf, pathlib.PurePath): + if isinstance(path_or_buf, (str, pathlib.PurePath)): return os.path.exists(path_or_buf) return False From 1dd86361b7132d5db0d78764776a1058c25d1900 Mon Sep 17 00:00:00 2001 From: Rehan Sohail Durrani Date: Wed, 25 May 2022 20:25:25 -0700 Subject: [PATCH 06/16] Update modin/core/execution/ray/implementations/pandas_on_ray/io/io.py Co-authored-by: Mahesh Vashishtha --- .../ray/implementations/pandas_on_ray/io/io.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 896b69f08f3..0554b42fe81 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -166,12 +166,10 @@ def to_csv(cls, qc, **kwargs): if not cls._to_csv_check_support(kwargs): return RayIO.to_csv(qc, **kwargs) - if len(ray.nodes()) > 1: - path = kwargs["path_or_buf"] - if is_local_path(path): - raise ValueError( - "`path_or_buf` must point to a networked file or buffer when in cluster mode." - ) + if len(ray.nodes()) > 1 and is_local_path(kwargs["path_or_buf"]): + raise ValueError( + "`path_or_buf` must point to a networked file or buffer when in cluster mode." + ) signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1) From 419108870b2c5a6c918524c525de014b06dd4dff Mon Sep 17 00:00:00 2001 From: Rehan Sohail Durrani Date: Wed, 25 May 2022 20:26:04 -0700 Subject: [PATCH 07/16] Update modin/core/execution/ray/implementations/pandas_on_ray/io/io.py Co-authored-by: Mahesh Vashishtha --- .../ray/implementations/pandas_on_ray/io/io.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 0554b42fe81..44c3118dbbb 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -283,12 +283,10 @@ def to_parquet(cls, qc, **kwargs): if not cls._to_parquet_check_support(kwargs): return RayIO.to_parquet(qc, **kwargs) - if len(ray.nodes()) > 1: - path = kwargs["path_or_buf"] - if is_local_path(path): - raise ValueError( - "`path_or_buf` must point to a networked file or buffer when in cluster mode." - ) + if len(ray.nodes()) > 1 and is_local_path(kwargs["path_or_buf"]): + raise ValueError( + "`path_or_buf` must point to a networked file or buffer when in cluster mode." + ) def func(df, **kw): """ From 8c7120bdf5a0c8a9c698106a0ac6c560552062fa Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Wed, 25 May 2022 20:35:39 -0700 Subject: [PATCH 08/16] Address review comments Signed-off-by: Rehan Durrani --- modin/core/execution/ray/implementations/pandas_on_ray/io/io.py | 2 +- modin/core/{execution/ray/implementations => io}/utils.py | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename modin/core/{execution/ray/implementations => io}/utils.py (100%) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 44c3118dbbb..7a745e06da4 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -42,7 +42,7 @@ from modin.core.execution.ray.common import RayTask, SignalActor from ..dataframe import PandasOnRayDataframe from ..partitioning import PandasOnRayDataframePartition -from modin.core.execution.ray.implementations.utils import is_local_path +from modin.core.io.utils import is_local_path class PandasOnRayIO(RayIO): diff --git a/modin/core/execution/ray/implementations/utils.py b/modin/core/io/utils.py similarity index 100% rename from modin/core/execution/ray/implementations/utils.py rename to modin/core/io/utils.py From dcadcf44476a3a520198ebe9090fbe30abc74458 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Wed, 25 May 2022 21:08:01 -0700 Subject: [PATCH 09/16] Address review comments and fix util for non-local paths and non-existent local paths Signed-off-by: Rehan Durrani --- .../implementations/pandas_on_ray/io/io.py | 4 +-- modin/core/io/utils.py | 36 ++++++++++++++++++- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 7a745e06da4..4b306694a09 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -169,7 +169,7 @@ def to_csv(cls, qc, **kwargs): if len(ray.nodes()) > 1 and is_local_path(kwargs["path_or_buf"]): raise ValueError( "`path_or_buf` must point to a networked file or buffer when in cluster mode." - ) + ) signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1) @@ -286,7 +286,7 @@ def to_parquet(cls, qc, **kwargs): if len(ray.nodes()) > 1 and is_local_path(kwargs["path_or_buf"]): raise ValueError( "`path_or_buf` must point to a networked file or buffer when in cluster mode." - ) + ) def func(df, **kw): """ diff --git a/modin/core/io/utils.py b/modin/core/io/utils.py index 0ece205fa98..90a7d98a108 100644 --- a/modin/core/io/utils.py +++ b/modin/core/io/utils.py @@ -16,6 +16,7 @@ import os import pathlib import re +from typing import Union S3_ADDRESS_REGEX = re.compile("[sS]3://(.*?)/(.*)") @@ -37,5 +38,38 @@ def is_local_path(path_or_buf) -> bool: if S3_ADDRESS_REGEX.match(path_or_buf) is not None or "://" in path_or_buf: return False # S3 or network path. if isinstance(path_or_buf, (str, pathlib.PurePath)): - return os.path.exists(path_or_buf) + if os.path.exists(path_or_buf): + return True + local_device_id = os.stat(os.getcwd()).st_dev + path_device_id = get_device_id(path_or_buf) + if path_device_id == local_device_id: + return True return False + + +def get_device_id(path: Union[str, pathlib.PurePath]) -> Union[int, None]: + """ + Return the result of `os.stat(path).st_dev` for the portion of `path` that exists locally. + + Parameters + ---------- + path : str, path object + The path to check. + + Returns + ------- + The `st_dev` field of `os.stat` of the portion of the `path` that exists locally, None if no + part of the path exists locally. + """ + index = 1 + path_list = list(pathlib.Path(path).parts) + if path_list[0] == "/": + index += 1 + try: + os.stat(os.path.join(*path_list[:index])) + except: + return None + while os.path.exists(os.path.join(*path_list[:index])): + index += 1 + index -= 1 + return os.stat(os.path.join(*path_list[:index])).st_dev From b6552ba457da0a1ffa67ae7ef8f1b61e1918555b Mon Sep 17 00:00:00 2001 From: Rehan Sohail Durrani Date: Thu, 26 May 2022 18:27:09 -0700 Subject: [PATCH 10/16] Update modin/core/io/utils.py Co-authored-by: Mahesh Vashishtha --- modin/core/io/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/core/io/utils.py b/modin/core/io/utils.py index 90a7d98a108..7723e8f935e 100644 --- a/modin/core/io/utils.py +++ b/modin/core/io/utils.py @@ -16,7 +16,7 @@ import os import pathlib import re -from typing import Union +from typing import Optional, Union S3_ADDRESS_REGEX = re.compile("[sS]3://(.*?)/(.*)") From 0608f9e5b7ffd00a997da470cbf346fc6af41838 Mon Sep 17 00:00:00 2001 From: Rehan Sohail Durrani Date: Thu, 26 May 2022 18:27:16 -0700 Subject: [PATCH 11/16] Update modin/core/io/utils.py Co-authored-by: Mahesh Vashishtha --- modin/core/io/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/core/io/utils.py b/modin/core/io/utils.py index 7723e8f935e..0a6947c1cd7 100644 --- a/modin/core/io/utils.py +++ b/modin/core/io/utils.py @@ -47,7 +47,7 @@ def is_local_path(path_or_buf) -> bool: return False -def get_device_id(path: Union[str, pathlib.PurePath]) -> Union[int, None]: +def get_device_id(path: Union[str, pathlib.PurePath]) -> Optional[int]: """ Return the result of `os.stat(path).st_dev` for the portion of `path` that exists locally. From cc80818437ee4dace67cb127bf2a7702d6c1f499 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Thu, 26 May 2022 18:50:34 -0700 Subject: [PATCH 12/16] Add testing Signed-off-by: Rehan Durrani --- modin/core/io/utils.py | 4 ++-- modin/pandas/test/test_io.py | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/modin/core/io/utils.py b/modin/core/io/utils.py index 0a6947c1cd7..68a5d54faf3 100644 --- a/modin/core/io/utils.py +++ b/modin/core/io/utils.py @@ -67,9 +67,9 @@ def get_device_id(path: Union[str, pathlib.PurePath]) -> Optional[int]: index += 1 try: os.stat(os.path.join(*path_list[:index])) - except: + except Exception: return None - while os.path.exists(os.path.join(*path_list[:index])): + while os.path.exists(os.path.join(*path_list[:index])) and index <= len(path_list): index += 1 index -= 1 return os.stat(os.path.join(*path_list[:index])).st_dev diff --git a/modin/pandas/test/test_io.py b/modin/pandas/test/test_io.py index 3348c07e244..69d2cba3a0a 100644 --- a/modin/pandas/test/test_io.py +++ b/modin/pandas/test/test_io.py @@ -36,6 +36,7 @@ from modin.utils import to_pandas from modin.pandas.utils import from_arrow from modin.test.test_utils import warns_that_defaulting_to_pandas +from modin.core.io.utils import is_local_path import pyarrow as pa import os from scipy import sparse @@ -2378,3 +2379,21 @@ def test_to_period(): ) modin_df, pandas_df = create_test_dfs(TEST_DATA, index=index) df_equals(modin_df.to_period(), pandas_df.to_period()) + + +def test_is_local_path(): + s3_path = "s3://modin-example-bucket/modin-example-file" + assert not is_local_path(s3_path), "S3 Path incorrectly flagged as local!" + azure_blob_path = "https://modin-example-storage-account.blob.core.windows.net/modin-example-container/modin-example-file" + assert not is_local_path( + azure_blob_path + ), "Azure Blob Storage Path incorrectly flagged as local!" + gcs_path = "gs://modin-example-bucket/modin-example-file" + assert not is_local_path(gcs_path), "GCS Path incorrectly flagged as local!" + assert is_local_path( + os.getcwd() + ), "Current Working Directory incorrectly flagged as not local!" + new_file = os.getcwd() + "/modin-example-file" + assert is_local_path( + new_file + ), "Non-existent file under current working directory incorrectly flagged as not local!" From 8081d9c0f9a7d6342024f7d8c9391d9880a02b4e Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Wed, 1 Jun 2022 17:31:17 -0700 Subject: [PATCH 13/16] Use fsspec and default to pandas Signed-off-by: Rehan Durrani --- .../implementations/pandas_on_ray/io/io.py | 26 +++++++++++++---- modin/core/io/utils.py | 29 +++++++++---------- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index 4b306694a09..981bb7e9b86 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -166,10 +166,17 @@ def to_csv(cls, qc, **kwargs): if not cls._to_csv_check_support(kwargs): return RayIO.to_csv(qc, **kwargs) - if len(ray.nodes()) > 1 and is_local_path(kwargs["path_or_buf"]): - raise ValueError( - "`path_or_buf` must point to a networked file or buffer when in cluster mode." + if len(ray.nodes()) > 1 and ( + not isinstance(kwargs["path_or_buf"], str) + or is_local_path(kwargs["path_or_buf"]) + ): + from modin.error_message import ErrorMessage + + ErrorMessage.single_warning( + "`path_or_buf` must point to a networked file or distributed filesystem (e.g. S3) " + + "when in cluster mode. Defaulting to pandas for `to_csv`" ) + return RayIO.to_csv(qc, **kwargs) signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1) @@ -283,10 +290,17 @@ def to_parquet(cls, qc, **kwargs): if not cls._to_parquet_check_support(kwargs): return RayIO.to_parquet(qc, **kwargs) - if len(ray.nodes()) > 1 and is_local_path(kwargs["path_or_buf"]): - raise ValueError( - "`path_or_buf` must point to a networked file or buffer when in cluster mode." + if len(ray.nodes()) > 1 and ( + not isinstance(kwargs["path_or_buf"], str) + or is_local_path(kwargs["path_or_buf"]) + ): + from modin.error_message import ErrorMessage + + ErrorMessage.single_warning( + "`path_or_buf` must point to a networked file or distributed filesystem (e.g. S3) " + + "when in cluster mode. Defaulting to pandas for `to_parquet`" ) + return RayIO.to_parquet(qc, **kwargs) def func(df, **kw): """ diff --git a/modin/core/io/utils.py b/modin/core/io/utils.py index 68a5d54faf3..6c4ef58e385 100644 --- a/modin/core/io/utils.py +++ b/modin/core/io/utils.py @@ -17,34 +17,33 @@ import pathlib import re from typing import Optional, Union +import fsspec S3_ADDRESS_REGEX = re.compile("[sS]3://(.*?)/(.*)") -def is_local_path(path_or_buf) -> bool: +def is_local_path(path) -> bool: """ - Return ``True`` if the specified `path_or_buf` is a local path, ``False`` otherwise. + Return ``True`` if the specified `path` is a local path, ``False`` otherwise. Parameters ---------- - path_or_buf : str, path object or file-like object - The path or buffer to check. + path : str, path object or file-like object + The path to check. Returns ------- - Whether the `path_or_buf` points to a local file. + Whether the `path` points to a local file. """ - if isinstance(path_or_buf, str): - if S3_ADDRESS_REGEX.match(path_or_buf) is not None or "://" in path_or_buf: - return False # S3 or network path. - if isinstance(path_or_buf, (str, pathlib.PurePath)): - if os.path.exists(path_or_buf): - return True + try: + fsspec.open_local( + "/".join(path.split("/")[:-1]) + ) # Remove file name since that may not exist local_device_id = os.stat(os.getcwd()).st_dev - path_device_id = get_device_id(path_or_buf) - if path_device_id == local_device_id: - return True - return False + path_device_id = get_device_id(path) + return path_device_id == local_device_id + except Exception: + return False def get_device_id(path: Union[str, pathlib.PurePath]) -> Optional[int]: From 9c108afc0fc15029b0db8900b26286ced58e4148 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Thu, 2 Jun 2022 17:49:37 -0700 Subject: [PATCH 14/16] Use fsspec more and default to pandas Signed-off-by: Rehan Durrani --- modin/core/io/utils.py | 63 +++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 37 deletions(-) diff --git a/modin/core/io/utils.py b/modin/core/io/utils.py index 6c4ef58e385..56cc7f487b2 100644 --- a/modin/core/io/utils.py +++ b/modin/core/io/utils.py @@ -14,12 +14,10 @@ """Collection of utility functions for distributed io.""" import os -import pathlib import re -from typing import Optional, Union import fsspec -S3_ADDRESS_REGEX = re.compile("[sS]3://(.*?)/(.*)") +IS_FILE_ONLY_REGEX = re.compile("[^\/]*\.\w+") # noqa: W605 def is_local_path(path) -> bool: @@ -34,41 +32,32 @@ def is_local_path(path) -> bool: Returns ------- Whether the `path` points to a local file. - """ - try: - fsspec.open_local( - "/".join(path.split("/")[:-1]) - ) # Remove file name since that may not exist - local_device_id = os.stat(os.getcwd()).st_dev - path_device_id = get_device_id(path) - return path_device_id == local_device_id - except Exception: - return False - -def get_device_id(path: Union[str, pathlib.PurePath]) -> Optional[int]: + Notes + ----- + If the filesystem corresponds to a `ZipFileSystem`, `TarFileSystem` or `CachingFileSystem`, + this code will return `False` even if it is local. """ - Return the result of `os.stat(path).st_dev` for the portion of `path` that exists locally. - - Parameters - ---------- - path : str, path object - The path to check. - - Returns - ------- - The `st_dev` field of `os.stat` of the portion of the `path` that exists locally, None if no - part of the path exists locally. - """ - index = 1 - path_list = list(pathlib.Path(path).parts) - if path_list[0] == "/": - index += 1 try: - os.stat(os.path.join(*path_list[:index])) + if IS_FILE_ONLY_REGEX.match(path) is not None: + # If we are passed just a filename, we will perform our check on the current working + # directory. + parent_dir = os.getcwd() + else: + # If we are passed a full path, we want to remove the filename from it. + parent_dir = "/".join(path.split("/")[:-1]) + fs = fsspec.core.url_to_fs(parent_dir)[0] # Grab just the FileSystem object + if hasattr( + fs, "local_file" + ): # If the FS does not have the `local_file` attr, it is not local. + # We still need to check that it is not a mounted file - as fsspec treats mounted + # files the same as local ones, but we want to distinguish between local and mounted. + local_device_id = os.stat(os.path.abspath(os.sep)).st_dev + path_device_id = os.stat(parent_dir).st_dev + return path_device_id == local_device_id + return False except Exception: - return None - while os.path.exists(os.path.join(*path_list[:index])) and index <= len(path_list): - index += 1 - index -= 1 - return os.stat(os.path.join(*path_list[:index])).st_dev + # If an exception is raised, it means we tried to open a filesystem that requires additional + # dependencies. This means that it is definitely not a local filesystem, so we can return + # `False` here. + return False From f1bc60aa1e0fea64b4fa4929f224e1fcc17b0d29 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Thu, 2 Jun 2022 19:17:28 -0700 Subject: [PATCH 15/16] Add support for windows Signed-off-by: Rehan Durrani --- modin/core/io/utils.py | 8 ++++++-- modin/pandas/test/test_io.py | 6 +++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/modin/core/io/utils.py b/modin/core/io/utils.py index 56cc7f487b2..1fd171970bd 100644 --- a/modin/core/io/utils.py +++ b/modin/core/io/utils.py @@ -17,7 +17,7 @@ import re import fsspec -IS_FILE_ONLY_REGEX = re.compile("[^\/]*\.\w+") # noqa: W605 +IS_FILE_ONLY_REGEX = re.compile(f"[^\\{os.sep}]*\.\w+") # noqa: W605 def is_local_path(path) -> bool: @@ -45,13 +45,17 @@ def is_local_path(path) -> bool: parent_dir = os.getcwd() else: # If we are passed a full path, we want to remove the filename from it. - parent_dir = "/".join(path.split("/")[:-1]) + parent_dir = os.sep.join(path.split(os.sep)[:-1]) fs = fsspec.core.url_to_fs(parent_dir)[0] # Grab just the FileSystem object if hasattr( fs, "local_file" ): # If the FS does not have the `local_file` attr, it is not local. # We still need to check that it is not a mounted file - as fsspec treats mounted # files the same as local ones, but we want to distinguish between local and mounted. + if os.name == "nt" and parent_dir[:3] == "D:\\": + # In Windows, os.path.abspath(os.sep) will give us the C Drive, but we want the + # D drive to also be marked as local. + return True local_device_id = os.stat(os.path.abspath(os.sep)).st_dev path_device_id = os.stat(parent_dir).st_dev return path_device_id == local_device_id diff --git a/modin/pandas/test/test_io.py b/modin/pandas/test/test_io.py index 69d2cba3a0a..ac9b522e630 100644 --- a/modin/pandas/test/test_io.py +++ b/modin/pandas/test/test_io.py @@ -2393,7 +2393,11 @@ def test_is_local_path(): assert is_local_path( os.getcwd() ), "Current Working Directory incorrectly flagged as not local!" - new_file = os.getcwd() + "/modin-example-file" + new_file = os.getcwd() + "/modin-example-file.extension" assert is_local_path( new_file ), "Non-existent file under current working directory incorrectly flagged as not local!" + new_file_in_curr_dir = "modin-example-file.extension" + assert is_local_path( + new_file_in_curr_dir, + ), "Non-existent file without absolute path incorrectly flagged as not local!" From a7acd819a0411a8f3e1c383d18743a6ac235ebfc Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Thu, 2 Jun 2022 19:18:52 -0700 Subject: [PATCH 16/16] Fix windows support Signed-off-by: Rehan Durrani --- modin/core/io/utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/modin/core/io/utils.py b/modin/core/io/utils.py index 1fd171970bd..4e76594161b 100644 --- a/modin/core/io/utils.py +++ b/modin/core/io/utils.py @@ -55,8 +55,9 @@ def is_local_path(path) -> bool: if os.name == "nt" and parent_dir[:3] == "D:\\": # In Windows, os.path.abspath(os.sep) will give us the C Drive, but we want the # D drive to also be marked as local. - return True - local_device_id = os.stat(os.path.abspath(os.sep)).st_dev + local_device_id = os.stat("D:\\") + else: + local_device_id = os.stat(os.path.abspath(os.sep)).st_dev path_device_id = os.stat(parent_dir).st_dev return path_device_id == local_device_id return False