Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Adding TMC canalespy query helpers #960

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
21 changes: 10 additions & 11 deletions parsons/databases/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,16 @@ def dedup_table(
all columns and inserting those into a partition statement for
row_number().
Args:
sharinetmc marked this conversation as resolved.
Show resolved Hide resolved
order_by_column_name: str
order_by_column_name: str (optional)
Column name of specific column that you would like to dedup using order by
order_by_direction: str
order_by_direction: str (optional)
Order by direction, if you would like to dedup by ordering by a specific column,
this is the direction of the order by
example: 'asc'
cascade: bool
cascade: bool (optional)
Set to True if you want any dependent views to be dropped -
queries will fail if there are dependent views and this is set to False.
columns_to_ignore: list
columns_to_ignore: list (optional)
List any columns that should be ignored in the dedup
"""
current_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
Expand All @@ -214,18 +214,17 @@ def dedup_table(
partition = ", ".join(columns_list)

dedup_query = f"""
sharinetmc marked this conversation as resolved.
Show resolved Hide resolved
alter table {self.table}
rename to {self.table}_temp_{current_timestamp};
create table {self.table} as
select * from
create table {self.table}_temp_{current_timestamp} as
(select *
, row_number() over (partition by {partition}
order by {order_by_column_name} {order_by_direction}) as dup
from {self.table}_temp_{current_timestamp})
from {self.table})
where dup=1;
alter table {self.table}
alter table {self.table}_temp_{current_timestamp}
drop column dup;
drop table {self.table}temp_{current_timestamp} {run_cascade};
truncate table {self.table}
insert into {self.table} (select * from {self.table}_temp_{current_timestamp})
{run_cascade};
sharinetmc marked this conversation as resolved.
Show resolved Hide resolved
"""

self.db.query(dedup_query)
Expand Down
16 changes: 1 addition & 15 deletions parsons/utilities/sql_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re

__all__ = ["redact_credentials", "get_sql_from_file"]
__all__ = ["redact_credentials"]


def redact_credentials(sql):
Expand All @@ -12,17 +12,3 @@ def redact_credentials(sql):
sql_censored = re.sub(pattern, "CREDENTIALS REDACTED", sql, flags=re.IGNORECASE)

return sql_censored


def get_sql_from_file(sql_file):
    """
    Read the SQL text stored in a separate file.

    `Args:`
        sql_file: str
            The path of the file that contains the SQL.
    `Returns:`
        str
            The full contents of the file, unmodified.
    """
    # Decode explicitly as UTF-8 so the returned SQL does not depend on the
    # default locale encoding of the machine reading the file.
    with open(sql_file, "r", encoding="utf-8") as f:
        return f.read()
14 changes: 0 additions & 14 deletions test/test_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,6 @@ def test_redact_credentials():
assert sql_helpers.redact_credentials(test_str) == test_result


def test_get_sql_from_file():
    """Check that get_sql_from_file returns a file's contents verbatim."""
    import tempfile

    # Test query string
    test_str = "select * from schema.tablename limit 10"

    # Create the fake file inside a throwaway directory. A fixed "tmp"
    # directory made with os.mkdir raises FileExistsError when it already
    # exists and is left behind in the working directory after the run.
    with tempfile.TemporaryDirectory() as tmp_dir:
        test_file_name = os.path.join(tmp_dir, "sql_file.txt")
        with open(test_file_name, "w+") as sql_file:
            sql_file.write(test_str)

        assert sql_helpers.get_sql_from_file(test_file_name) == test_str


class TestCheckEnv(unittest.TestCase):
def test_environment_field(self):
"""Test check field"""
Expand Down
Loading