✨ engineering: refactor Step paths #3165

Open · wants to merge 1 commit into master
10 changes: 5 additions & 5 deletions apps/metadata_migrate/cli.py
@@ -15,7 +15,7 @@
 from etl import config
 from etl import grapher_model as gm
 from etl.command import main as etl_main
-from etl.db import get_engine, read_sql
+from etl.db import CatalogPath, get_engine, read_sql
 from etl.metadata_export import merge_or_create_yaml, reorder_fields
 from etl.paths import BASE_DIR, DAG_FILE, DATA_DIR, STEP_DIR

@@ -122,10 +122,10 @@ def cli(
         assert variable.catalogPath, f"Variable {var_id} does not come from ETL. Migrate it there first."

         # extract dataset URI and columns
-        uri, cols = variable.catalogPath.split("#")
-        cols = f"^{cols}$"
-        uri = uri.split("/", 1)[1]
-        uri, table_name = uri.rsplit("/", 1)
+        catalog_path = CatalogPath(variable.catalogPath)
+        cols = f"^{catalog_path.column}$"
+        uri = catalog_path.uri
+        table_name = catalog_path.table
     else:
         grapher_config = None

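For reviewers, a worked comparison of the old and new parsing in the hunk above, using a hypothetical `variable.catalogPath` (the `CatalogPath` helper is added in `etl/db.py` further down in this diff):

```python
catalog_path_str = "grapher/energy/2023-07-10/energy_mix/energy_mix#co2_emissions"  # hypothetical value

# Old logic: plain string manipulation.
uri, cols = catalog_path_str.split("#")  # "grapher/energy/2023-07-10/energy_mix/energy_mix", "co2_emissions"
cols = f"^{cols}$"                       # "^co2_emissions$"
uri = uri.split("/", 1)[1]               # "energy/2023-07-10/energy_mix/energy_mix" (channel dropped)
uri, table_name = uri.rsplit("/", 1)     # "energy/2023-07-10/energy_mix", "energy_mix"

# New logic: CatalogPath properties, as implemented in this PR.
# CatalogPath(catalog_path_str).column -> "co2_emissions"
# CatalogPath(catalog_path_str).uri    -> "grapher/energy/2023-07-10/energy_mix" (channel kept)
# CatalogPath(catalog_path_str).table  -> "energy_mix#co2_emissions" (split("/")[4], so the "#column" suffix remains)
```

The refactor centralizes the parsing in one place, at the cost of `uri` and `table` values that differ slightly from what the old string manipulation produced.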
54 changes: 26 additions & 28 deletions apps/wizard/app_pages/dataset_explorer.py
@@ -14,7 +14,7 @@
 from apps.wizard.utils import metadata_export_basic, set_states
 from etl.config import ENV_IS_REMOTE
 from etl.paths import DATA_DIR
-from etl.steps import extract_step_attributes, filter_to_subgraph, load_dag
+from etl.steps import Step, create_step_from_uri, filter_to_subgraph, load_dag

 # CONFIG
 st.set_page_config(
@@ -62,44 +62,44 @@ def generate_graph(
     collapse_others: bool = True,
     collapse_meadow: bool = True,
 ) -> Any:
-    def _friendly_label(attributes: Dict[str, str], length_limit: int = 32) -> str:
-        label_1 = f"{attributes['namespace']}/{attributes['name']}"
+    def _friendly_label(st: Step, length_limit: int = 32) -> str:
+        label_1 = f"{st.namespace}/{st.name}"
         if len(label_1) > length_limit:
             label_1 = label_1[:length_limit] + "..."
-        label = f"{label_1}\n{attributes['version']}"
+        label = f"{label_1}\n{st.version}"
         return label

-    def _friendly_title(attributes: Dict[str, str], children: List[str]) -> str:
+    def _friendly_title(st: Step, children: List[str]) -> str:
         deps = "\n- ".join(children)
-        title = f"""{attributes['identifier'].upper()}
-        version {attributes['version']} ({attributes['kind']})
+        title = f"""{st.identifier.upper()}
+        version {st.version} ({"public" if st.is_public else "private"})
         """
-        title = attributes["step"].upper()
+        title = st.path.upper()
         if deps:
             title = title + "\n\ndependencies:\n- " + deps
         return title

-    def _collapse_node(attributes: Dict[str, str]) -> bool:
-        if collapse_snapshot and (attributes["channel"] in ["snapshot", "walden"]):
+    def _collapse_node(st: Step) -> bool:
+        if collapse_snapshot and (st.channel in ["snapshot", "walden"]):
             return True
-        if collapse_meadow and (attributes["channel"] in ["meadow"]):
+        if collapse_meadow and (st.channel in ["meadow"]):
             return True
-        if collapse_others and (attributes["channel"] not in ["snapshot", "walden", "meadow", "garden", "grapher"]):
+        if collapse_others and (st.channel not in ["snapshot", "walden", "meadow", "garden", "grapher"]):
             return True
         return False

     # Create edges
     edges = []
     nodes = []
     for parent, children in dag.items():
-        attributes = extract_step_attributes(parent)
+        step = create_step_from_uri(parent)

         # Main node
         if parent == uri_main:
             kwargs = {
-                "color": COLORS.get(attributes["channel"], COLOR_OTHER),
-                "label": f"{attributes['namespace'].upper()}/{attributes['name'].upper()}\n{attributes['version']}",
-                "title": _friendly_title(attributes, children),
+                "color": COLORS.get(step.channel, COLOR_OTHER),
+                "label": f"{step.namespace.upper()}/{step.name.upper()}\n{step.version}",
+                "title": _friendly_title(step, children),
                 "font": {
                     "size": 40,
                     "face": "courier",
@@ -111,23 +111,23 @@ def _collapse_node(attributes: Dict[str, str]) -> bool:
         else:
             # Nodes that will not show label within them (user chose to hide them)
             kwargs = {
-                "color": COLORS.get(attributes["channel"], COLOR_OTHER),
+                "color": COLORS.get(step.channel, COLOR_OTHER),
                 "mass": 1,
                 "opacity": 0.9,
             }
-            if _collapse_node(attributes):
+            if _collapse_node(step):
                 kwargs = {
                     **kwargs,
-                    "title": _friendly_title(attributes, children),
+                    "title": _friendly_title(step, children),
                     "mass": 1,
                     "opacity": 0.9,
                 }
             # Nodes that will show label within them
             else:
                 kwargs = {
                     **kwargs,
-                    "label": _friendly_label(attributes),
-                    "title": _friendly_title(attributes, children),
+                    "label": _friendly_label(step),
+                    "title": _friendly_title(step, children),
                     "font": {
                         "size": 20,
                         "face": "courier",
@@ -136,7 +136,7 @@ def _collapse_node(attributes: Dict[str, str]) -> bool:
                 }

             # Change if step is private
-            if attributes["kind"] == "private":
+            if step.is_private:
                 kwargs["label"] = "🔒 " + kwargs.get("label", "")

             node = Node(
@@ -191,10 +191,8 @@ def _collapse_node(attributes: Dict[str, str]) -> bool:

 def get_dataset(dataset_uri: str) -> catalog.Dataset | None:
     """Get dataset."""
-    attributes = extract_step_attributes(dataset_uri)
-    dataset_path = (
-        DATA_DIR / f"{attributes['channel']}/{attributes['namespace']}/{attributes['version']}/{attributes['name']}"
-    )
+    uri = dataset_uri.split("//")[1]
+    dataset_path = DATA_DIR / uri
     dataset = None
     try:
         dataset = catalog.Dataset(dataset_path)
Expand Down Expand Up @@ -234,8 +232,8 @@ def _show_tab_graph(tab, option, dag):
_ = generate_graph(dag, option, collapse_snapshot, collapse_others, collapse_meadow)

# Get dataset attributes, check we want to display it
attributes = extract_step_attributes(cast(str, option))
show_d_details = attributes["channel"] in ["garden", "grapher"]
step = create_step_from_uri(cast(str, option))
show_d_details = step.channel in ["garden", "grapher"]

# Show a more complete overview for Garden and Grapher datasets
if show_d_details:
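The refactored `get_dataset` above now derives the local dataset path directly from the step URI rather than reassembling it from extracted attributes. A minimal sketch with a hypothetical URI; only `DATA_DIR` and standard string splitting are involved:

```python
from pathlib import Path

DATA_DIR = Path("data")  # stand-in for etl.paths.DATA_DIR

dataset_uri = "data://garden/energy/2023-07-10/energy_mix"  # hypothetical step URI

# Everything after "//" is already the catalog-relative dataset path.
uri = dataset_uri.split("//")[1]  # "garden/energy/2023-07-10/energy_mix"
dataset_path = DATA_DIR / uri     # data/garden/energy/2023-07-10/energy_mix
```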
54 changes: 52 additions & 2 deletions etl/db.py
@@ -1,6 +1,7 @@
 import functools
 import os
 import warnings
+from dataclasses import dataclass
 from typing import Any, Dict, List, Optional
 from urllib.parse import quote

@@ -423,8 +424,10 @@ def get_info_for_etl_datasets(db_conn: Optional[pymysql.Connection] = None) -> p
     df["is_archived"] = df["is_archived"].astype(bool)
     df["is_private"] = df["is_private"].astype(bool)

+    etl_paths = [CatalogPath(etl_path) for etl_path in df["etl_path"]]
+
     # Sanity check.
-    unknown_channels = set([etl_path.split("/")[0] for etl_path in set(df["etl_path"])]) - {"grapher"}
+    unknown_channels = set([etl_path.channel for etl_path in set(etl_paths)]) - {"grapher"}
     if len(unknown_channels) > 0:
         log.error(
             "Variables in grapher DB are expected to come only from ETL grapher channel, "
@@ -434,7 +437,8 @@ def get_info_for_etl_datasets(db_conn: Optional[pymysql.Connection] = None) -> p
     # Create a column with the step name.
     # First assume all steps are public (hence starting with "data://").
     # Then edit private steps so they start with "data-private://".
-    df["step"] = ["data://" + "/".join(etl_path.split("#")[0].split("/")[:-1]) for etl_path in df["etl_path"]]
+    # TODO
+    df["step"] = [f"data://{etl_path.uri}" for etl_path in etl_paths]
     df.loc[df["is_private"], "step"] = df[df["is_private"]]["step"].str.replace("data://", "data-private://")

     return df
@@ -505,3 +509,49 @@ def get_dataset_charts(dataset_ids: List[str], db_conn: Optional[pymysql.Connect
     ]

     return df
+
+
+@dataclass(unsafe_hash=True)
+class CatalogPath:
+    """Helper class to parse and manipulate catalog paths.
+
+    Examples:
+        channel/namespace/version/dataset/table#column:
+        data://channel/namespace/version/dataset/table#column:
+    """
+
+    path: str
+
+    def __init__(self, path: str) -> None:
+        assert "://" not in path, "Path should not contain ://"
+        assert "#" in path, "CatalogPath should contain a table and a column"
+        self.path = path
+
+    @property
+    def channel(self) -> str:
+        return self.path.split("/")[0]
+
+    @property
+    def namespace(self) -> str:
+        return self.path.split("/")[1]
+
+    @property
+    def version(self) -> str:
+        return self.path.split("/")[2]
+
+    @property
+    def dataset(self) -> str:
+        return self.path.split("/")[3]
+
+    @property
+    def table(self) -> str:
+        return self.path.split("/")[4]
+
+    @property
+    def column(self) -> str:
+        return self.path.split("#")[-1]
+
+    @property
+    def uri(self) -> str:
+        """Return dataset URI `channel/namespace/version/dataset`."""
+        return f"{self.channel}/{self.namespace}/{self.version}/{self.dataset}"
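A minimal usage sketch of the new `CatalogPath` helper with a hypothetical catalog path; the commented values follow directly from the property implementations above:

```python
from etl.db import CatalogPath

cp = CatalogPath("grapher/energy/2023-07-10/energy_mix/energy_mix#co2_emissions")  # hypothetical

cp.channel    # "grapher"
cp.namespace  # "energy"
cp.version    # "2023-07-10"
cp.dataset    # "energy_mix"
cp.table      # "energy_mix#co2_emissions" (fifth "/" segment, still carrying the "#column" suffix)
cp.column     # "co2_emissions"
cp.uri        # "grapher/energy/2023-07-10/energy_mix"
```

The constructor asserts that the path contains a `#`-separated column and no `://` scheme prefix, so full step URIs such as `data://...` must be stripped before constructing a `CatalogPath`.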
4 changes: 2 additions & 2 deletions etl/grapher_model.py
@@ -52,7 +52,7 @@

 from etl import config, paths
 from etl.config import GRAPHER_USER_ID
-from etl.db import read_sql
+from etl.db import CatalogPath, read_sql

 log = structlog.get_logger()

@@ -1183,7 +1183,7 @@ def s3_metadata_path(self, typ: S3_PATH_TYP = "s3") -> str:
     @property
     def table_name(self) -> str:
         assert self.catalogPath
-        return self.catalogPath.split("#")[0].rsplit("/", 1)[1]
+        return CatalogPath(self.catalogPath).table

     @property
     def step_path(self) -> Path: