Skip to content

Commit 3cb1678

Browse files
authored
feat(ingest/bigquery): Add way to reference existing DataHub Tag from a bigquery label (#11544)
1 parent f09e18c commit 3cb1678

File tree

6 files changed

+1135
-21
lines changed

6 files changed

+1135
-21
lines changed

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import re
3+
from base64 import b32decode
34
from collections import defaultdict
45
from typing import Dict, Iterable, List, Optional, Set, Type, Union, cast
56

@@ -89,12 +90,13 @@
8990
HiveColumnToAvroConverter,
9091
get_schema_fields_for_hive_column,
9192
)
92-
from datahub.utilities.mapping import Constants
9393
from datahub.utilities.perf_timer import PerfTimer
9494
from datahub.utilities.ratelimiter import RateLimiter
9595
from datahub.utilities.registries.domain_registry import DomainRegistry
9696
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor
9797

98+
ENCODED_TAG_PREFIX = "urn_li_encoded_tag_"
99+
98100
logger: logging.Logger = logging.getLogger(__name__)
99101
# Handle table snapshots
100102
# See https://cloud.google.com/bigquery/docs/table-snapshots-intro.
@@ -194,6 +196,18 @@ def store_table_refs(self):
194196
or self.config.use_queries_v2
195197
)
196198

199+
def modified_base32decode(self, text_to_decode: str) -> str:
    """Decode a BigQuery label value that carries an encoded DataHub tag.

    When tags are synced from DataHub to BigQuery they are written as
    modified base32 strings, because BigQuery labels only support lowercase
    letters, international characters, numbers, or underscores. The
    modification lowercases the base32 text and replaces the padding
    character ``=`` with ``_``; this method reverses both steps.

    Args:
        text_to_decode: Label value, optionally starting with
            ``ENCODED_TAG_PREFIX``.

    Returns:
        ``text_to_decode`` unchanged when it does not start with
        ``ENCODED_TAG_PREFIX``; otherwise the UTF-8 text decoded from the
        payload that follows the prefix.

    Raises:
        binascii.Error: If the payload is not valid base32.
        UnicodeDecodeError: If the decoded bytes are not valid UTF-8.
    """
    if not text_to_decode.startswith(ENCODED_TAG_PREFIX):
        return text_to_decode
    # Strip only the leading prefix; str.replace would also remove any later
    # occurrence of the same character sequence inside the payload.
    payload = text_to_decode[len(ENCODED_TAG_PREFIX) :]
    # Undo the label-safe modifications: restore uppercase and `=` padding.
    payload = payload.upper().replace("_", "=")
    return b32decode(payload.encode("utf-8")).decode("utf-8")
