Skip to content

Commit

Permalink
Correct Snowflake example DAG to demonstrate multiple queries run
Browse files Browse the repository at this point in the history
As per RCA in #1383, to demonstrate multiple queries in a single
statement, the queries need to be independent of each other as they
get run in an async & indepedently of one another  after submission.
Hence, correcting the example DAG to have queries independent of
each other.

closes: #1385
  • Loading branch information
pankajkoti committed Dec 18, 2023
1 parent 1ccef0f commit 15a2870
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions astronomer/providers/snowflake/example_dags/example_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@


SNOWFLAKE_SAMPLE_TABLE_MULTI = os.getenv("SNOWFLAKE_SAMPLE_TABLE_MULTI", "sample_table_multi")
MULTIPLE_QUERY_IN_ONE_RUN = (
f"CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_SAMPLE_TABLE_MULTI} (name VARCHAR(250), id INT);"
f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE_MULTI} VALUES ('name', 1);"
f"DROP TABLE {SNOWFLAKE_SAMPLE_TABLE_MULTI};"
MULTIPLE_QUERY_IN_ONE_RUN_CREATE_TABLES = (
f"CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_SAMPLE_TABLE_MULTI}_1 (name VARCHAR(250), id INT);"
f"CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_SAMPLE_TABLE_MULTI}_2 (name VARCHAR(250), id INT);"
f"CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_SAMPLE_TABLE_MULTI}_3 (name VARCHAR(250), id INT);"
)

MULTIPLE_QUERY_IN_ONE_RUN_DROP_TABLES = (
f"DROP TABLE {SNOWFLAKE_SAMPLE_TABLE_MULTI}_1;"
f"DROP TABLE {SNOWFLAKE_SAMPLE_TABLE_MULTI}_2;"
f"DROP TABLE {SNOWFLAKE_SAMPLE_TABLE_MULTI}_3;"
)

default_args = {
Expand Down Expand Up @@ -74,13 +80,22 @@
task_id="snowflake_op_sql_select_stmts", sql=SNOWFLAKE_SLACK_SQL, return_last=False
)

snowflake_op_multiple_query_in_one_run = SnowflakeOperatorAsync(
task_id="snowflake_op_multiple_query_in_one_run", sql=MULTIPLE_QUERY_IN_ONE_RUN, return_last=False
snowflake_op_multiple_query_in_one_run_create_tables = SnowflakeOperatorAsync(
task_id="snowflake_op_multiple_query_in_one_run_create_tables",
sql=MULTIPLE_QUERY_IN_ONE_RUN_CREATE_TABLES,
return_last=False,
)

snowflake_op_multiple_query_in_one_run_drop_tables = SnowflakeOperatorAsync(
task_id="snowflake_op_multiple_query_in_one_run_drop_tables",
sql=MULTIPLE_QUERY_IN_ONE_RUN_DROP_TABLES,
return_last=False,
)

(
snowflake_op_sql_str
>> [snowflake_op_with_params, snowflake_op_sql_list, snowflake_op_sql_multiple_stmts]
>> snowflake_op_sql_select_stmts
>> snowflake_op_multiple_query_in_one_run
>> snowflake_op_multiple_query_in_one_run_create_tables
>> snowflake_op_multiple_query_in_one_run_drop_tables
)

0 comments on commit 15a2870

Please sign in to comment.