diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py
index 747e9eb6..9e0ef0d7 100644
--- a/benchmarks/benchmarks.py
+++ b/benchmarks/benchmarks.py
@@ -254,3 +254,27 @@ def time_run(self):
     def peakmem_run(self):
         """Benchmark the memory usage of read_parquet(self.path, columns=self.columns)"""
         self.run()
+
+
+class ReadFewColumnsHTTPSWithOptimization:
+    """Benchmark read_parquet("https://", columns=[...])
+
+    Note: fsspec optimization is now automatic for remote URLs,
+    so this benchmark is equivalent to ReadFewColumnsHTTPS.
+    Kept for historical comparison purposes.
+    """
+
+    path = "https://data.lsdb.io/hats/gaia_dr3/gaia/dataset/Norder=2/Dir=0/Npix=0.parquet"
+    columns = ["_healpix_29", "ra", "astrometric_primary_flag"]
+
+    def run(self):
+        """Run the benchmark (fsspec optimization is automatic for remote URLs)."""
+        _ = read_parquet(self.path, columns=self.columns)
+
+    def time_run(self):
+        """Benchmark the runtime with automatic fsspec optimization"""
+        self.run()
+
+    def peakmem_run(self):
+        """Benchmark the memory usage with automatic fsspec optimization"""
+        self.run()
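For context, the call timed by the benchmark above boils down to a single read_parquet invocation. The sketch below is illustrative only: it needs network access to data.lsdb.io, and it assumes read_parquet is importable from the nested_pandas top-level package as in the existing benchmarks.

```python
# Illustrative sketch of the call measured by ReadFewColumnsHTTPSWithOptimization.
# The URL and column names are copied from the benchmark; network access is required.
from nested_pandas import read_parquet

url = "https://data.lsdb.io/hats/gaia_dr3/gaia/dataset/Norder=2/Dir=0/Npix=0.parquet"
nf = read_parquet(url, columns=["_healpix_29", "ra", "astrometric_primary_flag"])
print(len(nf), list(nf.columns))
```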
diff --git a/src/nested_pandas/nestedframe/io.py b/src/nested_pandas/nestedframe/io.py
index 23343101..2c6a2790 100644
--- a/src/nested_pandas/nestedframe/io.py
+++ b/src/nested_pandas/nestedframe/io.py
@@ -57,6 +57,11 @@ def read_parquet(
 
     Notes
     -----
+    For remote storage (S3, GCS, HTTP/HTTPS), this function automatically uses
+    fsspec.parquet.open_parquet_file for optimized access, precaching only the
+    byte ranges needed for the requested columns, which can significantly
+    improve performance compared to standard PyArrow reading.
+
     pyarrow supports partial loading of nested structures from parquet,
     for example ```pd.read_parquet("data.parquet", columns=["nested.a"])```
     will load the "a" column of the "nested" column. Standard pandas/pyarrow
@@ -94,8 +99,11 @@ def read_parquet(
         reject_nesting = [reject_nesting]
 
     # First load through pyarrow
+    # If data is remote, use fsspec.parquet for better performance
+    if _should_use_fsspec_optimization(data, kwargs.get("filesystem")):
+        table = _read_with_fsspec_optimization(data, columns, kwargs)
     # If `filesystem` is specified - use it
-    if kwargs.get("filesystem") is not None:
+    elif kwargs.get("filesystem") is not None:
         table = pq.read_table(data, columns=columns, **kwargs)
     # Otherwise convert with a special function
     else:
@@ -291,3 +299,91 @@ def _cast_list_cols_to_nested(df):
         if pa.types.is_list(dtype.pyarrow_dtype):
             df[col] = pack_lists(df[[col]])
     return df
+
+
+def _should_use_fsspec_optimization(data, explicit_filesystem):
+    """Determine if fsspec optimization should be used.
+
+    Parameters
+    ----------
+    data : str, Path, UPath, or file-like object
+        The data source
+    explicit_filesystem : filesystem or None
+        Explicitly provided filesystem
+
+    Returns
+    -------
+    bool
+        True if fsspec optimization should be used for this data source
+    """
+    # Don't use the optimization if an explicit filesystem is provided
+    if explicit_filesystem is not None:
+        return False
+
+    # Don't use it for file-like objects
+    if hasattr(data, "read"):
+        return False
+
+    # For UPath objects, check if they're remote (check before Path since UPath inherits from Path)
+    if isinstance(data, UPath):
+        return data.protocol not in ("", "file")
+
+    # Don't use it for Path objects (local files)
+    if isinstance(data, Path):
+        return False
+
+    # For strings, check if they look like remote URLs
+    if isinstance(data, str):
+        return data.startswith(("http://", "https://", "s3://", "gs://", "gcs://", "azure://", "adl://"))
+
+    return False
+
+
+def _read_with_fsspec_optimization(data, columns, kwargs):
+    """Read parquet using fsspec optimization for better remote storage performance.
+
+    Parameters
+    ----------
+    data : str, UPath, or path-like
+        Path to the parquet file
+    columns : list or None
+        Columns to read
+    kwargs : dict
+        Additional kwargs for reading
+
+    Returns
+    -------
+    pyarrow.Table
+        The loaded table
+    """
+    try:
+        import fsspec.parquet
+    except ImportError:
+        # Fall back to the regular method if fsspec.parquet is not available
+        data_converted, filesystem = _transform_read_parquet_data_arg(data)
+        return pq.read_table(data_converted, filesystem=filesystem, columns=columns, **kwargs)
+
+    # Convert UPath to string if needed
+    if isinstance(data, UPath):
+        path_str = str(data)
+        # Use UPath storage options for fsspec
+        storage_options = data.storage_options if data.storage_options else None
+    else:
+        path_str = str(data)
+        storage_options = None
+
+    # Use fsspec.parquet.open_parquet_file for optimized access
+    try:
+        with fsspec.parquet.open_parquet_file(
+            path_str,
+            columns=columns,
+            storage_options=storage_options,
+            engine="pyarrow",
+        ) as parquet_file:
+            # Read the table using PyArrow with the optimized file handle
+            table = pq.read_table(parquet_file, columns=columns, **kwargs)
+            return table
+    except Exception:
+        # Fall back to the regular method if the optimized path fails
+        data_converted, filesystem = _transform_read_parquet_data_arg(data)
+        return pq.read_table(data_converted, filesystem=filesystem, columns=columns, **kwargs)
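For reviewers less familiar with fsspec.parquet, the helper added above is essentially a guarded wrapper around the access pattern sketched below. The sketch mirrors the diff's own calls; the URL and column list are placeholders, and fsspec and pyarrow are assumed to be installed.

```python
# Standalone sketch of the access pattern wrapped by _read_with_fsspec_optimization.
# The URL and column list are placeholders, not real data.
import fsspec.parquet
import pyarrow.parquet as pq

url = "https://example.com/data.parquet"
columns = ["a", "nested"]

# open_parquet_file reads the footer, then prefetches only the byte ranges that
# hold the requested columns, so the subsequent read needs few or no extra remote requests.
with fsspec.parquet.open_parquet_file(url, columns=columns, engine="pyarrow") as f:
    table = pq.read_table(f, columns=columns)
```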
diff --git a/tests/nested_pandas/nestedframe/test_io.py b/tests/nested_pandas/nestedframe/test_io.py
index 24b56103..94fefef4 100644
--- a/tests/nested_pandas/nestedframe/test_io.py
+++ b/tests/nested_pandas/nestedframe/test_io.py
@@ -399,3 +399,54 @@ def test__transform_read_parquet_data_arg():
             "https://data.lsdb.io/hats/gaia_dr3/gaia/dataset/Norder=2/Dir=0/Npix=0.parquet",
         ]
     )
+
+
+def test_read_parquet_with_fsspec_optimization():
+    """Test that read_parquet automatically uses fsspec optimization for remote files."""
+    # Test with a local file (should not use fsspec optimization)
+    local_path = "tests/test_data/nested.parquet"
+
+    # Test basic reading - local files should work as before
+    nf1 = read_parquet(local_path)
+
+    # Test with additional kwargs
+    nf2 = read_parquet(
+        local_path,
+        columns=["a", "nested.flux"],
+        use_threads=True,
+    )
+
+    assert len(nf2.columns) <= len(nf1.columns)  # only the requested top-level columns are loaded
+    assert "a" in nf2.columns
+    assert "nested" in nf2.columns
+
+
+def test_fsspec_optimization_path_detection():
+    """Test that _should_use_fsspec_optimization correctly identifies remote paths."""
+    from nested_pandas.nestedframe.io import _should_use_fsspec_optimization
+    from pathlib import Path
+
+    # Test cases that should NOT use optimization
+    assert not _should_use_fsspec_optimization("local_file.parquet", None)
+    assert not _should_use_fsspec_optimization("/path/to/file.parquet", None)
+    assert not _should_use_fsspec_optimization(Path("local_file.parquet"), None)
+    assert not _should_use_fsspec_optimization(UPath("local_file.parquet"), None)
+    assert not _should_use_fsspec_optimization("https://example.com/file.parquet", "some_filesystem")
+
+    # Test file-like object
+    import io
+    assert not _should_use_fsspec_optimization(io.BytesIO(b"test"), None)
+
+    # Test cases that SHOULD use optimization
+    assert _should_use_fsspec_optimization("https://example.com/file.parquet", None)
+    assert _should_use_fsspec_optimization("s3://bucket/file.parquet", None)
+    assert _should_use_fsspec_optimization("gs://bucket/file.parquet", None)
+    assert _should_use_fsspec_optimization(UPath("https://example.com/file.parquet"), None)
+    assert _should_use_fsspec_optimization(UPath("s3://bucket/file.parquet"), None)
+
+
+def test_docstring_includes_fsspec_notes():
+    """Test that the docstring mentions the automatic fsspec optimization."""
+    docstring = read_parquet.__doc__
+    assert "fsspec" in docstring
+    assert "remote" in docstring.lower()
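One possible follow-up (not part of this PR) would be a unit test asserting that remote URLs actually route through the new helper without touching the network. A rough sketch using pytest's monkeypatch fixture is below; the test name and stand-in table are hypothetical.

```python
# Hypothetical follow-up test sketch: check that remote URLs are routed through
# _read_with_fsspec_optimization, with no network access. Not part of this PR.
import pyarrow as pa
import nested_pandas.nestedframe.io as nfio


def test_remote_url_uses_fsspec_helper(monkeypatch):
    seen = {}

    def fake_read(data, columns, kwargs):
        seen["data"] = data
        return pa.table({"a": [1, 2, 3]})  # minimal stand-in table

    # Replace the helper so the remote branch can be observed without I/O
    monkeypatch.setattr(nfio, "_read_with_fsspec_optimization", fake_read)
    nf = nfio.read_parquet("https://example.com/file.parquet", columns=["a"])
    assert seen["data"] == "https://example.com/file.parquet"
    assert "a" in nf.columns
```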