Skip to content

Commit

Permalink
run ruff on scripts (#21)
Browse files Browse the repository at this point in the history
* run ruff on scripts

* fix broken example due to refactoring
  • Loading branch information
anshbansal authored Feb 6, 2025
1 parent cc4208e commit 6b7d1b0
Show file tree
Hide file tree
Showing 26 changed files with 495 additions and 424 deletions.
18 changes: 13 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@
Example scripts shared publicly for using Acryl Cloud

# Basic Python CLI Setup
How to use it:
- Use python3
- Install the DataHub CLI
```shell
pip install acryl-datahub
pip install --upgrade uv
uv venv venv --python 3.10
source venv/bin/activate
uv pip install -r ./requirements.txt
```
- Run `datahub init`. This will guide you through configuring the DataHub CLI to connect to your instance (the server URL should be `https://<customer>.acryl.io/gms`) and adding the access token that you created.

# Find Recommended CLI version to use with your server version
- In our documentation you'll find a page with release notes like https://datahubproject.io/docs/managed-datahub/release-notes/v_0_3_2. Go to the release notes for your server version and use the recommended CLI unless you have been advised otherwise.
- In our documentation you'll find a page with release notes like https://datahubproject.io/docs/managed-datahub/release-notes/v_0_3_8. Go to the release notes for your server version and use the recommended CLI unless you have been advised otherwise.


# Internal guide to contribute
Run these commands:
```
ruff format
ruff check --fix
```
24 changes: 11 additions & 13 deletions bulk_subscribe/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,31 @@
logger = logging.getLogger(__name__)

# Utility to get path of executed script regardless of where it is executed from
__location__ = os.path.realpath(
os.path.join(os.getcwd(), os.path.dirname(__file__)))
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))


def subscribe_to(urn: str, actors: list[str]):
with open(os.path.join(__location__, "createSubscription.graphql")) as f:
query = f.read()

graphql_query: str = textwrap.dedent(query)
variables: dict = {"input":
{
"entityUrn": urn,
"subscriptionTypes": "", # TODO update this based on the subscription you want to create
"entityChangeTypes": "", # TODO update this based on the subscription you want to create
variables: dict = {
"input": {
"entityUrn": urn,
"subscriptionTypes": "", # TODO update this based on the subscription you want to create
"entityChangeTypes": "", # TODO update this based on the subscription you want to create
}
}

response = client.execute_graphql(graphql_query, variables)
return response.get("listIngestionSources", {})


# Connect to the DataHub instance configured in your ~/.datahubenv file.
client: DataHubGraph = get_default_graph()

# TODO Update this
term_urn="urn:li:glossaryTerm:ef5085ba63d081d70cbfbeebc2795374"
term_urn = "urn:li:glossaryTerm:ef5085ba63d081d70cbfbeebc2795374"

# TODO Update this
actor_urns: List[str] = ["urn:li:corpuser:admin"]
Expand All @@ -49,10 +50,7 @@ def subscribe_to(urn: str, actors: list[str]):
}
]

urns_to_subscribe_to = client.get_urns_by_filter(
query="*",
extraFilters=filters
)
urns_to_subscribe_to = client.get_urns_by_filter(query="*", extraFilters=filters)

for urn in urns_to_subscribe_to:
subscribe_to(urn, actor_urns)
subscribe_to(urn, actor_urns)
1 change: 0 additions & 1 deletion circuit_break_toolkit/examples.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
import time
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from graphql.gql_variable_definitions import (
get_scroll_across_lineage_vars,
Expand Down
29 changes: 16 additions & 13 deletions configure_user_settings/script.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
CorpUserSettingsClass,
CorpUserAppearanceSettingsClass
CorpUserAppearanceSettingsClass,
)

logger = logging.getLogger(__name__)

# Utility to get path of executed script regardless of where it is executed from
__location__ = os.path.realpath(
os.path.join(os.getcwd(), os.path.dirname(__file__)))
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))

# Connect to the DataHub instance configured in your ~/.datahubenv file.
client: DataHubGraph = get_default_graph()
Expand All @@ -24,24 +24,27 @@

# TODO fill this out
user_urn = ""

if not client.exists(user_urn):
logger.error(f"{user_urn} does not exist")

settings: Optional[CorpUserSettingsClass] = client.get_aspect(user_urn, aspect_type=CorpUserSettingsClass)
settings: Optional[CorpUserSettingsClass] = client.get_aspect(
user_urn, aspect_type=CorpUserSettingsClass
)

if not settings:
settings = CorpUserSettingsClass(appearance=CorpUserAppearanceSettingsClass(showThemeV2=True))
settings = CorpUserSettingsClass(
appearance=CorpUserAppearanceSettingsClass(showThemeV2=True)
)

if not settings.appearance:
if not settings.appearance:
settings.appearance = CorpUserAppearanceSettingsClass(showThemeV2=True)
else:
settings.appearance.showThemeV2=True

