From fb0bee9f58a52cc115529bd07bf1f317670158c7 Mon Sep 17 00:00:00 2001 From: Hakimovich99 Date: Fri, 17 Nov 2023 10:30:11 +0100 Subject: [PATCH] pre-commit fixes --- src/components/load_from_csv/src/main.py | 29 +++++++++++++++--------- src/components/text_cleaning/__init__.py | 0 2 files changed, 18 insertions(+), 11 deletions(-) create mode 100644 src/components/text_cleaning/__init__.py diff --git a/src/components/load_from_csv/src/main.py b/src/components/load_from_csv/src/main.py index 1811f55..414814c 100644 --- a/src/components/load_from_csv/src/main.py +++ b/src/components/load_from_csv/src/main.py @@ -1,7 +1,6 @@ import logging import typing as t -import dask import dask.dataframe as dd import pandas as pd from fondant.component import DaskLoadComponent @@ -20,16 +19,19 @@ def __init__( column_name_mapping: t.Optional[dict], n_rows_to_load: t.Optional[int], index_column: t.Optional[str], - ) -> None: + ) -> None: """ Args: spec: the component spec - dataset_uri: The remote path to the parquet file/folder containing the dataset - column_name_mapping: Mapping of the consumed dataset to fondant column names - n_rows_to_load: optional argument that defines the number of rows to load. Useful for - testing pipeline runs on a small scale. - index_column: Column to set index to in the load component, if not specified a default - globally unique index will be set. + dataset_uri: The remote path to the parquet file/folder + containing the dataset column_name_mapping: Mapping of + the consumed dataset to fondant column names + n_rows_to_load: optional argument that defines the + number of rows to load. Useful for testing pipeline + runs on a small scale. + index_column: Column to set index to in the load component, + if not specified a default globally unique index will + be set. """ self.dataset_uri = dataset_uri self.column_separator = column_separator @@ -72,7 +74,10 @@ def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame: ) def _set_unique_index(dataframe: pd.DataFrame, partition_info=None): - """Function that sets a unique index based on the partition and row number.""" + """ + Function that sets a unique index + based on the partition and row number. + """ dataframe["id"] = 1 dataframe["id"] = ( str(partition_info["number"]) @@ -98,7 +103,7 @@ def _get_meta_df() -> pd.DataFrame: dask_df = dask_df.set_index(self.index_column, drop=True) return dask_df - + def return_subset_of_df(self, dask_df: dd.DataFrame) -> dd.DataFrame: if self.n_rows_to_load is not None: partitions_length = 0 @@ -122,7 +127,9 @@ def load(self) -> dd.DataFrame: columns = self.get_columns_to_keep() logger.debug(f"Columns to keep: {columns}") - dask_df = dd.read_csv(self.dataset_uri, sep=self.column_separator, usecols=columns) + dask_df = dd.read_csv( + self.dataset_uri, sep=self.column_separator, usecols=columns + ) # 2) Rename columns if self.column_name_mapping: diff --git a/src/components/text_cleaning/__init__.py b/src/components/text_cleaning/__init__.py new file mode 100644 index 0000000..e69de29