import datetime
import logging

logger = logging.getLogger(__name__)


def dedup_table(
    self,
    order_by_column_name=None,
    order_by_direction=None,
    cascade=False,
    columns_to_ignore=None,
):
    """
    Remove duplicate rows from ``self.table`` in place.

    Rows are numbered with ``row_number()`` inside partitions made of every
    column except the order-by column and the ignored columns; only the
    first row of each partition is kept. The survivors are staged in a
    scratch table named ``<table>_temp_<timestamp>``, the original table is
    truncated and refilled from it, and the scratch table is dropped. All
    statements are sent to ``self.db.query`` as a single string.

    `Args:`
        order_by_column_name: str (optional)
            Column that decides which row of each duplicate group is kept.
            It is excluded from the partition. When omitted, rows are
            ordered by ``random()`` and ``order_by_direction`` is ignored.
        order_by_direction: str (optional)
            Sort direction for ``order_by_column_name``, for example
            ``'asc'``. Required whenever ``order_by_column_name`` is given.
        cascade: bool (optional)
            When True, ``CASCADE`` is appended to the final ``DROP TABLE``
            of the scratch table. The original table is truncated and
            refilled rather than dropped.
        columns_to_ignore: list (optional)
            Columns to leave out of the partition, i.e. columns whose values
            may differ between rows that still count as duplicates.

    `Returns:`
        ``None``

    `Raises:`
        ValueError: if ``order_by_column_name`` is given without
            ``order_by_direction``, if a named column is not in
            ``self.columns``, or if no column is left to partition by.
    """
    # Validate before substituting any default, so the random-order path does
    # not trip the "direction is blank" check.
    if order_by_column_name is not None and order_by_direction is None:
        raise ValueError("order_by_direction argument is blank")

    excluded = set(columns_to_ignore or ())
    if order_by_column_name is not None:
        excluded.add(order_by_column_name)

    # Copy so the list handed back by self.columns is never mutated.
    table_columns = list(self.columns)

    unknown = sorted(excluded.difference(table_columns))
    if unknown:
        raise ValueError(f"Columns not found in {self.table}: {', '.join(unknown)}")

    partition_columns = [name for name in table_columns if name not in excluded]
    if not partition_columns:
        # "partition by" followed by nothing is not valid SQL.
        raise ValueError(f"No columns left to partition by when deduping {self.table}")
    partition = ", ".join(partition_columns)

    if order_by_column_name is None:
        # NOTE(review): random() does not exist under that name in every SQL
        # dialect (MySQL spells it rand()) - confirm against the target database.
        order_by = "random()"
    else:
        order_by = f"{order_by_column_name} {order_by_direction}"

    current_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    temp_table = f"{self.table}_temp_{current_timestamp}"
    drop_suffix = " CASCADE" if cascade else ""

    # NOTE: table and column names are interpolated directly because SQL
    # identifiers cannot be bound as query parameters; callers must pass
    # trusted values only.
    dedup_query = f"""
        create table {temp_table} as
        select *
        from (
            select *
            , row_number() over (partition by {partition}
            order by {order_by}) as dup
            from {self.table}
        ) as ranked
        where dup=1;
        alter table {temp_table}
        drop column dup;
        truncate table {self.table};
        insert into {self.table} (select * from {temp_table});
        drop table {temp_table}{drop_suffix};
    """

    self.db.query(dedup_query)
    logger.info(f"Finished deduping {self.table}...")