210+
197211
def get_project_workunits(
198212
self, project: BigqueryProject
199213
) -> Iterable[MetadataWorkUnit]:
@@ -253,7 +267,7 @@ def gen_dataset_containers(
253267
tags_joined: Optional[List[str]] = None
254268
if tags and self.config.capture_dataset_label_as_tag:
255269
tags_joined = [
256-
f"{k}:{v}"
270+
self.make_tag_from_label(k, v)
257271
for k, v in tags.items()
258272
if is_tag_allowed(self.config.capture_dataset_label_as_tag, k)
259273
]
@@ -662,6 +676,11 @@ def _process_snapshot(
662676
dataset_name=dataset_name,
663677
)
664678

679+
def make_tag_from_label(self, key: str, value: str) -> str:
    """Return the tag string to attach for a single BigQuery label.

    A value that starts with ``ENCODED_TAG_PREFIX`` is handed to
    ``modified_base32decode`` and the decoded text is returned as-is.
    Any other label is combined as ``key:value`` and passed to
    ``make_tag_urn``.
    """
    if value.startswith(ENCODED_TAG_PREFIX):
        return self.modified_base32decode(value)
    tag_name = f"""{key}:{value}"""
    return make_tag_urn(tag_name)
683+
665684
def gen_table_dataset_workunits(
666685
self,
667686
table: BigqueryTable,
@@ -707,7 +726,7 @@ def gen_table_dataset_workunits(
707726
tags_to_add = []
708727
tags_to_add.extend(
709728
[
710-
make_tag_urn(f"""{k}:{v}""")
729+
self.make_tag_from_label(k, v)
711730
for k, v in table.labels.items()
712731
if is_tag_allowed(self.config.capture_table_label_as_tag, k)
713732
]
@@ -733,7 +752,7 @@ def gen_view_dataset_workunits(
733752
tags_to_add = None
734753
if table.labels and self.config.capture_view_label_as_tag:
735754
tags_to_add = [
736-
make_tag_urn(f"{k}:{v}")
755+
self.make_tag_from_label(k, v)
737756
for k, v in table.labels.items()
738757
if is_tag_allowed(self.config.capture_view_label_as_tag, k)
739758
]
@@ -922,11 +941,6 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
922941
break
923942
else:
924943
tags = []
925-
if col.is_partition_column:
926-
tags.append(
927-
TagAssociationClass(make_tag_urn(Constants.TAG_PARTITION_KEY))
928-
)
929-
930944
if col.cluster_column_position is not None:
931945
tags.append(
932946
TagAssociationClass(
@@ -944,6 +958,7 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
944958
type=SchemaFieldDataType(
945959
self.BIGQUERY_FIELD_TYPE_MAPPINGS.get(col.data_type, NullType)()
946960
),
961+
isPartitioningKey=col.is_partition_column,
947962
nativeDataType=col.data_type,
948963
description=col.comment,
949964
nullable=col.is_nullable,

metadata-ingestion/src/datahub/ingestion/source/mongodb.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,8 @@ def __init__(self, ctx: PipelineContext, config: MongoDBConfig):
290290

291291
# See https://pymongo.readthedocs.io/en/stable/examples/datetimes.html#handling-out-of-range-datetimes
292292
self.mongo_client = MongoClient(
293-
self.config.connect_uri, datetime_conversion="DATETIME_AUTO", **options
294-
) # type: ignore
293+
self.config.connect_uri, datetime_conversion="DATETIME_AUTO", **options # type: ignore
294+
)
295295

296296
# This cheaply tests the connection. For details, see
297297
# https://pymongo.readthedocs.io/en/stable/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient

metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,8 @@
269269
"actor": "urn:li:corpuser:datahub"
270270
}
271271
},
272-
"isPartOfKey": false
272+
"isPartOfKey": false,
273+
"isPartitioningKey": false
273274
},
274275
{
275276
"fieldPath": "email",
@@ -296,7 +297,8 @@
296297
"actor": "urn:li:corpuser:datahub"
297298
}
298299
},
299-
"isPartOfKey": false
300+
"isPartOfKey": false,
301+
"isPartitioningKey": false
300302
}
301303
]
302304
}
@@ -328,6 +330,29 @@
328330
"lastRunId": "no-run-id-provided"
329331
}
330332
},
333+
{
334+
"entityType": "dataset",
335+
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
336+
"changeType": "UPSERT",
337+
"aspectName": "globalTags",
338+
"aspect": {
339+
"json": {
340+
"tags": [
341+
{
342+
"tag": "urn:li:tag:priority:high"
343+
},
344+
{
345+
"tag": "urn:li:tag:purchase"
346+
}
347+
]
348+
}
349+
},
350+
"systemMetadata": {
351+
"lastObserved": 1643871600000,
352+
"runId": "bigquery-2022_02_03-07_00_00",
353+
"lastRunId": "no-run-id-provided"
354+
}
355+
},
331356
{
332357
"entityType": "dataset",
333358
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
@@ -463,7 +488,8 @@
463488
}
464489
]
465490
},
466-
"isPartOfKey": false
491+
"isPartOfKey": false,
492+
"isPartitioningKey": false
467493
},
468494
{
469495
"fieldPath": "email",
@@ -479,7 +505,8 @@
479505
"globalTags": {
480506
"tags": []
481507
},
482-
"isPartOfKey": false
508+
"isPartOfKey": false,
509+
"isPartitioningKey": false
483510
}
484511
]
485512
}
@@ -620,7 +647,8 @@
620647
}
621648
]
622649
},
623-
"isPartOfKey": false
650+
"isPartOfKey": false,
651+
"isPartitioningKey": false
624652
},
625653
{
626654
"fieldPath": "email",
@@ -636,7 +664,8 @@
636664
"globalTags": {
637665
"tags": []
638666
},
639-
"isPartOfKey": false
667+
"isPartOfKey": false,
668+
"isPartitioningKey": false
640669
}
641670
]
642671
}
@@ -1021,5 +1050,37 @@
10211050
"runId": "bigquery-2022_02_03-07_00_00",
10221051
"lastRunId": "no-run-id-provided"
10231052
}
1053+
},
1054+
{
1055+
"entityType": "tag",
1056+
"entityUrn": "urn:li:tag:priority:high",
1057+
"changeType": "UPSERT",
1058+
"aspectName": "tagKey",
1059+
"aspect": {
1060+
"json": {
1061+
"name": "priority:high"
1062+
}
1063+
},
1064+
"systemMetadata": {
1065+
"lastObserved": 1643871600000,
1066+
"runId": "bigquery-2022_02_03-07_00_00",
1067+
"lastRunId": "no-run-id-provided"
1068+
}
1069+
},
1070+
{
1071+
"entityType": "tag",
1072+
"entityUrn": "urn:li:tag:purchase",
1073+
"changeType": "UPSERT",
1074+
"aspectName": "tagKey",
1075+
"aspect": {
1076+
"json": {
1077+
"name": "purchase"
1078+
}
1079+
},
1080+
"systemMetadata": {
1081+
"lastObserved": 1643871600000,
1082+
"runId": "bigquery-2022_02_03-07_00_00",
1083+
"lastRunId": "no-run-id-provided"
1084+
}
10241085
}
10251086
]

0 commit comments

Comments
 (0)