|
| 1 | +# project imports |
| 2 | +from network_security.exceptions import NetworkSecurityException |
| 3 | +from network_security.logging.logger import logging |
| 4 | +from network_security.entity.artifact_entity import DataIngestionArtifact |
| 5 | + |
| 6 | +# importing config entity of Data Ingestion Config |
| 7 | +from network_security.entity.config_entity import DataIngestionConfig |
| 8 | + |
| 9 | +# general imports |
| 10 | +from typing import List |
| 11 | +from sklearn.model_selection import train_test_split |
| 12 | +import numpy as np |
| 13 | +import pandas as pd |
| 14 | +import os, sys |
| 15 | +import pymongo |
| 16 | + |
| 17 | +# dotenv loader |
| 18 | +from dotenv import load_dotenv |
| 19 | +load_dotenv() |
| 20 | + |
| 21 | +""" |
| 22 | +Steps for this data_ingestion module: |
| 23 | +
|
| 24 | +1) Read data from MongoDB |
| 25 | +2) Store data in feature store |
| 26 | +3) Split the data into train and test sets |
| 27 | +4) Store the train and test sets in the ingested directory |
| 28 | +""" |
| 29 | + |
### DEFINING CONSTANTS ###
# MongoDB connection string, read from the MONGODB_URI environment variable.
# os.getenv returns None when the variable is not set.
MONGO_DB_URL = os.getenv("MONGODB_URI")
| 32 | + |
| 33 | + |
class DataIngestion:
    """Read a MongoDB collection, persist it, and split it into train/test CSVs.

    ``initiate_data_ingestion`` runs the steps in order:

    1. read the configured MongoDB collection into a DataFrame,
    2. write that DataFrame to the feature-store CSV,
    3. split it into train and test sets,
    4. write both sets to their configured CSV paths.

    Failures inside any step are re-raised as ``NetworkSecurityException``.
    """

    def __init__(self, data_ingestion_config: DataIngestionConfig):
        """Keep the configuration that every ingestion step reads from.

        Args:
            data_ingestion_config: Provides ``db_name``, ``collection_name``,
                ``feature_store_file_path``, ``train_file_path``,
                ``test_file_path`` and ``train_test_split_ratio``.
        """
        self.data_ingestion_config = data_ingestion_config

    @staticmethod
    def _ensure_parent_dir(file_path: str) -> None:
        """Create the directory that will hold ``file_path`` if it is missing."""
        dir_path = os.path.dirname(file_path)
        # dirname is "" for a bare file name, and os.makedirs("") raises
        # FileNotFoundError, so only create a directory when there is one.
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)

    def export_collection_as_dataframe(self) -> pd.DataFrame:
        """Read the configured MongoDB collection into a pandas DataFrame.

        The MongoDB ``_id`` column is dropped when present and every cell equal
        to the string ``"na"`` is replaced with ``np.nan``.

        Returns:
            pd.DataFrame: The documents of the collection, one row each.

        Raises:
            NetworkSecurityException: If connecting, querying or converting fails.
        """
        try:
            database_name = self.data_ingestion_config.db_name
            collection_name = self.data_ingestion_config.collection_name

            # The client is used as a context manager so its connections are
            # released once the documents are fetched, even if the query fails.
            with pymongo.MongoClient(MONGO_DB_URL) as client:
                # Attribute kept for backward compatibility; the client is
                # closed as soon as this block exits.
                self.mongo_client = client
                records = list(client[database_name][collection_name].find())

            df = pd.DataFrame(records)
            if "_id" in df.columns:
                df = df.drop(columns=["_id"])
            df.replace("na", np.nan, inplace=True)

            return df
        except Exception as e:
            raise NetworkSecurityException(e, sys) from e

    def export_data_to_feature_store(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Write the DataFrame to the feature-store path as a CSV file.

        Args:
            dataframe (pd.DataFrame): The DataFrame to be exported.

        Returns:
            pd.DataFrame: The same DataFrame that was passed in, unchanged.

        Raises:
            NetworkSecurityException: If the directory or file cannot be written.
        """
        try:
            feature_store_file_path = self.data_ingestion_config.feature_store_file_path

            self._ensure_parent_dir(feature_store_file_path)
            dataframe.to_csv(feature_store_file_path, index=False, header=True)

            return dataframe
        except Exception as e:
            raise NetworkSecurityException(e, sys) from e

    def split_data_as_train_test(self, dataframe: pd.DataFrame) -> None:
        """Split the DataFrame and write the train and test sets as CSV files.

        ``train_test_split_ratio`` from the config is passed as ``test_size``.
        No ``random_state`` is supplied, so the split is not reproducible
        between runs.

        Args:
            dataframe (pd.DataFrame): The full data set to split.

        Raises:
            NetworkSecurityException: If splitting or writing either file fails.
        """
        try:
            train_set, test_set = train_test_split(
                dataframe,
                test_size=self.data_ingestion_config.train_test_split_ratio,
            )
            logging.info("Performed train test split on the data")

            train_file_path = self.data_ingestion_config.train_file_path
            test_file_path = self.data_ingestion_config.test_file_path

            # The two files may be configured in different directories, so
            # make sure both parents exist before writing.
            self._ensure_parent_dir(train_file_path)
            self._ensure_parent_dir(test_file_path)

            logging.info("Created the directory for train and test data")

            train_set.to_csv(train_file_path, index=False, header=True)
            test_set.to_csv(test_file_path, index=False, header=True)

            logging.info("Exported train and test data to their respective paths")

            logging.info("Exited the split_data_as_train_test method of Data Ingestion class")
        except Exception as e:
            raise NetworkSecurityException(e, sys) from e

    def initiate_data_ingestion(self) -> DataIngestionArtifact:
        """Run the whole ingestion pipeline and describe its outputs.

        Returns:
            DataIngestionArtifact: Holds the train and test CSV paths taken
            from the configuration.

        Raises:
            NetworkSecurityException: If any ingestion step fails.
        """
        try:
            # get data from MongoDB as a DataFrame
            dataframe = self.export_collection_as_dataframe()

            # export data to feature store
            dataframe = self.export_data_to_feature_store(dataframe)

            # split into train/test sets and write them to disk
            self.split_data_as_train_test(dataframe)

            # creating data ingestion artifact
            data_ingestion_artifact = DataIngestionArtifact(
                train_file_path=self.data_ingestion_config.train_file_path,
                test_file_path=self.data_ingestion_config.test_file_path,
            )
            return data_ingestion_artifact

        except Exception as e:
            raise NetworkSecurityException(e, sys) from e
| 125 | + |
0 commit comments