Use tsfresh for feature selection #20

base: main
Conversation
Walkthrough

A new method for writing pandas DataFrames to a PostgreSQL database was added to the database utility module. The feature extraction script now performs supervised or unsupervised feature selection after extraction, saves the selected features to a file, and optionally persists them to a database table. Control flow and configuration were updated accordingly.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant User
participant ExtractFeaturesScript
participant DBUtils
participant Database
User->>ExtractFeaturesScript: Run main()
ExtractFeaturesScript->>ExtractFeaturesScript: Extract features
ExtractFeaturesScript->>ExtractFeaturesScript: Load target (if available)
ExtractFeaturesScript->>ExtractFeaturesScript: select_relevant_features()
ExtractFeaturesScript->>ExtractFeaturesScript: Save selected features to file
alt Database persistence enabled
ExtractFeaturesScript->>DBUtils: write_dataframe(selected_features, table_name)
DBUtils->>Database: Write DataFrame to table
end
ExtractFeaturesScript->>User: Report completion
```
Actionable comments posted: 2
🧹 Nitpick comments (1)
DataIngestion/feature_extraction/feature/extract_features.py (1)
334-350: High-RAM correlation matrix on wide tables

For unsupervised selection, `work_df.corr()` is O(n²) in memory/time. With thousands of tsfresh features this quickly exhausts RAM/GPU. Consider:
- Using incremental VIF / pairwise pruning instead of the full matrix.
- Limiting the candidate set first (e.g. top-k variance), as sketched below.

Otherwise the step may crash on realistic datasets.
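A minimal sketch of that two-stage idea (not code from this PR; `prune_features`, the `top_k` default, and the synthetic data are illustrative):

```python
import numpy as np
import pandas as pd

def prune_features(df: pd.DataFrame, top_k: int = 500, corr_threshold: float = 0.95) -> pd.DataFrame:
    """Keep the top_k highest-variance columns, then drop one column of each highly correlated pair."""
    # Stage 1: shrink the candidate set so the correlation matrix is at most top_k x top_k.
    keep = df.var().nlargest(min(top_k, df.shape[1])).index
    df = df[keep]
    # Stage 2: pairwise pruning on the reduced set instead of the full n x n matrix.
    corr = df.corr().abs()
    upper = corr.where(np.triu(np.ones(corr.shape, dtype=bool), k=1))
    to_drop = [col for col in upper.columns if (upper[col] > corr_threshold).any()]
    return df.drop(columns=to_drop)

# Example: 10,000 synthetic feature columns never hit the O(n^2) correlation step at full width.
features = pd.DataFrame(np.random.rand(200, 10_000)).rename(columns=str)
print(prune_features(features, top_k=500).shape)
```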
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (2)
- DataIngestion/feature_extraction/feature/db_utils.py (1 hunks)
- DataIngestion/feature_extraction/feature/extract_features.py (5 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
DataIngestion/feature_extraction/feature/extract_features.py (1)
DataIngestion/feature_extraction/feature/db_utils.py (2)
- SQLAlchemyPostgresConnector (28-160)
- write_dataframe (101-136)
🔇 Additional comments (2)
DataIngestion/feature_extraction/feature/db_utils.py (1)
101-137: ❓ Verification inconclusive

Prefer logging and stronger safety checks in `write_dataframe`
- Consistency – the rest of the module relies on `logging`, but this method falls back to `print()`. Switching to `logging.info` / `logging.error` keeps output consistent and allows log-level control.
- SQL-injection risk – `table_name` comes straight from an environment variable. Although `pandas.to_sql` quotes identifiers, it will happily create/overwrite arbitrary tables if a malicious name is supplied. Consider validating against `^[A-Za-z_][A-Za-z0-9_]*$` or whitelisting.
- Performance – for large DataFrames, `to_sql(..., method="multi", chunksize=…)` is orders of magnitude faster. Exposing `method` & `chunksize` (with sensible defaults) would make the API far more useful.
- Type hints – elsewhere you use `typing.Union`; here you use the PEP-604 `|`. Pick one style for coherence (ideally PEP-604 everywhere if you require Python ≥ 3.10).

Example patch (illustrative):
```diff
- if df.empty:
-     print(f"No data to write to table {table_name}")
+ if df.empty:
+     logging.info("No data to write to table %s", table_name)
      return
  ...
- df.to_sql(table_name, self.engine, if_exists=if_exists,
-           index=index, index_label=index_label)
- except Exception as exc:
-     print(f"Error writing DataFrame to table {table_name}: {exc}")
+ df.to_sql(
+     table_name,
+     self.engine,
+     if_exists=if_exists,
+     index=index,
+     index_label=index_label,
+     method="multi",
+     chunksize=10_000,
+ )
+ except Exception as exc:
+     logging.error("Error writing DataFrame to table %s: %s", table_name, exc)
```
```bash
#!/bin/bash
# Locate and display the database utility module to inspect imports and logging usage
file_path=$(fd db_utils.py)
if [ -z "$file_path" ]; then
  echo "db_utils.py not found"
  exit 1
fi
echo "=== Showing imports and top of file ==="
sed -n '1,50p' "$file_path"
echo
echo "=== Showing type hints patterns ==="
grep -n -E "def write_dataframe|typing\.|from typing" "$file_path"
echo
echo "=== Checking logging usage in this file ==="
grep -n "logging" "$file_path"
```

```bash
#!/bin/bash
# Locate db_utils.py in the repository
file=$(fd --full-path db_utils.py | head -n 1)
if [ -z "$file" ]; then
  echo "db_utils.py not found"
  exit 1
fi
echo "=== File path: $file ==="
# Show top 50 lines to inspect imports
echo; echo "=== Top 50 lines (imports & setup) ==="
sed -n '1,50p' "$file"
# Check for logging imports
echo; echo "=== Logging import lines ==="
grep -nE "^import logging|^from logging" "$file" || echo "No direct logging imports found"
# Check for any logging usage in this file
echo; echo "=== Logging usage in this file ==="
grep -R "logging\." -n "$file" || echo "No logging calls in db_utils.py"
# Inspect type hint style around write_dataframe
echo; echo "=== write_dataframe signature & typing imports ==="
grep -nE "def write_dataframe" "$file"
grep -nE "from typing|import typing" "$file" || echo "No typing imports found"
# Search elsewhere in the module for logging usage (consistency check)
echo; echo "=== Logging usage elsewhere in this module ==="
grep -R "logging\." -n "$(dirname "$file")"
```

```bash
#!/bin/bash
# Locate the db_utils.py file
file=$(fd db_utils.py | head -n1)
if [ -z "$file" ]; then
  echo "db_utils.py not found"
  exit 1
fi
echo "=== File: $file ==="
echo
# Show imports and top of file
echo "=== Top 50 lines (imports & setup) ==="
sed -n '1,50p' "$file"
echo
# Check for logging imports
echo "=== Logging import lines ==="
grep -nE '^import logging|^from logging' "$file" || echo "No direct logging imports found"
echo
# Check for any logging usage in this file
echo "=== Logging usage in this file ==="
grep -n 'logging\.' "$file" || echo "No logging calls in db_utils.py"
echo
# Inspect type hint style around write_dataframe
echo "=== write_dataframe signature & typing imports ==="
grep -nE 'def write_dataframe|from typing|Union' "$file" || echo "No typing imports or Union usage found"
# Check for PEP 604 union ('|') in type hints
echo
echo "=== PEP 604 union ('|') usage ==="
grep -n -E '[[:space:]]\|[[:space:]]' "$file" || echo "No PEP 604 style unions found"
```
Prefer logging, validation, and performance options in write_dataframe
To keep this utility consistent and safe, consider the following updates to `write_dataframe` in DataIngestion/feature_extraction/feature/db_utils.py:

- Replace the `print` calls with `logging` so you respect the module's log configuration and control verbosity via levels.
- Validate `table_name` (e.g. match `^[A-Za-z_][A-Za-z0-9_]*$`) or whitelist allowed names to avoid accidental or malicious table creation/overwrite.
- Expose and default the `method` ("multi") and `chunksize` (e.g. 10_000) parameters on `df.to_sql` for much faster bulk inserts.
- Standardize on one union-type hint style (either `typing.Union[X, Y]` or the PEP 604 `X | Y`) across the codebase.

Example diff:
```diff
--- a/DataIngestion/feature_extraction/feature/db_utils.py
+++ b/DataIngestion/feature_extraction/feature/db_utils.py
@@ -1,3 +1,4 @@
+import re
 import pandas as pd
-from typing import Union
+import logging
+from typing import Union
     def write_dataframe(
-        self, df: pd.DataFrame, table_name: str,
-        if_exists: str = "append", index: bool = False,
-        index_label: str | None = None
+        self,
+        df: pd.DataFrame,
+        table_name: str,
+        if_exists: str = "append",
+        index: bool = False,
+        index_label: str | None = None,
+        method: str = "multi",
+        chunksize: int = 10_000
     ) -> None:
         """Write *df* to *table_name* using pandas.to_sql."""
-        if df.empty:
-            print(f"No data to write to table {table_name}")
+        if df.empty:
+            logging.info("No data to write to table %s", table_name)
             return
-        if not self.engine:
+        # Validate table name to prevent SQL-injection or unintended writes
+        if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", table_name):
+            raise ValueError(f"Invalid table name: {table_name!r}")
+        if not self.engine:
             try:
                 self.connect()
             except Exception as exc:
@@ -111,8 +124,11 @@ def write_dataframe(self, df: pd.DataFrame, table_name: str,
         try:
-            df.to_sql(table_name, self.engine, if_exists=if_exists,
-                      index=index, index_label=index_label)
+            df.to_sql(
+                table_name,
+                self.engine,
+                if_exists=if_exists,
+                index=index,
+                index_label=index_label,
+                method=method,
+                chunksize=chunksize,
+            )
         except Exception as exc:
-            print(f"Error writing DataFrame to table {table_name}: {exc}")
-            raise
+            logging.error("Error writing DataFrame to table %s: %s", table_name, exc)
+            raise
```

Please review and adjust the regex or whitelist to suit your database naming conventions, and ensure your logging configuration is set appropriately.
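If the patch above is adopted, a call site might look like the following sketch (the environment-variable defaults and the sample DataFrame are illustrative, and it assumes a reachable PostgreSQL instance):

```python
import os
import pandas as pd
from db_utils import SQLAlchemyPostgresConnector  # DataIngestion/feature_extraction/feature/db_utils.py

features = pd.DataFrame({"mean_temp": [21.4, 22.1], "max_co2": [612.0, 598.0]})

connector = SQLAlchemyPostgresConnector(
    user=os.getenv("DB_USER", "postgres"),
    password=os.getenv("DB_PASSWORD", ""),
    host=os.getenv("DB_HOST", "localhost"),
    port=os.getenv("DB_PORT", "5432"),
    db_name=os.getenv("DB_NAME", "postgres"),
)
# The table name comes from the environment, so the regex check in the patch guards it;
# method/chunksize would default to "multi" / 10_000 under the proposed signature.
connector.write_dataframe(
    features,
    os.getenv("FEATURES_TABLE", "tsfresh_selected_features"),
    if_exists="replace",
)
```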
DataIngestion/feature_extraction/feature/extract_features.py (1)
84-94: Minor: environment parsing consistency

You switched to `Path` for `OUTPUT_PATH` but keep `TARGET_VALUES_PATH` as a raw string. Wrapping it in `Path()` too allows easy existence checks and path manipulation (`TARGET_VALUES_PATH.exists()` etc.).
[nitpick]
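A small sketch of that suggestion (the env-var names follow the ones above; the default paths are placeholders):

```python
import os
from pathlib import Path

OUTPUT_PATH = Path(os.getenv("OUTPUT_PATH", "/data/output/features.parquet"))
TARGET_VALUES_PATH = Path(os.getenv("TARGET_VALUES_PATH", "/data/targets.csv"))

# Existence checks and path manipulation come for free once both are Path objects.
if TARGET_VALUES_PATH.exists():
    print(f"Target file found at {TARGET_VALUES_PATH}")
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
```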
```python
connector = SQLAlchemyPostgresConnector(
    user=DB_USER,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=DB_PORT,
    db_name=DB_NAME,
)
df_for_db = final_tsfresh_features.copy()
df_for_db.index.name = "id"
connector.write_dataframe(
    df_for_db.reset_index(),
    FEATURES_TABLE,
    if_exists="replace",
    index=False,
)
```
Potential runtime failure when GPU mode is enabled
final_tsfresh_features may be a cudf.DataFrame.
df_for_db.reset_index() therefore returns a cuDF as well, but pandas.DataFrame.to_sql (invoked inside write_dataframe) is not implemented for cuDF, leading to:
AttributeError: 'DataFrame' object has no attribute 'to_sql'
Convert to pandas before handing it to the connector:
```diff
- df_for_db = final_tsfresh_features.copy()
+ df_for_db = (
+     final_tsfresh_features.to_pandas()
+     if ("cudf" in sys.modules and isinstance(final_tsfresh_features, cudf.DataFrame))
+     else final_tsfresh_features.copy()
+ )
```

Fail-fast validation inside write_dataframe (e.g. isinstance(df, pd.DataFrame)) would also prevent silent misuse.
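One way to add that fail-fast check is a small guard like the sketch below (the `ensure_pandas` helper is hypothetical, not part of this PR):

```python
import pandas as pd

def ensure_pandas(df: object) -> pd.DataFrame:
    """Reject (or convert) anything that pandas.DataFrame.to_sql cannot handle."""
    if isinstance(df, pd.DataFrame):
        return df
    to_pandas = getattr(df, "to_pandas", None)  # cuDF DataFrames expose .to_pandas()
    if callable(to_pandas):
        return to_pandas()
    raise TypeError(f"Expected a pandas.DataFrame, got {type(df).__name__}")

# write_dataframe could call this first: df = ensure_pandas(df)
print(ensure_pandas(pd.DataFrame({"a": [1, 2]})).shape)
```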
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
connector = SQLAlchemyPostgresConnector(
    user=DB_USER,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=DB_PORT,
    db_name=DB_NAME,
)
df_for_db = (
    final_tsfresh_features.to_pandas()
    if ("cudf" in sys.modules and isinstance(final_tsfresh_features, cudf.DataFrame))
    else final_tsfresh_features.copy()
)
df_for_db.index.name = "id"
connector.write_dataframe(
    df_for_db.reset_index(),
    FEATURES_TABLE,
    if_exists="replace",
    index=False,
)
```
🤖 Prompt for AI Agents
In DataIngestion/feature_extraction/feature/extract_features.py around lines 608
to 622, the dataframe final_tsfresh_features may be a cudf.DataFrame, causing an
AttributeError when calling to_sql inside write_dataframe. To fix this, convert
df_for_db.reset_index() to a pandas DataFrame before passing it to
connector.write_dataframe by calling .to_pandas(). Additionally, add a type
check inside write_dataframe to raise an error if the input is not a pandas
DataFrame to prevent silent misuse.
```python
def select_relevant_features(
    features_df: pd.DataFrame,
    target_series: original_pandas.Series | None = None,
    correlation_threshold: float = 0.95,
    variance_threshold: float = 0.0,
) -> pd.DataFrame:
    """Select relevant features using ``tsfresh`` when possible.

    If *target_series* is provided, ``tsfresh.select_features`` is used for
    supervised feature selection. Otherwise a simple variance- and
    correlation-based filtering is applied. The returned DataFrame preserves
    the original index.
    """

    if features_df.empty:
        return features_df

    use_gpu = os.getenv("USE_GPU", "false").lower() == "true" and "cudf" in sys.modules

    if target_series is not None:
        logging.info("Using tsfresh supervised feature selection …")
        if use_gpu and isinstance(features_df, cudf.DataFrame):
            work_df = features_df.to_pandas()
        else:
            work_df = features_df.copy()
        try:
            selected = select_features(work_df, target_series)
            if use_gpu:
                return cudf.DataFrame.from_pandas(selected)
            return selected
        except Exception as exc:
            logging.error(
                "tsfresh feature selection failed: %s. Falling back to simple filtering.",
                exc,
            )

    if use_gpu and isinstance(features_df, cudf.DataFrame):
        work_df = features_df.to_pandas()
    else:
        work_df = features_df.copy()

    variances = work_df.var()
    cols_to_keep = variances[variances > variance_threshold].index
    work_df = work_df[cols_to_keep]

    corr_matrix = work_df.corr().abs()
    upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
    to_drop = [col for col in upper.columns if any(upper[col] > correlation_threshold)]
    work_df = work_df.drop(columns=to_drop)

    if use_gpu:
        return cudf.DataFrame.from_pandas(work_df)
    return work_df
```
select_relevant_features can silently drop/garble the index in GPU mode
cudf.DataFrame.from_pandas(selected) resets the index, so the returned cuDF loses the id information that downstream code expects (you later treat the index as the primary key when writing to parquet / DB).
Fix: preserve the index explicitly:
```diff
-                return cudf.DataFrame.from_pandas(selected)
+                return cudf.DataFrame.from_pandas(selected).set_index(selected.index)
```

(or convert the index to a column before the round-trip and restore it afterwards).
Without this, all rows receive a fresh RangeIndex, causing loss of the era-identifier and making the DB write produce duplicate primary keys.
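A sketch of the round-trip alternative mentioned above (it requires a CUDA environment with cudf installed; the `id` index name mirrors the one used when writing to the DB, and the sample values are illustrative):

```python
import pandas as pd
import cudf

selected = pd.DataFrame(
    {"feat_a": [0.1, 0.2, 0.3]},
    index=pd.Index([101, 205, 317], name="id"),  # era identifier carried as the index
)

as_columns = selected.reset_index()            # index travels as an ordinary column
gdf = cudf.DataFrame.from_pandas(as_columns)   # the round trip cannot drop it now
gdf = gdf.set_index("id")                      # restore it as the index on the GPU side
print(gdf.index.to_pandas().tolist())          # [101, 205, 317]
```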
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
        try:
            selected = select_features(work_df, target_series)
            if use_gpu:
-                return cudf.DataFrame.from_pandas(selected)
+                return cudf.DataFrame.from_pandas(selected).set_index(selected.index)
            return selected
        except Exception as exc:
            logging.error(
                "tsfresh feature selection failed: %s. Falling back to simple filtering.",
                exc,
            )
```
🤖 Prompt for AI Agents
In DataIngestion/feature_extraction/feature/extract_features.py around lines 298
to 350, when converting the selected features DataFrame from pandas to cudf
using cudf.DataFrame.from_pandas(selected), the index is reset causing loss of
the original index which downstream code relies on as a primary key. To fix
this, explicitly preserve the index by either resetting the index to a column
before conversion and restoring it after, or by passing the index explicitly
during conversion to ensure the original index is maintained in the cudf
DataFrame.
Docstrings generation was requested by @Fnux8890.

* #20 (comment)

The following files were modified:
* `DataIngestion/feature_extraction/feature/db_utils.py`
* `DataIngestion/feature_extraction/feature/extract_features.py`
Note: Generated docstrings for this pull request at #22
Summary

Testing

```
python -m py_compile DataIngestion/feature_extraction/feature/*.py
```

Summary by CodeRabbit