From f9514c290c48627b1af4101b08637f98fa4e8098 Mon Sep 17 00:00:00 2001 From: Brian Wylie Date: Sat, 9 Nov 2024 10:48:35 -0700 Subject: [PATCH 01/12] coverting training values to True/False instead if 1/0 --- src/sageworks/core/views/training_view.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/sageworks/core/views/training_view.py b/src/sageworks/core/views/training_view.py index c6068fb5d..4808a6d2e 100644 --- a/src/sageworks/core/views/training_view.py +++ b/src/sageworks/core/views/training_view.py @@ -23,7 +23,7 @@ class TrainingView(CreateView): training_view = TrainingView.create(fs, column_list=["my_col1", "my_col2"]) # Query the view - df = training_view.query(f"SELECT * FROM {training_view.table} where training = 1") + df = training_view.query(f"SELECT * FROM {training_view.table} where training = True") ``` """ @@ -87,8 +87,8 @@ def create( create_view_query = f""" CREATE OR REPLACE VIEW {instance.table} AS SELECT {sql_columns}, CASE - WHEN {id_column} IN ({formatted_holdout_ids}) THEN 0 - ELSE 1 + WHEN {id_column} IN ({formatted_holdout_ids}) THEN False + ELSE True END AS training FROM {instance.source_table} """ @@ -120,8 +120,8 @@ def _default_training_view(self, data_source: DataSource, id_column: str): create_view_query = f""" CREATE OR REPLACE VIEW "{self.table}" AS SELECT {sql_columns}, CASE - WHEN MOD(ROW_NUMBER() OVER (ORDER BY {id_column}), 10) < 8 THEN 1 -- Assign 80% to training - ELSE 0 -- Assign roughly 20% to validation/test + WHEN MOD(ROW_NUMBER() OVER (ORDER BY {id_column}), 10) < 8 THEN True -- Assign 80% to training + ELSE False -- Assign roughly 20% to validation/test END AS training FROM {self.base_table_name} """ @@ -157,4 +157,4 @@ def _default_training_view(self, data_source: DataSource, id_column: str): # Pull the training data df = training_view.pull_dataframe() print(df.head()) - print(df["training"].value_counts()) + print(df["training"].value_counts()) \ No newline at end of file From 5f0c6a1e1c35361c0d59a3b0077080fe2aa35ed4 Mon Sep 17 00:00:00 2001 From: Brian Wylie Date: Sat, 9 Nov 2024 11:02:37 -0700 Subject: [PATCH 02/12] coverting training values to True/False instead if 1/0 --- docs/api_classes/endpoint.md | 2 +- docs/api_classes/overview.md | 2 +- examples/endpoint/endpoint_inference.py | 2 +- examples/full_ml_pipeline.py | 2 +- examples/storage/endpoint_inference.py | 2 +- notebooks/ML_Pipeline_with_SageWorks.ipynb | 2 +- notebooks/Regression_Confidence_Experiments.ipynb | 4 ++-- notebooks/Residual_Analysis.ipynb | 2 +- src/sageworks/api/endpoint.py | 2 +- src/sageworks/core/artifacts/endpoint_core.py | 2 +- .../light_scikit_learn/scikit_learn.template | 4 ++-- .../features_to_model/light_xgb_model/xgb_model.template | 4 ++-- .../core/transforms/pandas_transforms/pandas_to_features.py | 6 +++--- src/sageworks/core/views/training_view.py | 6 +++--- src/sageworks/utils/endpoint_utils.py | 4 ++-- tests/artifacts/model_metrics_tests.py | 2 +- 16 files changed, 24 insertions(+), 24 deletions(-) diff --git a/docs/api_classes/endpoint.md b/docs/api_classes/endpoint.md index bc85b16db..6936d8f2e 100644 --- a/docs/api_classes/endpoint.md +++ b/docs/api_classes/endpoint.md @@ -23,7 +23,7 @@ endpoint = Endpoint("abalone-regression-end") model = Model(endpoint.get_input()) fs = FeatureSet(model.get_input()) athena_table = fs.view("training").table -df = fs.query(f"SELECT * FROM {athena_table} where training = 0") +df = fs.query(f"SELECT * FROM {athena_table} where training = FALSE") # Run inference/predictions on the Endpoint results_df = endpoint.inference(df) diff --git a/docs/api_classes/overview.md b/docs/api_classes/overview.md index 1d3584859..010d17047 100644 --- a/docs/api_classes/overview.md +++ b/docs/api_classes/overview.md @@ -47,7 +47,7 @@ endpoint = Endpoint("abalone-regression-end") # Get a DataFrame of data (not used to train) and run predictions athena_table = fs.view("training").table -df = fs.query(f"SELECT * FROM {athena_table} where training = 0") +df = fs.query(f"SELECT * FROM {athena_table} where training = FALSE") results = endpoint.predict(df) print(results[["class_number_of_rings", "prediction"]]) ``` diff --git a/examples/endpoint/endpoint_inference.py b/examples/endpoint/endpoint_inference.py index b05f12d7b..3d4271e42 100644 --- a/examples/endpoint/endpoint_inference.py +++ b/examples/endpoint/endpoint_inference.py @@ -10,7 +10,7 @@ model = Model(endpoint.get_input()) fs = FeatureSet(model.get_input()) athena_table = fs.view("training").table -df = fs.query(f"SELECT * FROM {athena_table} where training = 0") +df = fs.query(f"SELECT * FROM {athena_table} where training = FALSE") # Run inference/predictions on the Endpoint results_df = endpoint.inference(df) diff --git a/examples/full_ml_pipeline.py b/examples/full_ml_pipeline.py index 44711ca64..66d458faa 100644 --- a/examples/full_ml_pipeline.py +++ b/examples/full_ml_pipeline.py @@ -45,6 +45,6 @@ # Get a DataFrame of data (not used to train) and run predictions athena_table = fs.view("training").table - df = fs.query(f"SELECT * FROM {athena_table} where training = 0") + df = fs.query(f"SELECT * FROM {athena_table} where training = FALSE") results = endpoint.inference(df) print(results[["class_number_of_rings", "prediction"]]) diff --git a/examples/storage/endpoint_inference.py b/examples/storage/endpoint_inference.py index d5d3ba67a..490675060 100644 --- a/examples/storage/endpoint_inference.py +++ b/examples/storage/endpoint_inference.py @@ -22,7 +22,7 @@ def run_inference(endpoint_name): feature_set = ModelCore(model).get_input() features = FeatureSetCore(feature_set) table = features.view("training").table - test_df = features.query(f'SELECT * FROM "{table}" where training = 0') + test_df = features.query(f'SELECT * FROM "{table}" where training = FALSE') # Drop some columns test_df.drop(["write_time", "api_invocation_time", "is_deleted"], axis=1, inplace=True) diff --git a/notebooks/ML_Pipeline_with_SageWorks.ipynb b/notebooks/ML_Pipeline_with_SageWorks.ipynb index 6f76460bd..8da0c61f9 100644 --- a/notebooks/ML_Pipeline_with_SageWorks.ipynb +++ b/notebooks/ML_Pipeline_with_SageWorks.ipynb @@ -303,7 +303,7 @@ "source": [ "# Get a DataFrame of data (not used to train) and run predictions\n", "table = feature_set.view(\"training\").table\n", - "test_df = feature_set.query(f\"SELECT * FROM {table} where training = 0\")\n", + "test_df = feature_set.query(f\"SELECT * FROM {table} where training = FALSE\")\n", "test_df.head()" ] }, diff --git a/notebooks/Regression_Confidence_Experiments.ipynb b/notebooks/Regression_Confidence_Experiments.ipynb index bffa31ecf..7e2426a4a 100644 --- a/notebooks/Regression_Confidence_Experiments.ipynb +++ b/notebooks/Regression_Confidence_Experiments.ipynb @@ -98,8 +98,8 @@ "source": [ "# Grab the Training View\n", "table = fs.view(\"training\").table\n", - "train_df = fs.query(f\"SELECT * FROM {table} where training = 1\")\n", - "hold_out_df = fs.query(f\"SELECT * FROM {table} where training = 0\")" + "train_df = fs.query(f\"SELECT * FROM {table} where training = TRUE\")\n", + "hold_out_df = fs.query(f\"SELECT * FROM {table} where training = FALSE\")" ] }, { diff --git a/notebooks/Residual_Analysis.ipynb b/notebooks/Residual_Analysis.ipynb index badcf31af..20b25fbed 100644 --- a/notebooks/Residual_Analysis.ipynb +++ b/notebooks/Residual_Analysis.ipynb @@ -302,7 +302,7 @@ "source": [ "# Get a DataFrame of data (not used to train) and run predictions\n", "table = feature_set.view(\"training\").table\n", - "test_df = feature_set.query(f\"SELECT * FROM {table} where training = 0\")\n", + "test_df = feature_set.query(f\"SELECT * FROM {table} where training = FALSE\")\n", "test_df.head()" ], "outputs": [] diff --git a/src/sageworks/api/endpoint.py b/src/sageworks/api/endpoint.py index 0f687d7f2..da4ecd7c8 100644 --- a/src/sageworks/api/endpoint.py +++ b/src/sageworks/api/endpoint.py @@ -81,7 +81,7 @@ def fast_inference(self, eval_df: pd.DataFrame) -> pd.DataFrame: model = Model(my_endpoint.get_input()) my_features = FeatureSet(model.get_input()) table = my_features.view("training").table - df = my_features.query(f'SELECT * FROM "{table}" where training = 0') + df = my_features.query(f'SELECT * FROM "{table}" where training = FALSE') results = my_endpoint.inference(df) target = model.target() pprint(results[[target, "prediction"]]) diff --git a/src/sageworks/core/artifacts/endpoint_core.py b/src/sageworks/core/artifacts/endpoint_core.py index b169a118b..e26262dcc 100644 --- a/src/sageworks/core/artifacts/endpoint_core.py +++ b/src/sageworks/core/artifacts/endpoint_core.py @@ -343,7 +343,7 @@ def auto_inference(self, capture: bool = False) -> pd.DataFrame: # Grab the evaluation data from the FeatureSet table = fs.view("training").table - eval_df = fs.query(f'SELECT * FROM "{table}" where training = 0') + eval_df = fs.query(f'SELECT * FROM "{table}" where training = FALSE') capture_uuid = "auto_inference" if capture else None return self.inference(eval_df, capture_uuid, id_column=fs.id_column) diff --git a/src/sageworks/core/transforms/features_to_model/light_scikit_learn/scikit_learn.template b/src/sageworks/core/transforms/features_to_model/light_scikit_learn/scikit_learn.template index 7138d092b..9616f0de8 100644 --- a/src/sageworks/core/transforms/features_to_model/light_scikit_learn/scikit_learn.template +++ b/src/sageworks/core/transforms/features_to_model/light_scikit_learn/scikit_learn.template @@ -175,8 +175,8 @@ if __name__ == "__main__": # Does the dataframe have a training column? elif "training" in all_df.columns: print("Found training column, splitting data based on training column") - df_train = all_df[all_df["training"] == 1].copy() - df_val = all_df[all_df["training"] == 0].copy() + df_train = all_df[all_df["training"]].copy() + df_val = all_df[~all_df["training"].copy() else: # Just do a random training Split print("WARNING: No training column found, splitting data with random state=42") diff --git a/src/sageworks/core/transforms/features_to_model/light_xgb_model/xgb_model.template b/src/sageworks/core/transforms/features_to_model/light_xgb_model/xgb_model.template index 0d34c2ca8..ce6875d4e 100644 --- a/src/sageworks/core/transforms/features_to_model/light_xgb_model/xgb_model.template +++ b/src/sageworks/core/transforms/features_to_model/light_xgb_model/xgb_model.template @@ -158,8 +158,8 @@ if __name__ == "__main__": # Does the dataframe have a training column? elif "training" in all_df.columns: print("Found training column, splitting data based on training column") - df_train = all_df[all_df["training"] == 1].copy() - df_val = all_df[all_df["training"] == 0].copy() + df_train = all_df[all_df["training"]].copy() + df_val = all_df[~all_df["training"].copy() else: # Just do a random training Split print("WARNING: No training column found, splitting data with random state=42") diff --git a/src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py b/src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py index 3347dc24b..d812cf59e 100644 --- a/src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py +++ b/src/sageworks/core/transforms/pandas_transforms/pandas_to_features.py @@ -286,7 +286,7 @@ def prep_dataframe(self): """Training column detected: Since FeatureSets are read-only, SageWorks creates a training view that can be dynamically changed. We'll use this training column to create a training view.""" ) - self.incoming_hold_out_ids = self.output_df[self.output_df["training"] == 0][self.id_column].tolist() + self.incoming_hold_out_ids = self.output_df[~self.output_df["training"]][self.id_column].tolist() self.output_df = self.output_df.drop(columns=["training"]) def create_feature_group(self): @@ -423,8 +423,8 @@ def wait_for_rows(self, expected_rows: int): data_df = ds.sample() # Test setting a training column - data_df["training"] = 0 - data_df.loc[0:10, "training"] = 1 + data_df["training"] = False + data_df.loc[0:10, "training"] = True # Create my DF to Feature Set Transform (with one-hot encoding) df_to_features = PandasToFeatures("test_features") diff --git a/src/sageworks/core/views/training_view.py b/src/sageworks/core/views/training_view.py index 4808a6d2e..ee51a077c 100644 --- a/src/sageworks/core/views/training_view.py +++ b/src/sageworks/core/views/training_view.py @@ -23,7 +23,7 @@ class TrainingView(CreateView): training_view = TrainingView.create(fs, column_list=["my_col1", "my_col2"]) # Query the view - df = training_view.query(f"SELECT * FROM {training_view.table} where training = True") + df = training_view.query(f"SELECT * FROM {training_view.table} where training = TRUE") ``` """ @@ -33,7 +33,7 @@ def create( feature_set: FeatureSet, source_table: str = None, id_column: str = None, - holdout_ids: Union[list[str], None] = None, + holdout_ids: Union[list[str], list[int], None] = None, ) -> Union[View, None]: """Factory method to create and return a TrainingView instance. @@ -41,7 +41,7 @@ def create( feature_set (FeatureSet): A FeatureSet object source_table (str, optional): The table/view to create the view from. Defaults to None. id_column (str, optional): The name of the id column. Defaults to None. - holdout_ids (Union[list[str], None], optional): A list of holdout ids. Defaults to None. + holdout_ids (Union[list[str], list[int], None], optional): A list of holdout ids. Defaults to None. Returns: Union[View, None]: The created View object (or None if failed to create the view) diff --git a/src/sageworks/utils/endpoint_utils.py b/src/sageworks/utils/endpoint_utils.py index 67a1086cf..f71095210 100644 --- a/src/sageworks/utils/endpoint_utils.py +++ b/src/sageworks/utils/endpoint_utils.py @@ -38,7 +38,7 @@ def fs_training_data(end: Endpoint) -> pd.DataFrame: # Grab the FeatureSet by backtracking from the Endpoint fs = backtrack_to_fs(end) table = fs.view("training").table - train_df = fs.query(f'SELECT * FROM "{table}" where training = 1') + train_df = fs.query(f'SELECT * FROM "{table}" where training = TRUE') return train_df @@ -54,7 +54,7 @@ def fs_evaluation_data(end: Endpoint) -> pd.DataFrame: # Grab the FeatureSet by backtracking from the Endpoint fs = backtrack_to_fs(end) table = fs.view("training").table - eval_df = fs.query(f'SELECT * FROM "{table}" where training = 0') + eval_df = fs.query(f'SELECT * FROM "{table}" where training = FALSE') return eval_df diff --git a/tests/artifacts/model_metrics_tests.py b/tests/artifacts/model_metrics_tests.py index 548c02228..90a44253a 100644 --- a/tests/artifacts/model_metrics_tests.py +++ b/tests/artifacts/model_metrics_tests.py @@ -123,7 +123,7 @@ def test_inference_with_capture_uuid(): # Grab a dataframe for inference my_features = FeatureSet("abalone_features") table = my_features.view("training").table - df = my_features.query(f'SELECT * FROM "{table}" where training = 0') + df = my_features.query(f'SELECT * FROM "{table}" where training = FALSE') # Run inference my_endpoint = Endpoint("abalone-regression-end") From 35470af958596af9e0b7ad877135b56e755fb6de Mon Sep 17 00:00:00 2001 From: Brian Wylie Date: Sat, 9 Nov 2024 11:03:28 -0700 Subject: [PATCH 03/12] linter formatting --- src/sageworks/core/views/training_view.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sageworks/core/views/training_view.py b/src/sageworks/core/views/training_view.py index ee51a077c..3bc16f6f3 100644 --- a/src/sageworks/core/views/training_view.py +++ b/src/sageworks/core/views/training_view.py @@ -157,4 +157,4 @@ def _default_training_view(self, data_source: DataSource, id_column: str): # Pull the training data df = training_view.pull_dataframe() print(df.head()) - print(df["training"].value_counts()) \ No newline at end of file + print(df["training"].value_counts()) From 9ffd4df2426f598b06ea601242fc925c6a0c3bfc Mon Sep 17 00:00:00 2001 From: Brian Wylie Date: Sat, 9 Nov 2024 13:19:29 -0700 Subject: [PATCH 04/12] adding a script that converts training views from 0/1 to False/True --- scripts/convert_training_views.py | 93 +++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 scripts/convert_training_views.py diff --git a/scripts/convert_training_views.py b/scripts/convert_training_views.py new file mode 100644 index 000000000..d1bfbc7d7 --- /dev/null +++ b/scripts/convert_training_views.py @@ -0,0 +1,93 @@ +import json +import re +import base64 +import logging +import awswrangler as wr +from sageworks.core.cloud_platform.aws.aws_account_clamp import AWSAccountClamp + +log = logging.getLogger("sageworks") + +# Initialize your AWS session and Glue client +aws_account_clamp = AWSAccountClamp() +session = aws_account_clamp.boto3_session +glue_client = session.client("glue") + + +def _decode_view_sql(encoded_sql: str) -> str: + """Decode the base64-encoded SQL query from the view. + + Args: + encoded_sql (str): The encoded SQL query in the ViewOriginalText. + + Returns: + str: The decoded SQL query. + """ + # Extract the base64-encoded content from the comment + match = re.search(r"Presto View: ([\w=+/]+)", encoded_sql) + if match: + base64_sql = match.group(1) + decoded_bytes = base64.b64decode(base64_sql) + decoded_str = decoded_bytes.decode("utf-8") + + # Parse the decoded string as JSON to extract the SQL + try: + view_json = json.loads(decoded_str) + return view_json.get("originalSql", "") + except json.JSONDecodeError: + log.error("Failed to parse the decoded view SQL as JSON.") + return "" + return "" + + +def convert_training_views(database): + """Convert training views from 0/1 to FALSE/TRUE in the specified AWS Glue database""" + + # Use the Glue client to get the list of tables (views) from the database + paginator = glue_client.get_paginator("get_tables") + + for page in paginator.paginate(DatabaseName=database): + for table in page["TableList"]: + # Check if the table name ends with "_training" and is a view + if table["Name"].endswith("_training") and table.get("TableType") == "VIRTUAL_VIEW": + print(f"Checking view: {table['Name']}...") + + # Decode the 'ViewOriginalText' for the view + view_original_text = _decode_view_sql(table.get("ViewOriginalText")) + if view_original_text and (" THEN 0" in view_original_text or " THEN 1" in view_original_text): + print(f"\tConverting view: {table['Name']}...") + + # Update the THEN and ELSE view definitions by replacing 0/1 with FALSE/TRUE + updated_query = view_original_text.replace(" THEN 0", " THEN FALSE").replace( + " THEN 1", " THEN TRUE" + ) + updated_query = updated_query.replace(" ELSE 0", " ELSE FALSE").replace(" ELSE 1", " ELSE TRUE") + + # Construct the full CREATE OR REPLACE VIEW query + query = f""" + CREATE OR REPLACE VIEW {table['Name']} AS + {updated_query} + """ + + try: + # Execute the query using awswrangler + query_execution_id = wr.athena.start_query_execution( + sql=query, + database=database, + boto3_session=session, + ) + print(f"\tQueryExecutionId: {query_execution_id}") + + # Wait for the query to complete + wr.athena.wait_query(query_execution_id=query_execution_id, boto3_session=session) + print(f"\tSuccessfully converted view: {table['Name']}") + except Exception as e: + print(f"\tError updating view {table['Name']}: {e}") + else: + print(f"\tNo conversion needed for view: {table['Name']}") + + +if __name__ == "__main__": + # Specify your database scope + database_scope = ["sagemaker_featurestore"] + for db in database_scope: + convert_training_views(db) From 2ca60fe515b5bb94c5d1af964c32cee72120cb9b Mon Sep 17 00:00:00 2001 From: Brian Wylie Date: Sat, 9 Nov 2024 15:25:08 -0700 Subject: [PATCH 05/12] fix a global replace whoops :) --- .../features_to_model/light_scikit_learn/scikit_learn.template | 2 +- .../features_to_model/light_xgb_model/xgb_model.template | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sageworks/core/transforms/features_to_model/light_scikit_learn/scikit_learn.template b/src/sageworks/core/transforms/features_to_model/light_scikit_learn/scikit_learn.template index 9616f0de8..b01270361 100644 --- a/src/sageworks/core/transforms/features_to_model/light_scikit_learn/scikit_learn.template +++ b/src/sageworks/core/transforms/features_to_model/light_scikit_learn/scikit_learn.template @@ -176,7 +176,7 @@ if __name__ == "__main__": elif "training" in all_df.columns: print("Found training column, splitting data based on training column") df_train = all_df[all_df["training"]].copy() - df_val = all_df[~all_df["training"].copy() + df_val = all_df[~all_df["training"]].copy() else: # Just do a random training Split print("WARNING: No training column found, splitting data with random state=42") diff --git a/src/sageworks/core/transforms/features_to_model/light_xgb_model/xgb_model.template b/src/sageworks/core/transforms/features_to_model/light_xgb_model/xgb_model.template index ce6875d4e..398282793 100644 --- a/src/sageworks/core/transforms/features_to_model/light_xgb_model/xgb_model.template +++ b/src/sageworks/core/transforms/features_to_model/light_xgb_model/xgb_model.template @@ -159,7 +159,7 @@ if __name__ == "__main__": elif "training" in all_df.columns: print("Found training column, splitting data based on training column") df_train = all_df[all_df["training"]].copy() - df_val = all_df[~all_df["training"].copy() + df_val = all_df[~all_df["training"]].copy() else: # Just do a random training Split print("WARNING: No training column found, splitting data with random state=42") From 65b5b01792523cb1f79fd8467d11c29b489d36dd Mon Sep 17 00:00:00 2001 From: Brian Wylie Date: Sat, 9 Nov 2024 15:30:47 -0700 Subject: [PATCH 06/12] convert script docstring --- scripts/convert_training_views.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/scripts/convert_training_views.py b/scripts/convert_training_views.py index d1bfbc7d7..15cb9a3bf 100644 --- a/scripts/convert_training_views.py +++ b/scripts/convert_training_views.py @@ -1,3 +1,10 @@ +"""Convert training views from 0/1 to FALSE/TRUE in the specified AWS Glue database. + + Note: This script is a 'schema' change for the training views, and is a 'one-time' + operation. The code quality here is not as important as the correctness of the + operation and since this will only be run once for existing clients and never + again, we don't want to sweat the details. +""" import json import re import base64 From 79d38b718d7e00fd6553b6f05e9d511b1d4d890b Mon Sep 17 00:00:00 2001 From: Brian Wylie Date: Sat, 9 Nov 2024 15:31:14 -0700 Subject: [PATCH 07/12] linter formatting --- scripts/convert_training_views.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/convert_training_views.py b/scripts/convert_training_views.py index 15cb9a3bf..c8a2c8f57 100644 --- a/scripts/convert_training_views.py +++ b/scripts/convert_training_views.py @@ -5,6 +5,7 @@ operation and since this will only be run once for existing clients and never again, we don't want to sweat the details. """ + import json import re import base64 From eb41f3b10fc965635b5c18411039f8c9284bb6d7 Mon Sep 17 00:00:00 2001 From: Brian Wylie Date: Sat, 9 Nov 2024 16:06:40 -0700 Subject: [PATCH 08/12] removing an endpoint util function that wasn't really used and putting in some nice additional sanity checks when we backtrace from endpoint to model to featureset --- src/sageworks/core/artifacts/endpoint_core.py | 15 ++-- src/sageworks/core/artifacts/monitor_core.py | 2 +- src/sageworks/utils/endpoint_utils.py | 69 ++++++++++--------- 3 files changed, 49 insertions(+), 37 deletions(-) diff --git a/src/sageworks/core/artifacts/endpoint_core.py b/src/sageworks/core/artifacts/endpoint_core.py index e26262dcc..ff4f2b55e 100644 --- a/src/sageworks/core/artifacts/endpoint_core.py +++ b/src/sageworks/core/artifacts/endpoint_core.py @@ -336,10 +336,17 @@ def auto_inference(self, capture: bool = False) -> pd.DataFrame: capture (bool, optional): Capture the inference results and metrics (default=False) """ - # Backtrack to the FeatureSet - model_name = self.get_input() - fs_name = ModelCore(model_name).get_input() - fs = FeatureSetCore(fs_name) + # Sanity Check that we have a model + model = ModelCore(self.get_input()) + if not model.exists(): + self.log.error("No model found for this endpoint. Returning empty DataFrame.") + return pd.DataFrame() + + # Now get the FeatureSet and make sure it exists + fs = FeatureSetCore(model.get_input()) + if not fs.exists(): + self.log.error("No FeatureSet found for this endpoint. Returning empty DataFrame.") + return pd.DataFrame() # Grab the evaluation data from the FeatureSet table = fs.view("training").table diff --git a/src/sageworks/core/artifacts/monitor_core.py b/src/sageworks/core/artifacts/monitor_core.py index dda56e5f3..4abd7e0c8 100644 --- a/src/sageworks/core/artifacts/monitor_core.py +++ b/src/sageworks/core/artifacts/monitor_core.py @@ -489,7 +489,7 @@ def monitoring_schedule_exists(self): # # Make predictions on the Endpoint using the FeatureSet evaluation data - pred_df = endpoint_utils.predictions_using_fs(my_endpoint) + pred_df = my_endpoint.auto_inference() print(pred_df.head()) # Check that data capture is working diff --git a/src/sageworks/utils/endpoint_utils.py b/src/sageworks/utils/endpoint_utils.py index f71095210..e3617d736 100644 --- a/src/sageworks/utils/endpoint_utils.py +++ b/src/sageworks/utils/endpoint_utils.py @@ -1,6 +1,7 @@ """Endpoint Utilities for SageWorks endpoints""" import logging +from typing import Union import pandas as pd # SageWorks Imports @@ -12,20 +13,6 @@ log = logging.getLogger("sageworks") -def predictions_using_fs(end: Endpoint) -> pd.DataFrame: - """Code to run predictions using the FeatureSet - - Args: - end (Endpoint): Endpoint to backtrace: End -> Model -> FeatureSet (evaluation data) - - Returns: - pd.DataFrame: Dataframe with the predictions using the FeatureSet data - """ - # Grab the FeatureSet evaluation data - feature_df = fs_evaluation_data(end) - return end._predict(feature_df) - - def fs_training_data(end: Endpoint) -> pd.DataFrame: """Code to get the training data from the FeatureSet used to train the Model @@ -37,6 +24,13 @@ def fs_training_data(end: Endpoint) -> pd.DataFrame: """ # Grab the FeatureSet by backtracking from the Endpoint fs = backtrack_to_fs(end) + + # Sanity check that we have a FeatureSet + if fs is None: + log.error("No FeatureSet found for this endpoint. Returning empty dataframe.") + return pd.DataFrame() + + # Get the training data table = fs.view("training").table train_df = fs.query(f'SELECT * FROM "{table}" where training = TRUE') return train_df @@ -53,21 +47,38 @@ def fs_evaluation_data(end: Endpoint) -> pd.DataFrame: """ # Grab the FeatureSet by backtracking from the Endpoint fs = backtrack_to_fs(end) + + # Sanity check that we have a FeatureSet + if fs is None: + log.error("No FeatureSet found for this endpoint. Returning empty dataframe.") + return pd.DataFrame() + + # Get the evaluation data table = fs.view("training").table eval_df = fs.query(f'SELECT * FROM "{table}" where training = FALSE') return eval_df -def backtrack_to_fs(end: Endpoint) -> FeatureSet: +def backtrack_to_fs(end: Endpoint) -> Union[FeatureSet, None]: """Code to Backtrack to FeatureSet: End -> Model -> FeatureSet Returns: - FeatureSet: The FeatureSet used to train the Model + FeatureSet (Union[FeatureSet, None]): The FeatureSet object or None if not found """ - # Grab the FeatureSet by backtracking from the Endpoint - model_name = end.get_input() - fs_name = Model(model_name).get_input() - fs = FeatureSet(fs_name) + + # Sanity Check that we have a model + model = Model(end.get_input()) + if not model.exists(): + log.error("No model found for this endpoint. Returning None.") + return None + + # Now get the FeatureSet and make sure it exists + fs = FeatureSet(model.get_input()) + if not fs.exists(): + log.error("No FeatureSet found for this endpoint. Returning None.") + return None + + # Return the FeatureSet return fs @@ -85,17 +96,11 @@ def backtrack_to_fs(end: Endpoint) -> FeatureSet: my_train_df = fs_training_data(my_endpoint) print(my_train_df) - # Make predictions on the Endpoint - pred_output_df = predictions_using_fs(my_endpoint) - print(pred_output_df) + # Get the evaluation data + my_eval_df = fs_evaluation_data(my_endpoint) + print(my_eval_df) - # Create a Classification Endpoint - my_endpoint_name = "wine-classification-end" - my_endpoint = Endpoint(my_endpoint_name) - if not my_endpoint.exists(): - print(f"Endpoint {my_endpoint_name} does not exist.") - exit(1) + # Backtrack to the FeatureSet + my_fs = backtrack_to_fs(my_endpoint) + print(my_fs) - # Make predictions on the Endpoint - pred_output_df = predictions_using_fs(my_endpoint) - print(pred_output_df) From 566d26bb6622c024919f976bb3c5ae192335cf12 Mon Sep 17 00:00:00 2001 From: Brian Wylie Date: Sat, 9 Nov 2024 16:17:07 -0700 Subject: [PATCH 09/12] putting in a sanity check for being handed an empty prediction dataframe --- src/sageworks/core/artifacts/endpoint_core.py | 8 ++++++-- src/sageworks/utils/endpoint_utils.py | 1 - 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/sageworks/core/artifacts/endpoint_core.py b/src/sageworks/core/artifacts/endpoint_core.py index ff4f2b55e..3110a7e81 100644 --- a/src/sageworks/core/artifacts/endpoint_core.py +++ b/src/sageworks/core/artifacts/endpoint_core.py @@ -654,8 +654,7 @@ def _capture_inference_results( self.log.important(f"Recomputing Details for {self.uuid} to show latest Inference Results...") self.details(recompute=True) - @staticmethod - def regression_metrics(target_column: str, prediction_df: pd.DataFrame) -> pd.DataFrame: + def regression_metrics(self, target_column: str, prediction_df: pd.DataFrame) -> pd.DataFrame: """Compute the performance metrics for this Endpoint Args: target_column (str): Name of the target column @@ -664,6 +663,11 @@ def regression_metrics(target_column: str, prediction_df: pd.DataFrame) -> pd.Da pd.DataFrame: DataFrame with the performance metrics """ + # Sanity Check the prediction DataFrame + if prediction_df.empty: + self.log.warning("No predictions were made. Returning empty DataFrame.") + return pd.DataFrame() + # Compute the metrics y_true = prediction_df[target_column] prediction_col = "prediction" if "prediction" in prediction_df.columns else "predictions" diff --git a/src/sageworks/utils/endpoint_utils.py b/src/sageworks/utils/endpoint_utils.py index e3617d736..c7d1182d3 100644 --- a/src/sageworks/utils/endpoint_utils.py +++ b/src/sageworks/utils/endpoint_utils.py @@ -103,4 +103,3 @@ def backtrack_to_fs(end: Endpoint) -> Union[FeatureSet, None]: # Backtrack to the FeatureSet my_fs = backtrack_to_fs(my_endpoint) print(my_fs) - From c0233846d50a2c3ed8c696482223d12c94e1be7b Mon Sep 17 00:00:00 2001 From: Brian Wylie Date: Sat, 9 Nov 2024 18:19:25 -0700 Subject: [PATCH 10/12] various sanity checks when for endpoints and models methods --- src/sageworks/core/artifacts/artifact.py | 7 ++++++- src/sageworks/core/artifacts/model_core.py | 11 ++++++++--- src/sageworks/core/cloud_platform/aws/aws_meta.py | 9 ++++++++- tests/artifacts/model_metrics_tests.py | 12 ++++++++++-- 4 files changed, 32 insertions(+), 7 deletions(-) diff --git a/src/sageworks/core/artifacts/artifact.py b/src/sageworks/core/artifacts/artifact.py index 11b3aae73..9a2f9a0f8 100644 --- a/src/sageworks/core/artifacts/artifact.py +++ b/src/sageworks/core/artifacts/artifact.py @@ -4,6 +4,7 @@ from abc import ABC, abstractmethod from datetime import datetime import logging +from typing import Union # SageWorks Imports from sageworks.core.cloud_platform.aws.aws_account_clamp import AWSAccountClamp @@ -128,8 +129,12 @@ def exists(self) -> bool: """Does the Artifact exist? Can we connect to it?""" pass - def sageworks_meta(self) -> dict: + def sageworks_meta(self) -> Union[dict, None]: """Get the SageWorks specific metadata for this Artifact + + Returns: + Union[dict, None]: Dictionary of SageWorks metadata for this Artifact + Note: This functionality will work for FeatureSets, Models, and Endpoints but not for DataSources and Graphs, those classes need to override this method. """ diff --git a/src/sageworks/core/artifacts/model_core.py b/src/sageworks/core/artifacts/model_core.py index e366a5ff8..44fed7fc1 100644 --- a/src/sageworks/core/artifacts/model_core.py +++ b/src/sageworks/core/artifacts/model_core.py @@ -246,6 +246,11 @@ def get_inference_metrics(self, capture_uuid: str = "latest") -> Union[pd.DataFr # Grab the metrics captured during model training (could return None) if capture_uuid == "model_training": + # Sanity check the sageworks metadata + if self.sageworks_meta() is None: + self.log.critical(f"Model {self.model_name} has no sageworks_meta(). Onboard() or delete this model!") + return + metrics = self.sageworks_meta().get("sageworks_training_metrics") return pd.DataFrame.from_dict(metrics) if metrics else None @@ -317,7 +322,7 @@ def arn(self) -> str: def group_arn(self) -> Union[str, None]: """AWS ARN (Amazon Resource Name) for the Model Package Group""" - return self.model_meta["ModelPackageGroupArn"] + return self.model_meta["ModelPackageGroupArn"] if self.model_meta else None def model_package_arn(self) -> Union[str, None]: """AWS ARN (Amazon Resource Name) for the Latest Model Package (within the Group)""" @@ -325,9 +330,9 @@ def model_package_arn(self) -> Union[str, None]: return None return self.latest_model["ModelPackageArn"] - def container_info(self) -> dict: + def container_info(self) -> Union[dict, None]: """Container Info for the Latest Model Package""" - return self.latest_model["InferenceSpecification"]["Containers"][0] + return self.latest_model["InferenceSpecification"]["Containers"][0] if self.latest_model else None def container_image(self) -> str: """Container Image for the Latest Model Package""" diff --git a/src/sageworks/core/cloud_platform/aws/aws_meta.py b/src/sageworks/core/cloud_platform/aws/aws_meta.py index d9eabfec7..92dd9dc32 100644 --- a/src/sageworks/core/cloud_platform/aws/aws_meta.py +++ b/src/sageworks/core/cloud_platform/aws/aws_meta.py @@ -491,8 +491,15 @@ def s3_describe_objects(self, bucket: str) -> Union[dict, None]: return wr.s3.describe_objects(path=bucket, boto3_session=self.boto3_session) @aws_throttle - def get_aws_tags(self, arn: str) -> dict: + def get_aws_tags(self, arn: str) -> Union[dict, None]: """List the tags for the given AWS ARN""" + + # Sanity check the ARN + if arn is None: + self.log.error("ARN is None, cannot retrieve tags.") + return None + + # Grab the tags from AWS return aws_tags_to_dict(self.sm_session.list_tags(resource_arn=arn)) @aws_throttle diff --git a/tests/artifacts/model_metrics_tests.py b/tests/artifacts/model_metrics_tests.py index 90a44253a..bae8b35f4 100644 --- a/tests/artifacts/model_metrics_tests.py +++ b/tests/artifacts/model_metrics_tests.py @@ -47,8 +47,16 @@ def test_retrieval_with_capture_uuid(): def test_validation_predictions(): print("\n\n*** Validation Predictions ***") - pprint(model_reg._get_validation_predictions().head()) - pprint(model_class._get_validation_predictions().head()) + val_preds = model_reg._get_validation_predictions() + if val_preds is None: + print(f"Model {model_reg.uuid} has no validation predictions!") + else: + pprint(model_reg._get_validation_predictions().head()) + val_preds = model_class._get_validation_predictions() + if val_preds is None: + print(f"Model {model_class.uuid} has no validation predictions!") + else: + pprint(model_class._get_validation_predictions().head()) def test_inference_predictions(): From 621a881ec2f8c8bbb9ff9c5c62cc2f627f9a380d Mon Sep 17 00:00:00 2001 From: Brian Wylie Date: Sat, 9 Nov 2024 18:34:03 -0700 Subject: [PATCH 11/12] various sanity checks when for endpoints and models methods --- src/sageworks/core/artifacts/model_core.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/sageworks/core/artifacts/model_core.py b/src/sageworks/core/artifacts/model_core.py index 44fed7fc1..7dd6eae17 100644 --- a/src/sageworks/core/artifacts/model_core.py +++ b/src/sageworks/core/artifacts/model_core.py @@ -248,8 +248,9 @@ def get_inference_metrics(self, capture_uuid: str = "latest") -> Union[pd.DataFr if capture_uuid == "model_training": # Sanity check the sageworks metadata if self.sageworks_meta() is None: - self.log.critical(f"Model {self.model_name} has no sageworks_meta(). Onboard() or delete this model!") - return + error_msg = f"Model {self.model_name} has no sageworks_meta(). Either onboard() or delete this model!" + self.log.critical(error_msg) + raise ValueError(error_msg) metrics = self.sageworks_meta().get("sageworks_training_metrics") return pd.DataFrame.from_dict(metrics) if metrics else None @@ -271,6 +272,13 @@ def confusion_matrix(self, capture_uuid: str = "latest") -> Union[pd.DataFrame, Returns: pd.DataFrame: DataFrame of the Confusion Matrix (might be None) """ + + # Sanity check the sageworks metadata + if self.sageworks_meta() is None: + error_msg = f"Model {self.model_name} has no sageworks_meta(). Either onboard() or delete this model!" + self.log.critical(error_msg) + raise ValueError(error_msg) + # Grab the metrics from the SageWorks Metadata (try inference first, then training) if capture_uuid == "latest": cm = self.sageworks_meta().get("sageworks_inference_cm") From 38b5ba5e06908ab2c4eca26fb1389adf2d663087 Mon Sep 17 00:00:00 2001 From: Brian Wylie Date: Sat, 9 Nov 2024 18:49:19 -0700 Subject: [PATCH 12/12] have test fail if the model doesn't exist --- tests/artifacts/model_metrics_tests.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/tests/artifacts/model_metrics_tests.py b/tests/artifacts/model_metrics_tests.py index bae8b35f4..86e1d4d41 100644 --- a/tests/artifacts/model_metrics_tests.py +++ b/tests/artifacts/model_metrics_tests.py @@ -47,16 +47,10 @@ def test_retrieval_with_capture_uuid(): def test_validation_predictions(): print("\n\n*** Validation Predictions ***") - val_preds = model_reg._get_validation_predictions() - if val_preds is None: - print(f"Model {model_reg.uuid} has no validation predictions!") - else: - pprint(model_reg._get_validation_predictions().head()) - val_preds = model_class._get_validation_predictions() - if val_preds is None: - print(f"Model {model_class.uuid} has no validation predictions!") - else: - pprint(model_class._get_validation_predictions().head()) + reg_val_preds = model_reg._get_validation_predictions() + pprint(reg_val_preds.head()) + class_val_preds = model_class._get_validation_predictions() + pprint(class_val_preds.head()) def test_inference_predictions():