mcp: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityUrn=user_urn,
aspect=settings
)
settings.appearance.showThemeV2 = True

mcp: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityUrn=user_urn, aspect=settings
)

if dry_run:
logger.warning(f"{json.dumps(mcp.to_obj())}")
Expand Down
6 changes: 5 additions & 1 deletion data_apps_selective_copy/datahub_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
FRONTEND_URL_ENV = "DATAHUB_FRONTEND_URL"
GMS_URL_ENV = "DATAHUB_GMS_URL"


def get_gms_token_env() -> str:
return os.getenv(DATAHUB_GMS_TOKEN_ENV)

Expand Down Expand Up @@ -38,6 +39,7 @@ def get_gms_url():
def _get_graphql_url():
return f"{get_frontend_url()}/api/graphql"


def _graphql_query(graph: DataHubGraph, query, vars):
response = graph._post_generic(
url=_get_graphql_url(),
Expand All @@ -47,6 +49,7 @@ def _graphql_query(graph: DataHubGraph, query, vars):
click.secho(f"Error: {response['errors']}", fg="red")
return response


def get_session_login_as(username: str, password: str, url: str) -> requests.Session:
session = requests.Session()
headers = {
Expand Down Expand Up @@ -129,7 +132,8 @@ def raise_incident(
},
)


def get_graph():
return DataHubGraph(
DatahubClientConfig(server=get_gms_url(), token=get_gms_token_env())
)
)
57 changes: 28 additions & 29 deletions datahub_metadata_model/script.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
import os
import json
import logging
from dataclasses import dataclass
from avro.schema import RecordSchema
from importlib.metadata import version
from datahub.metadata.schema_classes import (
KEY_ASPECTS,
KEY_ASPECTS,
ASPECT_NAME_MAP,
)

logger = logging.getLogger(__name__)

# Utility to get path of executed script regardless of where it is executed from
__location__ = os.path.realpath(
os.path.join(os.getcwd(), os.path.dirname(__file__)))
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))


@dataclass
class DataHubRegistryEntityEntry:
# Category of the DataHub Entity
category: str
# Name of the key aspect for the DataHub Entity
key: str
# List of aspects that make up the DataHub Entity
aspects: list[str]
# Category of the DataHub Entity
category: str
# Name of the key aspect for the DataHub Entity
key: str
# List of aspects that make up the DataHub Entity
aspects: list[str]


@dataclass
Expand All @@ -35,45 +33,46 @@ class DataHubIndex:


def load_entity_registry() -> DataHubIndex:

registry: dict[str, DataHubRegistryEntityEntry] = {}

schemas: dict[str, RecordSchema] = {}

datahub_version = version('acryl-datahub')
datahub_version = version("acryl-datahub")
logger.debug(f"Processing version {datahub_version}")

for key in KEY_ASPECTS:
if KEY_ASPECTS[key].ASPECT_INFO and 'keyForEntity' in KEY_ASPECTS[key].ASPECT_INFO:

entity_name: str = KEY_ASPECTS[key].ASPECT_INFO['keyForEntity']
key_aspect: str = KEY_ASPECTS[key].ASPECT_NAME
category: str = KEY_ASPECTS[key].ASPECT_INFO['entityCategory']

if (
KEY_ASPECTS[key].ASPECT_INFO
and "keyForEntity" in KEY_ASPECTS[key].ASPECT_INFO
):
entity_name: str = KEY_ASPECTS[key].ASPECT_INFO["keyForEntity"]
key_aspect: str = KEY_ASPECTS[key].ASPECT_NAME
category: str = KEY_ASPECTS[key].ASPECT_INFO["entityCategory"]

aspects: list[str] = [key_aspect]
aspects.extend(KEY_ASPECTS[key].ASPECT_INFO['entityAspects'])
aspects.extend(KEY_ASPECTS[key].ASPECT_INFO["entityAspects"])

registry[entity_name] = DataHubRegistryEntityEntry(
category=category,
key=key_aspect,
aspects=aspects
category=category, key=key_aspect, aspects=aspects
)

# Load aspect schemas
for aspect_name in aspects:
if aspect_name not in schemas:
aspect = ASPECT_NAME_MAP.get(aspect_name)
if aspect:
schemas[aspect_name] = aspect.RECORD_SCHEMA
else:
logger.warning(f"Aspect: {aspect_name} not found in ASPECT_NAME_MAP")

logger.info(f"Finished loading DataHub's Entity Registry")

logger.warning(
f"Aspect: {aspect_name} not found in ASPECT_NAME_MAP"
)

logger.info("Finished loading DataHub's Entity Registry")

return DataHubIndex(registry=registry, schemas=schemas)


## Load DataHub's Entity Registry
index: DataHubIndex = load_entity_registry()

print((index.schemas.get("datasetKey")))
print((index.schemas.get("datasetKey")))
Loading

0 comments on commit 6b7d1b0

Please sign in to comment.