refactor: Reuse tools.fs more, fix app.(read|scan)
Using only `.parquet` was relevant in earlier versions, which produced multiple `.parquet` files.
Now these methods safely handle all formats in use.
dangotbanned committed Jan 21, 2025
1 parent 7433eb8 commit d64dbee
Showing 1 changed file with 22 additions and 18 deletions.
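As an illustration of the fix (not part of the commit): `read` now delegates to `scan`, and `scan` chooses a loader from the path's suffixes instead of assuming `.parquet`. A minimal usage sketch, where the alias name is a hypothetical stand-in for whatever `_PathAlias` keys exist:

```python
# Illustrative only: "metadata" is a hypothetical _PathAlias key.
from tools.datasets import app

lazy = app.scan("metadata")  # LazyFrame; reader picked by suffix (.parquet/.csv*/.json*)
full = app.read("metadata")  # same as app.scan("metadata").collect()
```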
tools/datasets/__init__.py (22 additions, 18 deletions)
@@ -22,9 +22,9 @@
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Literal
 
+from tools import fs
 from tools.codemod import ruff
 from tools.datasets.npm import Npm
-from tools.fs import REPO_ROOT
 from tools.schemapi import utils
 
 if TYPE_CHECKING:
@@ -60,7 +60,7 @@ class Application:
     Parameters
     ----------
     out_dir_tools, out_dir_altair
-        Directories to store ``.parquet`` metadata files.
+        Directories to store metadata files.
     out_fp_typing
         Path to write metadata-derived typing module.
@@ -72,7 +72,7 @@ class Application:
     def __init__(
         self, out_dir_tools: Path, out_dir_altair: Path, out_fp_typing: Path
     ) -> None:
-        out_dir_tools.mkdir(exist_ok=True)
+        fs.mkdir(out_dir_tools)
         METADATA = "metadata"
         self.paths = types.MappingProxyType["_PathAlias", Path](
             {
@@ -102,7 +102,7 @@ def refresh(
         include_typing
             Regenerate ``altair.datasets._typing``.
         frozen
-            Don't perform any requests or attempt to check for new versions.
+            Don't perform any requests.
 
             .. note::
                **Temporary** measure to work from ``main`` until `vega-datasets@3`_.
@@ -123,20 +123,28 @@ def refresh(
 
     def reset(self) -> None:
         """Remove all metadata files."""
-        for fp in self.paths.values():
-            fp.unlink(missing_ok=True)
+        fs.rm(*self.paths.values())
 
     def read(self, name: _PathAlias, /) -> pl.DataFrame:
         """Read existing metadata from file."""
-        import polars as pl
-
-        return pl.read_parquet(self.paths[name])
+        return self.scan(name).collect()
 
     def scan(self, name: _PathAlias, /) -> pl.LazyFrame:
         """Scan existing metadata from file."""
         import polars as pl
 
-        return pl.scan_parquet(self.paths[name])
+        fp = self.paths[name]
+        if fp.suffix == ".parquet":
+            return pl.scan_parquet(fp)
+        elif ".csv" in fp.suffixes:
+            return pl.scan_csv(fp)
+        elif ".json" in fp.suffixes:
+            return pl.read_json(fp).lazy()
+        else:
+            msg = (
+                f"Unable to read {fp.name!r} as tabular data.\nSuffixes: {fp.suffixes}"
+            )
+            raise NotImplementedError(msg)
 
     def write_csv_gzip(self, frame: pl.DataFrame | pl.LazyFrame, fp: Path, /) -> None:
         """
@@ -152,8 +160,7 @@ def write_csv_gzip(self, frame: pl.DataFrame | pl.LazyFrame, fp: Path, /) -> None:
         """
         if fp.suffix != ".gz":
             fp = fp.with_suffix(".csv.gz")
-        if not fp.exists():
-            fp.touch()
+        fp.touch()
         df = frame.lazy().collect()
         buf = BytesIO()
         with gzip.GzipFile(fp, mode="wb", mtime=0) as f:
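A side note on `mtime=0` in the gzip writers above and below (pre-existing behavior, unchanged by this commit): `gzip` embeds a timestamp in its stream header, so pinning it keeps repeated writes of identical content byte-for-byte reproducible. A self-contained sketch of the idea:

```python
import gzip
from io import BytesIO

def gzip_bytes(data: bytes) -> bytes:
    """Compress reproducibly: a fixed mtime keeps the gzip header stable."""
    buf = BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb", mtime=0) as f:
        f.write(data)
    return buf.getvalue()

# Identical input yields identical bytes across runs, so checked-in
# .gz files only change when their contents actually change.
assert gzip_bytes(b"abc") == gzip_bytes(b"abc")
```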
@@ -169,16 +176,13 @@ def write_json_gzip(self, obj: Any, fp: Path, /) -> None:
         """
         if fp.suffix != ".gz":
             fp = fp.with_suffix(".json.gz")
-        if not fp.exists():
-            fp.touch()
-
+        fp.touch()
        with gzip.GzipFile(fp, mode="wb", mtime=0) as f:
             f.write(json.dumps(obj).encode())
 
     def write_parquet(self, frame: pl.DataFrame | pl.LazyFrame, fp: Path, /) -> None:
         """Write ``frame`` to ``fp``, with some extra safety."""
-        if not fp.exists():
-            fp.touch()
+        fp.touch()
         df = frame.lazy().collect()
         df.write_parquet(fp, compression="zstd", compression_level=17)

@@ -233,7 +237,7 @@ def generate_typing(self, dpkg: datapackage.DataPackage) -> None:
         ruff.write_lint_format(self.paths["typing"], contents)
 
 
-_alt_datasets = REPO_ROOT / "altair" / "datasets"
+_alt_datasets = fs.REPO_ROOT / "altair" / "datasets"
 app = Application(
     Path(__file__).parent / "_metadata",
     _alt_datasets / "_metadata",
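For context, the `tools.fs` helpers this commit leans on (`fs.mkdir`, `fs.rm`, `fs.REPO_ROOT`) are not shown in the diff. A plausible sketch inferred from the call sites they replace; the real module may differ:

```python
# Hypothetical reconstruction of parts of tools/fs.py, inferred from usage above.
from pathlib import Path

# Assumption: tools/fs.py sits one level below the repository root.
REPO_ROOT: Path = Path(__file__).resolve().parents[1]

def mkdir(*dirs: Path) -> None:
    """Create directories, tolerating pre-existing ones (cf. Path.mkdir(exist_ok=True))."""
    for d in dirs:
        d.mkdir(parents=True, exist_ok=True)

def rm(*paths: Path) -> None:
    """Remove files, ignoring missing ones (cf. Path.unlink(missing_ok=True))."""
    for p in paths:
        p.unlink(missing_ok=True)
```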
