Add examples on how to use DataHub's OpenAPI in python (#18)
pedro93 authored Jan 20, 2025
1 parent 9f5b4ba commit a54348c
Showing 8 changed files with 284 additions and 4 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -1,4 +1,6 @@
 venv/*
 .env
 .envrc
+.devbox/*
-*.lock
+*.lock
+.vscode/*
10 changes: 8 additions & 2 deletions devbox.json
@@ -1,8 +1,14 @@
 {
   "$schema": "https://raw.githubusercontent.com/jetify-com/devbox/0.12.0/.schema/devbox.schema.json",
-  "packages": ["python@3.10"],
+  "packages": {
+    "python": {
+      "version": "3.10",
+      "disable_plugin": false
+    }
+  },
   "env": {
-    "VENV_DIR": "venv"
+    "VENV_DIR": "$PWD/venv",
+    "UV_PYTHON": "$PWD/venv/bin/python"
   },
   "shell": {
     "init_hook": ". $VENV_DIR/bin/activate"
1 change: 1 addition & 0 deletions entities-with-tags/README.md
@@ -0,0 +1 @@
An example showing how to retrieve the datasets that have a particular tag for a given platform (with an optional subtype filter)
193 changes: 193 additions & 0 deletions entities-with-tags/entities-with-tags.py
@@ -0,0 +1,193 @@
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.emitter.mce_builder import make_data_platform_urn, make_tag_urn
from typing import List, Optional, Dict, Any
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def build_search_query() -> str:
"""Build the GraphQL query for searching entities."""
return """
query getSearchResultsForMultiple($input: SearchAcrossEntitiesInput!) {
searchAcrossEntities(input: $input) {
start
count
total
searchResults {
entity {
urn
type
... on Dataset {
name
platform {
name
}
properties {
name
qualifiedName
}
globalTags {
tags {
tag {
name
}
}
}
subTypes {
typeNames
}
}
}
}
}
}
"""


def build_search_input(
tag_name: str,
platform_code: str,
subtype: Optional[str] = None,
start: int = 0,
) -> Dict[str, Any]:
"""Build the input parameters for the search query."""
and_filters = [
{
"field": "platform",
"condition": "EQUAL",
"values": [make_data_platform_urn(platform_code)],
"negated": False
},
{
"field": "tags",
"condition": "EQUAL",
"values": [make_tag_urn(tag_name)],
"negated": False
},
{
"field": "_entityType",
"values": ["DATASET"]
}
]

if subtype:
and_filters.append({
"field": "typeNames",
"values": [subtype],
"condition": "EQUAL",
"negated": False
})

return {
"input": {
"types": [],
"query": "",
"start": start,
"count": 1000,
"filters": [],
"orFilters": [{"and": and_filters}]
}
}


def process_entity_info(entity: Dict[str, Any]) -> Dict[str, Any]:
"""Process entity information into a standardized format."""
return {
"urn": entity.get("urn"),
"name": entity.get("name"),
"platform": entity.get("platform", {}).get("name"),
"qualified_name": entity.get("properties", {}).get("qualifiedName"),
"tags": [
tag["tag"]["name"]
for tag in entity.get("globalTags", {}).get("tags", [])
if tag.get("tag", {}).get("name")
],
"subtypes": entity.get("subTypes", {}).get("typeNames", [])
}


def get_entities_by_filters(
tag_name: str,
platform_code: str,
graph: DataHubGraph,
subtype: Optional[str] = None,
output_to_std_out: bool = False,
) -> Optional[List[Dict[str, Any]]]:
"""Get entities using search query with filters."""
    try:
        query = build_search_query()
        entities_info = []
        start = 0

while True:
variables = build_search_input(tag_name, platform_code, subtype, start)
result = graph.execute_graphql(query, variables=variables)

if not result or "searchAcrossEntities" not in result:
logger.error("Invalid response from GraphQL query")
return None

search_response = result["searchAcrossEntities"]
search_results = search_response["searchResults"]
total_results = search_response["total"]

            for search_result in search_results:
                entity = search_result.get("entity")
if entity:
entity_info = process_entity_info(entity)
entities_info.append(entity_info)

if output_to_std_out:
logger.info(f"Dataset: {entity_info['name']}")
logger.info(f"URN: {entity_info['urn']}")
logger.info(f"Platform: {entity_info['platform']}")
logger.info(f"Subtypes: {', '.join(entity_info['subtypes'])}")
logger.info(f"Tags: {', '.join(entity_info['tags'])}")
logger.info("---")

# Check if we've received all results
if start + len(search_results) >= total_results:
break

# Increment start for next batch
start += len(search_results)

logger.info(f"Total results found: {len(entities_info)}")
return entities_info

except Exception as e:
logger.error(f"Error getting entities: {str(e)}")
return None


def main():
"""Main execution function."""

graph = get_default_graph()

    # Example using the Snowflake platform
    snowflake_results = get_entities_by_filters(
        tag_name="Usage - B",
        platform_code="snowflake",
        graph=graph,
        subtype="View",
        output_to_std_out=True,
    )
    logger.info(f"Snowflake results: {snowflake_results}")

# Example using BigQuery platform
bigquery_results = get_entities_by_filters(
tag_name="__default_high_queries",
platform_code="bigquery",
graph=graph,
#subtype="View",
#output_to_std_out=True,
)
logger.info(f"BigQuery results: {bigquery_results}")


if __name__ == "__main__":
main()
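A minimal usage sketch for the helper above (hypothetical tag, platform, and output path; assumes a configured ~/.datahubenv and that get_entities_by_filters from this script is in scope):

import json

from datahub.ingestion.graph.client import get_default_graph

graph = get_default_graph()

# Hypothetical tag/platform values -- substitute your own.
tagged = get_entities_by_filters(
    tag_name="PII",
    platform_code="snowflake",
    graph=graph,
    subtype="Table",  # optional; omit to match all subtypes
)

# Persist the matches for downstream processing.
if tagged is not None:
    with open("tagged_datasets.json", "w") as f:
        json.dump(tagged, f, indent=2)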
1 change: 1 addition & 0 deletions openapi-examples/README.md
@@ -0,0 +1 @@
A few example scripts showing how to use the OpenAPI endpoints in DataHub
32 changes: 32 additions & 0 deletions openapi-examples/create.py
@@ -0,0 +1,32 @@
import logging
from datahub.ingestion.graph.client import (
DataHubGraph,
get_default_graph
)

logger = logging.getLogger("PUSH_ROLES_TO_ACRYL_PROC")

# Connect to the DataHub instance configured in your ~/.datahubenv file.
client: DataHubGraph = get_default_graph()

def push_roles_to_acryl(request):
open_api_endpoint = f"{client.config.server}/openapi/v3/entity/role?async=false&systemMetadata=false"
logger.info(f"open_api_endpoint >> {open_api_endpoint}")

response = client._post_generic(url=open_api_endpoint, payload_dict=request)
print(response)


if __name__ == "__main__":
urn = "urn:li:role:mlops_pii_role"
request = [{
"roleProperties": {
"value": {
"description": "Snowflake role under domain",
"name": "MLOPS_PII_ROLE",
"type": "PRODUCTION_PII_ROLES"
}
},
"urn": "urn:li:role:mlops_pii_role",
}]
push_roles_to_acryl(request)
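As a read-back check, the role written above can be fetched through the v3 GET endpoint with the same client. A sketch, assuming the _get_generic helper (used in list.py below) accepts a bare URL and that the server exposes /openapi/v3/entity/role/{urn}:

import urllib.parse

from datahub.ingestion.graph.client import get_default_graph

client = get_default_graph()
urn = "urn:li:role:mlops_pii_role"

# URL-encode the URN so its colons survive as a single path segment.
endpoint = (
    f"{client.config.server}/openapi/v3/entity/role/"
    f"{urllib.parse.quote(urn, safe='')}"
)
response = client._get_generic(endpoint)

# The exact response shape depends on the server version.
print(response.get("roleProperties"))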
44 changes: 44 additions & 0 deletions openapi-examples/list.py
@@ -0,0 +1,44 @@
import os
import logging
import json
from typing import Optional, Iterable
from datahub.ingestion.graph.client import (
DataHubGraph,
get_default_graph
)

logger = logging.getLogger(__name__)

# Utility to get path of executed script regardless of where it is executed from
__location__ = os.path.realpath(
os.path.join(os.getcwd(), os.path.dirname(__file__)))

# Connect to the DataHub instance configured in your ~/.datahubenv file.
client: DataHubGraph = get_default_graph()

def scrollEntity(client: DataHubGraph, entity: str, variables: dict) -> Iterable: # type: ignore
endpoint = f"{client.config.server}/openapi/v3/entity/{entity}"

first_iter = True
scroll_id: Optional[str] = None
while first_iter or scroll_id:
first_iter = False
variables["scrollId"] = scroll_id

response: dict = client._get_generic(endpoint, variables)

scroll_id = response.get("scrollId", None)
        for result_entity in response.get("entities", []):
            yield result_entity

logger.debug(f"Scrolling to next page: {scroll_id}")


search_body = {
"query": "*",
"sort": "urn"
}

results = scrollEntity(client, "datahubpolicy", search_body)

print(json.dumps(list(results)))
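scrollEntity is generic over the v3 entity name, so the same pattern works for other entity types. A sketch that counts datasets instead of listing policies (the count page-size key is an assumption about the scroll endpoint's parameters):

# Stream datasets without materializing the full list in memory.
dataset_count = 0
for _ in scrollEntity(client, "dataset", {"query": "*", "sort": "urn", "count": 100}):
    dataset_count += 1

print(f"Datasets visible to this token: {dataset_count}")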
3 changes: 2 additions & 1 deletion requirements.txt
@@ -1,2 +1,3 @@
 acryl-datahub==0.14.1
-acryl-datahub-cloud==0.3.6
+acryl-datahub-cloud==0.3.6
+rich==13.9.4
