diff --git a/CHANGELOG.md b/CHANGELOG.md index 98959a4a..c1cd7edf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# WIP + +* Added parallel file reading, writing, and deletion to `TableStoreFiledir` (configurable via `max_workers`) + # 0.13.2-post.1 * Allow `pandas >= 2` and `numpy >= 1.21` diff --git a/datapipe/store/filedir.py b/datapipe/store/filedir.py index b53a5055..262b26c4 100644 --- a/datapipe/store/filedir.py +++ b/datapipe/store/filedir.py @@ -9,6 +9,7 @@ import numpy as np import pandas as pd from iteration_utilities import duplicates +from joblib import Parallel, delayed from PIL import Image from sqlalchemy import Column, Integer, String @@ -139,6 +140,7 @@ def __init__( readonly: Optional[bool] = None, enable_rm: bool = False, fsspec_kwargs: Dict[str, Any] = {}, + max_workers: int = 16, ): """ При построении `TableStoreFiledir` есть два способа указать схему @@ -176,6 +178,7 @@ def __init__( enable_rm -- если True, включить удаление файлов fsspec_kwargs -- kwargs для fsspec + max_workers -- число воркеров для чтения/записи/удаления файлов """ self.fsspec_kwargs = fsspec_kwargs @@ -248,9 +251,9 @@ def __init__( for attrname in self.attrnames ] self.attrname_to_cls = { - column.name: type_to_cls[type(column.type)] # type: ignore - for column in self.primary_schema + column.name: type_to_cls[type(column.type)] for column in self.primary_schema # type: ignore } + self.max_workers = max_workers def get_primary_schema(self) -> DataSchema: return self.primary_schema @@ -264,16 +267,21 @@ def delete_rows(self, idx: IndexDF) -> None: assert not self.readonly + filepaths = [] for row_idx in idx.index: attrnames_series = idx.loc[row_idx, self.attrnames] assert isinstance(attrnames_series, pd.Series) attrnames = cast(List[str], attrnames_series.tolist()) - _, path = fsspec.core.split_protocol( + _, filepath = fsspec.core.split_protocol( self._filenames_from_idxs_values(attrnames)[0] ) - self.filesystem.rm(path) + filepaths.append(filepath) + + Parallel(n_jobs=self.max_workers, prefer="threads")( 
delayed(self.filesystem.rm)(filepath) for filepath in filepaths + ) def _filenames_from_idxs_values(self, idxs_values: List[str]) -> List[str]: return [ @@ -320,6 +328,7 @@ def insert_rows( adapter = self.adapter # WARNING: Здесь я поставил .drop(columns=self.attrnames), тк ключи будут хранится снаружи, в имени + filepaths, datas = [], [] for row_idx, data in zip( df.index, cast( @@ -334,10 +343,18 @@ def insert_rows( # Проверяем, что значения ключей не приведут к неоднозначному результату при парсинге регулярки self._assert_key_values(filepath, idxs_values) + filepaths.append(filepath) + datas.append(data) + def _write_filepath(filepath: str, data: Any): with fsspec.open(filepath, f"w{self.adapter.mode}", auto_mkdir=True) as f: self.adapter.dump(data, f) + Parallel(n_jobs=self.max_workers, prefer="threads")( + delayed(_write_filepath)(filepath, data) + for filepath, data in zip(filepaths, datas) + ) + def _read_rows_fast( self, idx: IndexDF, @@ -378,7 +395,7 @@ def read_rows( ): return self._read_rows_fast(idx) - def _iterate_files(): + def _iterate_files() -> fsspec.core.OpenFile: if idx is None: for file_open in fsspec.open_files( self.filename_glob, f"r{adapter.mode}", **self.fsspec_kwargs @@ -409,8 +426,7 @@ def _iterate_files(): for file_open in found_files: yield file_open - df_records = [] - for file_open in _iterate_files(): + def get_data(file_open) -> Dict[str, Any]: with file_open as f: data = {} @@ -435,8 +451,11 @@ def _iterate_files(): "Switch argument add_filepath_column to False or rename this key in input data." 
) data["filepath"] = f"{self.protocol_str}{file_open.path}" + return data - df_records.append(data) + df_records = Parallel(n_jobs=self.max_workers, prefer="threads")( + delayed(get_data)(file_open) for file_open in _iterate_files() + ) df = pd.DataFrame(df_records) diff --git a/pyproject.toml b/pyproject.toml index be885180..55ed55d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ sphinx = {version = ">=4.5,<6.0", optional = true} myst-parser = {version = ">=0.17.2,<1.1.0", optional = true} ray = {version = "^2.5.0", optional = true, extras = ["default"]} +joblib = {version = "^1.3.2", optional = true} [tool.poetry.extras] @@ -62,6 +63,7 @@ s3fs = ["s3fs"] redis = ["redis"] qdrant = ["qdrant-client"] ray = ["ray"] +joblib = ["joblib"] docs = ["sphinx", "myst-parser"]