Commit ba9d406

Merge branch 'datahub-project:master' into master
anshbansal authored Jan 27, 2025
2 parents 73f68af + 0c98cdc commit ba9d406
Showing 13 changed files with 235 additions and 13 deletions.
6 changes: 6 additions & 0 deletions docs/automations/bigquery-metadata-sync.md
@@ -4,6 +4,12 @@ import FeatureAvailability from '@site/src/components/FeatureAvailability';

<FeatureAvailability saasOnly />

:::info

This feature is currently in open beta in Acryl Cloud. Reach out to your Acryl representative to get access.

:::

## Introduction

BigQuery Metadata Sync is an automation that synchronizes DataHub Tags, Table and Column descriptions, and Column Glossary Terms with
6 changes: 6 additions & 0 deletions docs/automations/docs-propagation.md
@@ -1,5 +1,11 @@
# Documentation Propagation Automation

:::info

This feature is currently in open beta in Acryl Cloud. Reach out to your Acryl representative to get access.

:::

## Introduction

Documentation Propagation is an automation that automatically propagates column and asset (coming soon) descriptions based on downstream column-level lineage and sibling relationships.
6 changes: 6 additions & 0 deletions docs/automations/glossary-term-propagation.md
@@ -2,6 +2,12 @@

<FeatureAvailability saasOnly />

:::info

This feature is currently in open beta in Acryl Cloud. Reach out to your Acryl representative to get access.

:::

## Introduction

Glossary Term Propagation is an automation feature that propagates classification labels (Glossary Terms) across columns and assets based on downstream lineage and sibling relationships.
7 changes: 6 additions & 1 deletion docs/automations/snowflake-tag-propagation.md
@@ -4,7 +4,12 @@ import FeatureAvailability from '@site/src/components/FeatureAvailability';

<FeatureAvailability saasOnly />

> Note that this Automation is currently in open **Beta**. With any questions or issues, please reach out to your Acryl representative.
:::info

This feature is currently in open beta in Acryl Cloud. Reach out to your Acryl representative to get access.

:::


## Introduction

2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -218,6 +218,7 @@ def platform_validator(cls, v: str) -> str:

@dataclass
class GlueSourceReport(StaleEntityRemovalSourceReport):
catalog_id: Optional[str] = None
tables_scanned = 0
filtered: List[str] = dataclass_field(default_factory=list)
databases: EntityFilterReport = EntityFilterReport.field(type="database")
@@ -315,6 +316,7 @@ def __init__(self, config: GlueSourceConfig, ctx: PipelineContext):
self.extract_owners = config.extract_owners
self.source_config = config
self.report = GlueSourceReport()
self.report.catalog_id = self.source_config.catalog_id
self.glue_client = config.glue_client
self.s3_client = config.s3_client
self.extract_transforms = config.extract_transforms
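A minimal sketch of what this glue.py change surfaces: the configured catalog id is now copied onto the ingestion report, so it shows up in run summaries. The import path follows from the file header above; the example value is hypothetical, and the report is constructed the same way the source itself does (`GlueSourceReport()`).

```python
from datahub.ingestion.source.aws.glue import GlueSourceReport

# The report now carries the catalog id the source was configured with.
report = GlueSourceReport()
report.catalog_id = "123456789012"  # hypothetical AWS catalog/account id
print(report.catalog_id)
```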
metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py
@@ -139,7 +139,10 @@ class LookMLSourceConfig(
)
emit_reachable_views_only: bool = Field(
True,
description="When enabled, only views that are reachable from explores defined in the model files are emitted",
description=(
"When enabled, only views that are reachable from explores defined in the model files are emitted. "
"If set to False, all views imported in model files are emitted. Views that are unreachable, i.e. not explicitly defined in the model files, are currently not emitted but are reported as warnings for debugging purposes."
),
)
populate_sql_logic_for_missing_descriptions: bool = Field(
False,
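As a quick illustration of the flag documented above, here is a minimal sketch that builds the config the same way the integration test at the bottom of this diff does (`LookMLSourceConfig.parse_obj`). The `base_folder` and connection mapping are placeholder values; note that pydantic validates `base_folder` as an existing directory, so point it at a real LookML checkout.

```python
from datahub.ingestion.source.looker.lookml_config import LookMLSourceConfig

# Opt in to emitting every view imported in model files, not only those
# reachable from an explore; view files referenced by no model are then
# reported as warnings rather than emitted.
config = LookMLSourceConfig.parse_obj(
    {
        "base_folder": "/path/to/lookml",  # placeholder; must be an existing directory
        "connection_to_platform_map": {"my_connection": "postgres"},
        "emit_reachable_views_only": False,
    }
)
print(config.emit_reachable_views_only)  # False
```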
metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py
@@ -59,6 +59,7 @@
from datahub.ingestion.source.looker.lookml_config import (
BASE_PROJECT_NAME,
MODEL_FILE_EXTENSION,
VIEW_FILE_EXTENSION,
LookerConnectionDefinition,
LookMLSourceConfig,
LookMLSourceReport,
@@ -884,6 +885,7 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
view_urn = maybe_looker_view.id.get_urn(
self.source_config
)

view_connection_mapping = view_connection_map.get(
view_urn
)
@@ -939,6 +941,9 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
str(maybe_looker_view.id)
)

if not self.source_config.emit_reachable_views_only:
self.report_skipped_unreachable_views(viewfile_loader, processed_view_map)

if (
self.source_config.tag_measures_and_dimensions
and self.reporter.events_produced != 0
@@ -966,5 +971,56 @@ def gen_project_workunits(self, project_name: str) -> Iterable[MetadataWorkUnit]
),
).as_workunit()

def report_skipped_unreachable_views(
self,
viewfile_loader: LookerViewFileLoader,
processed_view_map: Optional[Dict[str, Set[str]]] = None,
) -> None:
# Avoid a shared mutable default argument; fall back to an empty map.
processed_view_map = processed_view_map or {}
view_files: Dict[str, List[pathlib.Path]] = {}
for project, folder_path in self.base_projects_folder.items():
folder = pathlib.Path(folder_path)
view_files[project] = list(folder.glob(f"**/*{VIEW_FILE_EXTENSION}"))

skipped_view_paths: Dict[str, List[str]] = {}
for project, views in view_files.items():
skipped_paths: Set[str] = set()

for view_path in views:
# Check if the view is already in processed_view_map
if not any(
str(view_path) in view_set
for view_set in processed_view_map.values()
):
looker_viewfile = viewfile_loader.load_viewfile(
path=str(view_path),
project_name=project,
connection=None,
reporter=self.reporter,
)

if looker_viewfile is not None:
for raw_view in looker_viewfile.views:
raw_view_name = raw_view.get("name", "")

if (
raw_view_name
and self.source_config.view_pattern.allowed(
raw_view_name
)
):
skipped_paths.add(str(view_path))

skipped_view_paths[project] = list(skipped_paths)

for project, view_paths in skipped_view_paths.items():
for path in view_paths:
self.reporter.report_warning(
title="Skipped View File",
message=(
"The Looker view file was skipped because it may not be referenced by any models."
),
context=(f"Project: {project}, View File Path: {path}"),
)

def get_report(self):
return self.reporter
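At its core, the new reporting pass is a set difference over file paths: glob every `*.view.lkml` under each project folder, drop the paths some model file already processed, and warn about the rest. Below is a simplified, dependency-free sketch of that idea (the function and parameter names are illustrative, not the source's API); the real method additionally loads each candidate file and applies the configured `view_pattern` before warning.

```python
import pathlib
from typing import Dict, List, Set

VIEW_FILE_EXTENSION = ".view.lkml"


def find_unreachable_view_files(
    base_projects_folder: Dict[str, pathlib.Path],
    processed_view_map: Dict[str, Set[str]],
) -> Dict[str, List[str]]:
    """Return, per project, the view file paths that no model file referenced."""
    # Flatten every path that any model already processed.
    processed_paths: Set[str] = set()
    for view_set in processed_view_map.values():
        processed_paths.update(view_set)

    skipped: Dict[str, List[str]] = {}
    for project, folder in base_projects_folder.items():
        all_view_files = folder.glob(f"**/*{VIEW_FILE_EXTENSION}")
        skipped[project] = sorted(
            str(path) for path in all_view_files if str(path) not in processed_paths
        )
    return skipped
```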
@@ -0,0 +1,10 @@
connection: "my_connection"

include: "employee_income_source.view.lkml"
include: "employee_total_income.view.lkml"

explore: employee_income_source {
}

explore: employee_total_income {
}
metadata-ingestion/tests/integration/lookml/lkml_unreachable_views/employee_income_source.view.lkml
@@ -0,0 +1,40 @@
view: employee_income_source {
derived_table: {
sql: SELECT
employee_id,
employee_name,
{% if dw_eff_dt_date._is_selected or finance_dw_eff_dt_date._is_selected %}
prod_core.data.r_metric_summary_v2
{% elsif dw_eff_dt_week._is_selected or finance_dw_eff_dt_week._is_selected %}
prod_core.data.r_metric_summary_v3
{% else %}
'default_table' as source
{% endif %},
employee_income
FROM source_table
WHERE
{% condition source_region %} source_table.region {% endcondition %}
;;
}

dimension: id {
type: number
sql: ${TABLE}.employee_id;;
}

dimension: name {
type: string
sql: ${TABLE}.employee_name;;
}

dimension: source {
type: string
sql: ${TABLE}.source ;;
}

dimension: income {
type: number
sql: ${TABLE}.employee_income ;;
}

}
metadata-ingestion/tests/integration/lookml/lkml_unreachable_views/employee_total_income.view.lkml
@@ -0,0 +1,18 @@
view: employee_total_income {
sql_table_name: ${employee_income_source.SQL_TABLE_NAME} ;;

dimension: id {
type: number
sql: ${TABLE}.id;;
}

dimension: name {
type: string
sql: ${TABLE}.name;;
}

measure: total_income {
type: sum
sql: ${TABLE}.income;;
}
}
metadata-ingestion/tests/integration/lookml/lkml_unreachable_views/employee_unreachable.view.lkml
@@ -0,0 +1,18 @@
view: employee_unreachable {
sql_table_name: ${employee_income_source.SQL_TABLE_NAME} ;;

dimension: id {
type: number
sql: ${TABLE}.id;;
}

dimension: name {
type: string
sql: ${TABLE}.name;;
}

measure: total_income {
type: sum
sql: ${TABLE}.income;;
}
}
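Taken together, the three new fixtures exercise the new code path: the model file includes employee_income_source.view.lkml and employee_total_income.view.lkml, so those views are reachable through its explores, while employee_unreachable lives in a view file no model includes. With emit_reachable_views_only set to False, that last file is exactly what report_skipped_unreachable_views flags, producing the single warning the new test below asserts on.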
64 changes: 55 additions & 9 deletions metadata-ingestion/tests/integration/lookml/test_lookml.py
@@ -10,6 +10,8 @@
from freezegun import freeze_time
from looker_sdk.sdk.api40.models import DBConnection

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.file import read_metadata_file
from datahub.ingestion.source.looker.looker_dataclasses import LookerModel
@@ -20,6 +22,7 @@
)
from datahub.ingestion.source.looker.lookml_config import LookMLSourceConfig
from datahub.ingestion.source.looker.lookml_refinement import LookerRefinementResolver
from datahub.ingestion.source.looker.lookml_source import LookMLSource
from datahub.metadata.schema_classes import (
DatasetSnapshotClass,
MetadataChangeEventClass,
@@ -78,7 +81,8 @@ def test_lookml_ingest(pytestconfig, tmp_path, mock_time):
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1

mce_helpers.check_golden_file(
pytestconfig,
@@ -112,7 +116,8 @@ def test_lookml_refinement_ingest(pytestconfig, tmp_path, mock_time):
pipeline = Pipeline.create(new_recipe)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1

golden_path = test_resources_dir / "refinements_ingestion_golden.json"
mce_helpers.check_golden_file(
@@ -142,7 +147,8 @@ def test_lookml_refinement_include_order(pytestconfig, tmp_path, mock_time):
pipeline = Pipeline.create(new_recipe)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1

golden_path = test_resources_dir / "refinement_include_order_golden.json"
mce_helpers.check_golden_file(
@@ -332,7 +338,8 @@ def test_lookml_ingest_offline(pytestconfig, tmp_path, mock_time):
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1

mce_helpers.check_golden_file(
pytestconfig,
@@ -377,7 +384,8 @@ def test_lookml_ingest_offline_with_model_deny(pytestconfig, tmp_path, mock_time
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1

mce_helpers.check_golden_file(
pytestconfig,
@@ -424,7 +432,8 @@ def test_lookml_ingest_offline_platform_instance(pytestconfig, tmp_path, mock_ti
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1

mce_helpers.check_golden_file(
pytestconfig,
@@ -507,7 +516,8 @@ def ingestion_test(
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1

mce_helpers.check_golden_file(
pytestconfig,
@@ -553,7 +563,8 @@ def test_lookml_git_info(pytestconfig, tmp_path, mock_time):
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1

mce_helpers.check_golden_file(
pytestconfig,
@@ -668,7 +679,8 @@ def test_hive_platform_drops_ids(pytestconfig, tmp_path, mock_time):
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1

events = read_metadata_file(tmp_path / mce_out)
for mce in events:
@@ -1051,3 +1063,37 @@ def test_gms_schema_resolution(pytestconfig, tmp_path, mock_time):
output_path=tmp_path / mce_out_file,
golden_path=golden_path,
)


@freeze_time(FROZEN_TIME)
def test_unreachable_views(pytestconfig):
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"

config = {
"base_folder": f"{test_resources_dir}/lkml_unreachable_views",
"connection_to_platform_map": {"my_connection": "postgres"},
"parse_table_names_from_sql": True,
"tag_measures_and_dimensions": False,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data2"]},
"emit_reachable_views_only": False,
"liquid_variable": {
"order_region": "ap-south-1",
"source_region": "ap-south-1",
"dw_eff_dt_date": {
"_is_selected": True,
},
},
}

source = LookMLSource(
LookMLSourceConfig.parse_obj(config),
ctx=PipelineContext(run_id="lookml-source-test"),
)
wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
assert len(wu) == 15
assert source.reporter.warnings.total_elements == 1
assert (
"The Looker view file was skipped because it may not be referenced by any models."
in [failure.message for failure in source.get_report().warnings]
)
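Note the liquid_variable block in the config above: with dw_eff_dt_date._is_selected set to True, the `{% if %}` branch in the employee_income_source fixture resolves to prod_core.data.r_metric_summary_v2, so the derived table's SQL can be rendered and parsed deterministically.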