Add more scripts (#19)
* Add more scripts

* apply review comments
pedro93 authored Jan 29, 2025
1 parent a54348c commit 97874bb
Showing 8 changed files with 410 additions and 3 deletions.
23 changes: 23 additions & 0 deletions datahub_metadata_model/README.md
@@ -0,0 +1,23 @@
Example script showing how to extract [DataHub's Metadata Model](https://datahubproject.io/docs/metadata-modeling/metadata-model) directly from the CLI package in order to perform schema-informed programmatic operations.

This works by introspecting the currently installed acryl-datahub package's internals for the list of entities that a DataHub version supports, what their aspects are, and the Avro schema of those aspects (in a JSON representation).

The output of the `load_entity_registry()` function is a `DataHubIndex` object composed of two properties:
- `registry`: A dictionary mapping each entity name to the definition of that entity: its category, key aspect, and the list of aspects that compose the entity:
```python
# This is the output of `index.registry.get("dataset")`
# In other words what is the registry definition for the dataset entity

DataHubRegistryEntityEntry(
category='core',
key='datasetKey',
aspects=['datasetKey', 'viewProperties', 'subTypes', 'datasetProfile', 'datasetUsageStatistics', 'operation', 'domains', 'proposals', 'schemaProposals', 'schemaMetadata', 'status', 'container', 'deprecation', 'usageFeatures', 'storageFeatures', 'lineageFeatures', 'testResults', 'siblings', 'embed', 'incidentsSummary', 'inferredNeighbors', 'inferredMetadata', 'schemaFieldsInferredMetadata', 'schemaFieldsInferredNeighbors', 'assertionsSummary', 'datasetProperties', 'editableDatasetProperties', 'datasetDeprecation', 'datasetUpstreamLineage', 'upstreamLineage', 'institutionalMemory', 'ownership', 'editableSchemaMetadata', 'globalTags', 'glossaryTerms', 'browsePaths', 'dataPlatformInstance', 'browsePathsV2', 'anomaliesSummary', 'access', 'structuredProperties', 'forms', 'partitionsSummary', 'share', 'origin', 'documentation', 'entityInferenceMetadata', 'versionProperties'])
```
- `schemas`: A dictionary mapping aspect names to the Avro schema of each aspect.
```python

# Output of index.schemas.get("datasetKey")
# This is the schema definition of the datasetKey aspect.

{"type": "record", "Aspect": {"name": "datasetKey", "keyForEntity": "dataset", "entityCategory": "core", "entityAspects": ["viewProperties", "subTypes", "datasetProfile", "datasetUsageStatistics", "operation", "domains", "proposals", "schemaProposals", "schemaMetadata", "status", "container", "deprecation", "usageFeatures", "storageFeatures", "lineageFeatures", "testResults", "siblings", "embed", "incidentsSummary", "inferredNeighbors", "inferredMetadata", "schemaFieldsInferredMetadata", "schemaFieldsInferredNeighbors", "assertionsSummary", "datasetProperties", "editableDatasetProperties", "datasetDeprecation", "datasetUpstreamLineage", "upstreamLineage", "institutionalMemory", "ownership", "editableSchemaMetadata", "globalTags", "glossaryTerms", "browsePaths", "dataPlatformInstance", "browsePathsV2", "anomaliesSummary", "access", "structuredProperties", "forms", "partitionsSummary", "share", "origin", "documentation", "entityInferenceMetadata", "versionProperties"], "entityDoc": "Datasets represent logical or physical data assets stored or represented in various data platforms. Tables, Views, Streams are all instances of datasets."}, "name": "DatasetKey", "namespace": "com.linkedin.pegasus2avro.metadata.key", "fields": [{"Searchable": {"enableAutocomplete": true, "fieldType": "URN"}, "java": {"class": "com.linkedin.pegasus2avro.common.urn.Urn"}, "Urn": "Urn", "type": "string", "name": "platform", "doc": "Data platform urn associated with the dataset"}, {"Searchable": {"boostScore": 10.0, "enableAutocomplete": true, "fieldName": "id", "fieldType": "WORD_GRAM"}, "type": "string", "name": "name", "doc": "Unique guid for dataset"}, {"Searchable": {"addToFilters": true, "fieldType": "TEXT_PARTIAL", "filterNameOverride": "Environment", "queryByDefault": false}, "type": {"type": "enum", "symbolDocs": {"CORP": "Designates corporation fabrics", "DEV": "Designates development fabrics", "EI": "Designates early-integration fabrics", "NON_PROD": "Designates non-production fabrics", "PRD": "Alternative Prod spelling", "PRE": "Designates pre-production fabrics", "PROD": "Designates production fabrics", "QA": "Designates quality assurance fabrics", "RVW": "Designates review fabrics", "SANDBOX": "Designates sandbox fabrics", "STG": "Designates staging fabrics", "TEST": "Designates testing fabrics", "TST": "Alternative Test spelling", "UAT": "Designates user acceptance testing fabrics"}, "name": "FabricType", "namespace": "com.linkedin.pegasus2avro.common", "symbols": ["DEV", "TEST", "QA", "UAT", "EI", "PRE", "STG", "NON_PROD", "PROD", "CORP", "RVW", "PRD", "TST", "SANDBOX"], "doc": "Fabric group type"}, "name": "origin", "doc": "Fabric type where dataset belongs to or where it was generated."}], "doc": "Key for a Dataset"}
```
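For illustration, a minimal usage sketch (assuming `script.py` below sits alongside this README and is importable as `script`) of how the index might drive schema-informed operations:
```python
from script import load_entity_registry  # hypothetical import of the script below

index = load_entity_registry()

# Entities whose aspect list includes the "ownership" aspect.
print([name for name, entry in index.registry.items() if "ownership" in entry.aspects])

# The Avro RecordSchema of the "datasetKey" aspect; printing it yields its JSON definition.
print(index.schemas.get("datasetKey"))
```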
79 changes: 79 additions & 0 deletions datahub_metadata_model/script.py
@@ -0,0 +1,79 @@
import os
import json
import logging
from dataclasses import dataclass
from avro.schema import RecordSchema
from importlib.metadata import version
from datahub.metadata.schema_classes import (
    KEY_ASPECTS,
    ASPECT_NAME_MAP,
)

logger = logging.getLogger(__name__)

# Utility to get path of executed script regardless of where it is executed from
__location__ = os.path.realpath(
    os.path.join(os.getcwd(), os.path.dirname(__file__)))


@dataclass
class DataHubRegistryEntityEntry:
    # Category of the DataHub Entity
    category: str
    # Name of the key aspect for the DataHub Entity
    key: str
    # List of aspects that make up the DataHub Entity
    aspects: list[str]


@dataclass
class DataHubIndex:
    # Index of entities that exist with their aspect names
    registry: dict[str, DataHubRegistryEntityEntry]
    # Schemas of aspects
    schemas: dict[str, RecordSchema]


def load_entity_registry() -> DataHubIndex:

    registry: dict[str, DataHubRegistryEntityEntry] = {}

    schemas: dict[str, RecordSchema] = {}

    datahub_version = version('acryl-datahub')
    logger.debug(f"Processing version {datahub_version}")

    for key in KEY_ASPECTS:
        if KEY_ASPECTS[key].ASPECT_INFO and 'keyForEntity' in KEY_ASPECTS[key].ASPECT_INFO:
            entity_name: str = KEY_ASPECTS[key].ASPECT_INFO['keyForEntity']
            key_aspect: str = KEY_ASPECTS[key].ASPECT_NAME
            category: str = KEY_ASPECTS[key].ASPECT_INFO['entityCategory']

            aspects: list[str] = [key_aspect]
            aspects.extend(KEY_ASPECTS[key].ASPECT_INFO['entityAspects'])

            registry[entity_name] = DataHubRegistryEntityEntry(
                category=category,
                key=key_aspect,
                aspects=aspects
            )

            # Load aspect schemas
            for aspect_name in aspects:
                if aspect_name not in schemas:
                    aspect = ASPECT_NAME_MAP.get(aspect_name)
                    if aspect:
                        schemas[aspect_name] = aspect.RECORD_SCHEMA
                    else:
                        logger.warning(f"Aspect: {aspect_name} not found in ASPECT_NAME_MAP")

    logger.info("Finished loading DataHub's Entity Registry")

    return DataHubIndex(registry=registry, schemas=schemas)


## Load DataHub's Entity Registry
index: DataHubIndex = load_entity_registry()

print(index.schemas.get("datasetKey"))
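As a possible follow-up (a sketch only; the output directory layout is an assumption, not part of the script), the index built above could be used to persist every aspect schema to disk for other tooling to consume:
```python
# Sketch only: write each aspect's Avro schema next to this script as <aspect>.avsc.
out_dir = os.path.join(__location__, "schemas")
os.makedirs(out_dir, exist_ok=True)

for aspect_name, schema in index.schemas.items():
    with open(os.path.join(out_dir, f"{aspect_name}.avsc"), "w") as f:
        # An avro RecordSchema stringifies to its JSON definition.
        f.write(str(schema))
```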
5 changes: 4 additions & 1 deletion devbox.json
@@ -4,13 +4,16 @@
"python": {
"version": "3.10",
"disable_plugin": false
},
"uv": {
"version": "latest"
}
},
"env": {
"VENV_DIR": "$PWD/venv",
"UV_PYTHON": "$PWD/venv/bin/python"
},
"shell": {
"init_hook": ". $VENV_DIR/bin/activate"
"init_hook": ". $VENV_DIR/bin/activate && uv pip install -r requirements.txt"
}
}
57 changes: 57 additions & 0 deletions openapi-examples/put.py
@@ -0,0 +1,57 @@
import os
import logging
import json
from typing import Optional, Iterable
from datahub.ingestion.graph.client import (
    DataHubGraph,
    get_default_graph
)


logger = logging.getLogger(__name__)

logging.basicConfig(level=logging.INFO,
                    format='%(message)s')

# Utility to get path of executed script regardless of where it is executed from
__location__ = os.path.realpath(
    os.path.join(os.getcwd(), os.path.dirname(__file__)))

# Connect to the DataHub instance configured in your ~/.datahubenv file.
client: DataHubGraph = get_default_graph()

def scrollEntity(client: DataHubGraph, entity: str, variables: dict) -> Iterable:  # type: ignore
    endpoint = f"{client.config.server}/openapi/v3/entity/{entity}"

    first_iter = True
    scroll_id: Optional[str] = None
    while first_iter or scroll_id:
        first_iter = False
        variables["scrollId"] = scroll_id

        response: dict = client._get_generic(endpoint, variables)

        scroll_id = response.get("scrollId", None)
        # Use a distinct loop variable to avoid shadowing the `entity` parameter
        for result in response.get("entities", []):
            yield result

        logger.debug(f"Scrolling to next page: {scroll_id}")


def putEntity(client: DataHubGraph, entity: str, payload: list[dict]):
    endpoint = f"{client.config.server}/openapi/v3/entity/{entity}"

    response = client._post_generic(endpoint, payload)

    logger.warning(response)

# This is just an example file name for a hypothetical JSON payload of DataHub policies generated by the
# openapi-examples/list.py file.
file_name = "policies.json"

if __name__ == "__main__":
    with open(file_name) as f:
        policies: list[dict] = json.loads(f.read())
    putEntity(client, "datahubpolicy", policies)
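For context, a hedged sketch of how a `policies.json` payload might be produced with the `scrollEntity` helper above (the referenced `openapi-examples/list.py` is not part of this commit, and the `count` query parameter is an assumption about the OpenAPI v3 scroll endpoint):
```python
# Hypothetical companion snippet: page through all policies and dump them to policies.json.
def dump_policies(file_name: str = "policies.json") -> None:
    # "count" is an assumed page-size parameter; scrollEntity manages scrollId itself.
    policies = list(scrollEntity(client, "datahubpolicy", {"count": 100}))
    with open(file_name, "w") as f:
        json.dump(policies, f, indent=2)
```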

1 change: 1 addition & 0 deletions recursive_metadata/README.md
@@ -0,0 +1 @@
Utility scripts that pull/write all metadata associated with an urn and any references it may have.
87 changes: 87 additions & 0 deletions recursive_metadata/read.py
@@ -0,0 +1,87 @@
import os
import json
import logging
from typing import Any
from collections import deque
from datahub.ingestion.graph.client import (
    DataHubGraph,
    get_default_graph
)

logger = logging.getLogger(__name__)

logging.basicConfig(level=logging.INFO,
                    format='%(message)s')

# Utility to get path of executed script regardless of where it is executed from
__location__ = os.path.realpath(
    os.path.join(os.getcwd(), os.path.dirname(__file__)))


def find_urn_li_strings(data: dict[str, Any]) -> list[str]:
    result = []

    def recursive_search(obj):
        if isinstance(obj, dict):
            # Search through dictionary values
            for value in obj.values():
                recursive_search(value)
        elif isinstance(obj, list):
            # Search through list items
            for item in obj:
                recursive_search(item)
        elif isinstance(obj, str) and obj.startswith("urn:li:"):
            # Found a matching string
            result.append(obj)

    recursive_search(data)
    return result


def download_urn(client: DataHubGraph, data: dict[str, Any], queue: deque, root_urn: str) -> None:

    if root_urn in data:
        logger.warning(f"Skipping {root_urn}")
        return

    payload: dict = client.get_entity_raw(root_urn)

    urn = payload["urn"]
    aspects: dict = payload["aspects"]

    data[urn] = aspects

    for aspect in aspects:
        aspect_payload = aspects.get(aspect)
        urn_references = set(find_urn_li_strings(aspect_payload))
        for ref in urn_references:
            if ref not in data:
                queue.append(ref)


## Connect to the DataHub instance configured in your ~/.datahubenv file.
client: DataHubGraph = get_default_graph()

# This is the full set of data to download.
# urn -> dict of aspects
data: dict[str, Any] = {}

# Used to process the list of refs to the asset to be downloaded
queue = deque()

# Urn to be downloaded + all its dependencies (only 1 layer)
# TODO: This might fail on things like glossary terms that reference their glossary group; the same applies to domains.
root_urn = "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.pet_details,PROD)"

download_urn(client, data, queue, root_urn)

# Process the queue
while queue:
    urn = queue.popleft()
    logger.info(f"Processing: {urn}")
    download_urn(client, data, deque(), urn)


with open('data.json', 'w') as file:
    json.dump(data, file, indent=4)
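Once the script has run, the resulting `data.json` can be inspected; a small illustrative follow-up (assumes the script above completed successfully):
```python
# Illustrative only: summarise what was downloaded.
with open('data.json') as file:
    downloaded: dict[str, Any] = json.load(file)

for urn, aspects in downloaded.items():
    print(f"{urn}: {len(aspects)} aspects")
```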