Skip to content

Commit

Permalink
swap print -> logging.info
Browse files Browse the repository at this point in the history
  • Loading branch information
fritz-astronomer committed Jun 7, 2024
1 parent 465f3c7 commit 403fd93
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions astronomer_starship/providers/starship/operators/starship.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Operators, TaskGroups, and DAGs for interacting with the Starship migrations."""
import logging
from datetime import datetime
from typing import Any, Union, List

Expand Down Expand Up @@ -37,13 +38,13 @@ def __init__(self, variable_key: Union[str, None] = None, **kwargs):
self.variable_key = variable_key

def execute(self, context: Context) -> Any:
print("Getting Variable", self.variable_key)
logging.info("Getting Variable", self.variable_key)
variables = self.source_hook.get_variables()
variable: Union[dict, None] = (
[v for v in variables if v["key"] == self.variable_key] or [None]
)[0]
if variable is not None:
print("Migrating Variable", self.variable_key)
logging.info("Migrating Variable", self.variable_key)
self.target_hook.set_variable(**variable)
else:
raise RuntimeError("Variable not found! " + self.variable_key)
Expand Down Expand Up @@ -90,13 +91,13 @@ def __init__(self, pool_name: Union[str, None] = None, **kwargs):
self.pool_name = pool_name

def execute(self, context: Context) -> Any:
print("Getting Pool", self.pool_name)
logging.info("Getting Pool", self.pool_name)
pool: Union[dict, None] = (
[v for v in self.source_hook.get_pools() if v["name"] == self.pool_name]
or [None]
)[0]
if pool is not None:
print("Migrating Pool", self.pool_name)
logging.info("Migrating Pool", self.pool_name)
self.target_hook.set_pool(**pool)
else:
raise RuntimeError("Pool not found!")
Expand Down Expand Up @@ -140,7 +141,7 @@ def __init__(self, connection_id: Union[str, None] = None, **kwargs):
self.connection_id = connection_id

def execute(self, context: Context) -> Any:
print("Getting Connection", self.connection_id)
logging.info("Getting Connection", self.connection_id)
connection: Union[dict, None] = (
[
v
Expand All @@ -150,7 +151,7 @@ def execute(self, context: Context) -> Any:
or [None]
)[0]
if connection is not None:
print("Migrating Connection", self.connection_id)
logging.info("Migrating Connection", self.connection_id)
self.target_hook.set_connection(**connection)
else:
raise RuntimeError("Connection not found!")
Expand Down Expand Up @@ -204,18 +205,18 @@ def __init__(
self.dag_run_limit = dag_run_limit

def execute(self, context):
print("Pausing local DAG for", self.target_dag_id)
logging.info("Pausing local DAG for", self.target_dag_id)
self.source_hook.set_dag_is_paused(dag_id=self.target_dag_id, is_paused=True)
# TODO - Poll until all tasks are done

print("Getting local DAG Runs for", self.target_dag_id)
logging.info("Getting local DAG Runs for", self.target_dag_id)
dag_runs = self.source_hook.get_dag_runs(
dag_id=self.target_dag_id, limit=self.dag_run_limit
)
if len(dag_runs["dag_runs"]) == 0:
raise AirflowSkipException("No DAG Runs found for " + self.target_dag_id)

print("Getting local Task Instances for", self.target_dag_id)
logging.info("Getting local Task Instances for", self.target_dag_id)
task_instances = self.source_hook.get_task_instances(
dag_id=self.target_dag_id, limit=self.dag_run_limit
)
Expand All @@ -224,16 +225,16 @@ def execute(self, context):
"No Task Instances found for " + self.target_dag_id
)

print("Setting target DAG Runs for", self.target_dag_id)
logging.info("Setting target DAG Runs for", self.target_dag_id)
self.target_hook.set_dag_runs(dag_runs=dag_runs["dag_runs"])

print("Setting target Task Instances for", self.target_dag_id)
logging.info("Setting target Task Instances for", self.target_dag_id)
self.target_hook.set_task_instances(
task_instances=task_instances["task_instances"]
)

if self.unpause_dag_in_target:
print("Unpausing target DAG for", self.target_dag_id)
logging.info("Unpausing target DAG for", self.target_dag_id)
self.target_hook.set_dag_is_paused(
dag_id=self.target_dag_id, is_paused=False
)
Expand Down

0 comments on commit 403fd93

Please sign in to comment.