Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# WIP

* Added parallel file read/write/delete operations (via `joblib`) for `TableStoreFiledir`

# 0.13.2-post.1

* Allow `pandas >= 2` and `numpy >= 1.21`
Expand Down
35 changes: 27 additions & 8 deletions datapipe/store/filedir.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import numpy as np
import pandas as pd
from iteration_utilities import duplicates
from joblib import Parallel, delayed
from PIL import Image
from sqlalchemy import Column, Integer, String

Expand Down Expand Up @@ -139,6 +140,7 @@ def __init__(
readonly: Optional[bool] = None,
enable_rm: bool = False,
fsspec_kwargs: Dict[str, Any] = {},
max_workers: int = 16,
):
"""
При построении `TableStoreFiledir` есть два способа указать схему
Expand Down Expand Up @@ -176,6 +178,7 @@ def __init__(
enable_rm -- если True, включить удаление файлов

fsspec_kwargs -- kwargs для fsspec
max_workers -- число воркеров для чтения/записи/удаления файлов
"""

self.fsspec_kwargs = fsspec_kwargs
Expand Down Expand Up @@ -248,9 +251,9 @@ def __init__(
for attrname in self.attrnames
]
self.attrname_to_cls = {
column.name: type_to_cls[type(column.type)] # type: ignore
for column in self.primary_schema
column.name: type_to_cls[type(column.type)] for column in self.primary_schema # type: ignore
}
self.max_workers = max_workers

def get_primary_schema(self) -> DataSchema:
return self.primary_schema
Expand All @@ -264,16 +267,21 @@ def delete_rows(self, idx: IndexDF) -> None:

assert not self.readonly

filepaths = []
for row_idx in idx.index:
attrnames_series = idx.loc[row_idx, self.attrnames]
assert isinstance(attrnames_series, pd.Series)

attrnames = cast(List[str], attrnames_series.tolist())

_, path = fsspec.core.split_protocol(
_, filepath = fsspec.core.split_protocol(
self._filenames_from_idxs_values(attrnames)[0]
)
self.filesystem.rm(path)
filepaths.append(filepath)

Parallel(n_jobs=self.max_workers, prefer="threads")(
delayed(self.filesystem.rm)(filepath) for filepath in filepaths
)

def _filenames_from_idxs_values(self, idxs_values: List[str]) -> List[str]:
return [
Expand Down Expand Up @@ -320,6 +328,7 @@ def insert_rows(
adapter = self.adapter

# WARNING: Здесь я поставил .drop(columns=self.attrnames), тк ключи будут хранится снаружи, в имени
filepaths, datas = [], []
for row_idx, data in zip(
df.index,
cast(
Expand All @@ -334,10 +343,18 @@ def insert_rows(

# Проверяем, что значения ключей не приведут к неоднозначному результату при парсинге регулярки
self._assert_key_values(filepath, idxs_values)
filepaths.append(filepath)
datas.append(data)

def _write_filepath(filepath: str, data: Any):
with fsspec.open(filepath, f"w{self.adapter.mode}", auto_mkdir=True) as f:
self.adapter.dump(data, f)

Parallel(n_jobs=self.max_workers, prefer="threads")(
delayed(_write_filepath)(filepath, data)
for filepath, data in zip(filepaths, datas)
)

def _read_rows_fast(
self,
idx: IndexDF,
Expand Down Expand Up @@ -378,7 +395,7 @@ def read_rows(
):
return self._read_rows_fast(idx)

def _iterate_files():
def _iterate_files() -> fsspec.core.OpenFile:
if idx is None:
for file_open in fsspec.open_files(
self.filename_glob, f"r{adapter.mode}", **self.fsspec_kwargs
Expand Down Expand Up @@ -409,8 +426,7 @@ def _iterate_files():
for file_open in found_files:
yield file_open

df_records = []
for file_open in _iterate_files():
def get_data(file_open) -> Dict[str, Any]:
with file_open as f:
data = {}

Expand All @@ -435,8 +451,11 @@ def _iterate_files():
"Switch argument add_filepath_column to False or rename this key in input data."
)
data["filepath"] = f"{self.protocol_str}{file_open.path}"
return data

df_records.append(data)
df_records = Parallel(n_jobs=self.max_workers, prefer="threads")(
delayed(get_data)(file_open) for file_open in _iterate_files()
)

df = pd.DataFrame(df_records)

Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ sphinx = {version = ">=4.5,<6.0", optional = true}
myst-parser = {version = ">=0.17.2,<1.1.0", optional = true}

ray = {version = "^2.5.0", optional = true, extras = ["default"]}
joblib = {version = "^1.3.2", optional = true}

[tool.poetry.extras]

Expand All @@ -62,6 +63,7 @@ s3fs = ["s3fs"]
redis = ["redis"]
qdrant = ["qdrant-client"]
ray = ["ray"]
joblib = ["joblib"]

docs = ["sphinx", "myst-parser"]

Expand Down