Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding TMC canalespy query helpers #960

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
68 changes: 68 additions & 0 deletions parsons/databases/table.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import datetime

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -164,3 +165,70 @@ def truncate(self):

self.db.query(f"TRUNCATE TABLE {self.table}")
logger.info(f"{self.table} truncated.")

def dedup_table(
    self,
    order_by_column_name=None,
    order_by_direction=None,
    cascade=False,
    columns_to_ignore=None,
):
    """
    Re-create this table with duplicate rows removed.

    The current table is renamed to ``<table>_temp_<timestamp>``, a new
    table with the original name is built from it keeping only the first
    row of every ``row_number()`` partition, and the renamed copy is then
    dropped. Rows are partitioned by every column except the order-by
    column and the ignored columns.

    `Args:`
        order_by_column_name: str
            Column used to decide which duplicate is kept. When omitted,
            rows are ordered by ``random()``.
        order_by_direction: str
            Direction of the order by, e.g. ``'asc'``. Required whenever
            ``order_by_column_name`` is given.
        cascade: bool
            Set to True if you want any dependent views to be dropped -
            queries will fail if there are dependent views and this is
            set to False.
        columns_to_ignore: list
            Columns that should be left out of the duplicate comparison.
    `Returns:`
        ``None``
    `Raises:`
        ValueError: If a column is given without a direction, if a named
            column is not in the table, or if no columns are left to
            compare.
    """
    # The direction is only mandatory when the caller picked a column; the
    # check has to run before any default ordering is substituted.
    if order_by_column_name is not None and order_by_direction is None:
        raise ValueError("order_by_direction argument is blank")

    if order_by_column_name is None:
        order_by_clause = "random()"
    else:
        order_by_clause = f"{order_by_column_name} {order_by_direction}"

    excluded = set(columns_to_ignore or ())
    if order_by_column_name is not None:
        excluded.add(order_by_column_name)

    # Work on a copy so the list exposed by self.columns is never mutated.
    all_columns = list(self.columns)

    unknown = excluded.difference(all_columns)
    if unknown:
        raise ValueError(
            f"Columns not found in {self.table}: {sorted(unknown)}"
        )

    partition_columns = [col for col in all_columns if col not in excluded]
    if not partition_columns:
        raise ValueError(
            f"No columns left to dedup {self.table} on after exclusions"
        )
    partition = ", ".join(partition_columns)

    current_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    run_cascade = "CASCADE" if cascade else ""
    # Build the temp name once so the rename, the select and the drop all
    # refer to exactly the same table.
    # NOTE(review): some SQL dialects reject a schema-qualified name after
    # RENAME TO; confirm against the databases this class targets.
    temp_table = f"{self.table}_temp_{current_timestamp}"

    # The derived table is aliased because several engines require an
    # alias on every subquery in FROM.
    dedup_query = f"""
        alter table {self.table}
        rename to {temp_table};
        create table {self.table} as
        select * from
        (select *
        , row_number() over (partition by {partition}
        order by {order_by_clause}) as dup
        from {temp_table}) as ranked
        where dup=1;
        alter table {self.table}
        drop column dup;
        drop table {temp_table} {run_cascade};
    """

    self.db.query(dedup_query)
    logger.info(f"Finished deduping {self.table}...")

    return None
16 changes: 15 additions & 1 deletion parsons/utilities/sql_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re

__all__ = ["redact_credentials"]
__all__ = ["redact_credentials", "get_sql_from_file"]


def redact_credentials(sql):
Expand All @@ -12,3 +12,17 @@ def redact_credentials(sql):
sql_censored = re.sub(pattern, "CREDENTIALS REDACTED", sql, flags=re.IGNORECASE)

return sql_censored


def get_sql_from_file(sql_file, encoding="utf-8"):
    """
    Read and return the SQL stored in a separate file.

    `Args:`
        sql_file: str
            The relevant file path
        encoding: str
            Text encoding used to decode the file. Given explicitly so the
            result does not depend on the platform's default encoding.
    `Returns:`
        str
            The SQL from the file
    """
    with open(sql_file, "r", encoding=encoding) as f:
        return f.read()
5 changes: 5 additions & 0 deletions test/test_databases/test_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ def test_truncate(self):
self.tbl.truncate()
self.assertEqual(self.tbl.num_rows, 0)

def test_dedup_table(self):
    """Dedup the fixture table and check the remaining row count."""
    # dedup_table rejects an order-by column that comes without a
    # direction, so both arguments are supplied together.
    self.tbl.dedup_table(
        order_by_column_name="user_name", order_by_direction="asc"
    )
    self.assertEqual(self.tbl.num_rows, 2)

def test_get_rows(self):

data = [
Expand Down
14 changes: 14 additions & 0 deletions test/test_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@ def test_redact_credentials():
assert sql_helpers.redact_credentials(test_str) == test_result


def test_get_sql_from_file():
    """Write a query to a temporary file and read it back via the helper."""
    import tempfile

    # Test query string
    test_str = "select * from schema.tablename limit 10"

    # A per-run temporary directory replaces the fixed "tmp" folder: it
    # cannot collide with an existing directory on reruns and is removed
    # when the block exits.
    with tempfile.TemporaryDirectory() as tmp_dir:
        test_file_name = os.path.join(tmp_dir, "sql_file.txt")
        with open(test_file_name, "w") as sql_file:
            sql_file.write(test_str)

        assert sql_helpers.get_sql_from_file(test_file_name) == test_str


class TestCheckEnv(unittest.TestCase):
def test_environment_field(self):
"""Test check field"""
Expand Down
Loading