diff --git a/tools/datasets/__init__.py b/tools/datasets/__init__.py
index faf5e8d96..64940ebc1 100644
--- a/tools/datasets/__init__.py
+++ b/tools/datasets/__init__.py
@@ -22,9 +22,9 @@
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Literal
 
+from tools import fs
 from tools.codemod import ruff
 from tools.datasets.npm import Npm
-from tools.fs import REPO_ROOT
 from tools.schemapi import utils
 
 if TYPE_CHECKING:
@@ -60,7 +60,7 @@ class Application:
     Parameters
     ----------
     out_dir_tools, out_dir_altair
-        Directories to store ``.parquet`` metadata files.
+        Directories to store metadata files.
     out_fp_typing
         Path to write metadata-derived typing module.
 
@@ -72,7 +72,7 @@ class Application:
     def __init__(
         self, out_dir_tools: Path, out_dir_altair: Path, out_fp_typing: Path
     ) -> None:
-        out_dir_tools.mkdir(exist_ok=True)
+        fs.mkdir(out_dir_tools)
         METADATA = "metadata"
         self.paths = types.MappingProxyType["_PathAlias", Path](
             {
@@ -102,7 +102,7 @@ def refresh(
         include_typing
             Regenerate ``altair.datasets._typing``.
         frozen
-            Don't perform any requests or attempt to check for new versions.
+            Don't perform any requests.
 
             .. note::
                 **Temporary** measure to work from ``main`` until `vega-datasets@3`_.
@@ -123,20 +123,28 @@ def refresh(
     def reset(self) -> None:
         """Remove all metadata files."""
-        for fp in self.paths.values():
-            fp.unlink(missing_ok=True)
+        fs.rm(*self.paths.values())
 
     def read(self, name: _PathAlias, /) -> pl.DataFrame:
         """Read existing metadata from file."""
-        import polars as pl
-
-        return pl.read_parquet(self.paths[name])
+        return self.scan(name).collect()
 
     def scan(self, name: _PathAlias, /) -> pl.LazyFrame:
         """Scan existing metadata from file."""
         import polars as pl
 
-        return pl.scan_parquet(self.paths[name])
+        fp = self.paths[name]
+        if fp.suffix == ".parquet":
+            return pl.scan_parquet(fp)
+        elif ".csv" in fp.suffixes:
+            return pl.scan_csv(fp)
+        elif ".json" in fp.suffixes:
+            return pl.read_json(fp).lazy()
+        else:
+            msg = (
+                f"Unable to read {fp.name!r} as tabular data.\nSuffixes: {fp.suffixes}"
+            )
+            raise NotImplementedError(msg)
 
     def write_csv_gzip(self, frame: pl.DataFrame | pl.LazyFrame, fp: Path, /) -> None:
         """
@@ -152,8 +160,7 @@ def write_csv_gzip(self, frame: pl.DataFrame | pl.LazyFrame, fp: Path, /) -> Non
         """
         if fp.suffix != ".gz":
             fp = fp.with_suffix(".csv.gz")
-        if not fp.exists():
-            fp.touch()
+        fp.touch()
         df = frame.lazy().collect()
         buf = BytesIO()
         with gzip.GzipFile(fp, mode="wb", mtime=0) as f:
@@ -169,16 +176,13 @@ def write_json_gzip(self, obj: Any, fp: Path, /) -> None:
         """
         if fp.suffix != ".gz":
             fp = fp.with_suffix(".json.gz")
-        if not fp.exists():
-            fp.touch()
-
+        fp.touch()
         with gzip.GzipFile(fp, mode="wb", mtime=0) as f:
             f.write(json.dumps(obj).encode())
 
     def write_parquet(self, frame: pl.DataFrame | pl.LazyFrame, fp: Path, /) -> None:
         """Write ``frame`` to ``fp``, with some extra safety."""
-        if not fp.exists():
-            fp.touch()
+        fp.touch()
         df = frame.lazy().collect()
         df.write_parquet(fp, compression="zstd", compression_level=17)
 
@@ -233,7 +237,7 @@ def generate_typing(self, dpkg: datapackage.DataPackage) -> None:
         ruff.write_lint_format(self.paths["typing"], contents)
 
-_alt_datasets = REPO_ROOT / "altair" / "datasets"
+_alt_datasets = fs.REPO_ROOT / "altair" / "datasets"
 
 app = Application(
     Path(__file__).parent / "_metadata",
     _alt_datasets / "_metadata",
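
Reviewer note: this patch leans on three `tools.fs` helpers (`fs.mkdir`, `fs.rm`, `fs.REPO_ROOT`) whose definitions live in `tools/fs.py` and are not part of the diff. The sketch below is a reconstruction inferred only from the call sites above (a `Path`-taking `mkdir`, a variadic `rm`, a repo-root constant); the real module may differ in details:

```python
# Hypothetical reconstruction of the ``tools.fs`` helpers used by this diff.
# Not the actual module; inferred from how the patch calls them.
from pathlib import Path

REPO_ROOT: Path = Path(__file__).parent.parent  # assumption: tools/ sits one level below the repo root


def mkdir(fp: Path, /) -> None:
    # ``exist_ok=True`` mirrors the ``out_dir_tools.mkdir(exist_ok=True)`` it replaced;
    # ``parents=True`` is a guess.
    fp.mkdir(parents=True, exist_ok=True)


def rm(*paths: Path) -> None:
    # Mirrors the removed loop in ``reset``: delete each file, ignore missing ones.
    for fp in paths:
        fp.unlink(missing_ok=True)
```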
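
The new `scan` branches on `Path.suffixes` rather than `Path.suffix` for the CSV and JSON cases, so double extensions such as `.csv.gz` still dispatch to the right reader. A stdlib-only illustration of why membership is the right test:

```python
from pathlib import Path

# ``suffixes`` keeps every extension; ``suffix`` keeps only the last one.
for name in ("metadata.parquet", "metadata.csv.gz", "datapackage.json", "notes.txt"):
    fp = Path(name)
    print(f"{fp.name:>20}  suffix={fp.suffix!r}  suffixes={fp.suffixes}")

# "metadata.csv.gz" -> suffix='.gz', suffixes=['.csv', '.gz'],
# so the patch's ``".csv" in fp.suffixes`` matches plain and gzipped CSV alike,
# where ``fp.suffix == ".csv"`` would miss the compressed variant.
```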
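
Both `write_csv_gzip` and `write_json_gzip` pass `mtime=0` to `gzip.GzipFile`. Assuming the goal is reproducible output (by default gzip embeds the current time in a 4-byte header field, so every refresh would dirty the checked-in files even when the data is unchanged), here is a minimal sketch of the effect; the scratch filename is illustrative:

```python
import gzip
import hashlib
from pathlib import Path


def write_gz(fp: Path, payload: bytes) -> None:
    # Pinning ``mtime`` fixes the timestamp field in the gzip header,
    # making the output a pure function of the payload and filename.
    with gzip.GzipFile(fp, mode="wb", mtime=0) as f:
        f.write(payload)


fp = Path("metadata.csv.gz")  # hypothetical scratch file
digests = []
for _ in range(2):
    write_gz(fp, b"dataset,rows\ncars,406\n")
    digests.append(hashlib.sha256(fp.read_bytes()).hexdigest())

assert digests[0] == digests[1]  # byte-identical output -> no spurious VCS churn
```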