Skip to content

Commit

Permalink
Add BigQuery job link (#45020) (#45222)
Browse files Browse the repository at this point in the history
* Add bigquery job link

* Fix BigQuery job detail link formatting

Co-authored-by: nakamura1878 <[email protected]>
  • Loading branch information
shahar1 and nakamura1878 authored Dec 26, 2024
1 parent d4e0995 commit 3ad1cf5
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
25 changes: 25 additions & 0 deletions providers/src/airflow/providers/google/cloud/links/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
BIGQUERY_BASE_LINK
+ "?referrer=search&project={project_id}&d={dataset_id}&p={project_id}&page=table&t={table_id}"
)
# URL template for a BigQuery job detail link: BIGQUERY_BASE_LINK followed by a
# query string carrying {project_id} (used twice), {job_id} and {location}
# placeholders. Referenced as BigQueryJobDetailLink.format_str.
# NOTE(review): the "ws=!1m5!1m4!1m3!1s...!2s...!3s..." segment is an opaque
# console parameter; keep it verbatim unless the console URL scheme is re-verified.
BIGQUERY_JOB_DETAIL_LINK = (
BIGQUERY_BASE_LINK + "?project={project_id}&ws=!1m5!1m4!1m3!1s{project_id}!2s{job_id}!3s{location}"
)


class BigQueryDatasetLink(BaseGoogleLink):
Expand Down Expand Up @@ -78,3 +81,25 @@ def persist(
key=BigQueryTableLink.key,
value={"dataset_id": dataset_id, "project_id": project_id, "table_id": table_id},
)


class BigQueryJobDetailLink(BaseGoogleLink):
    """Link helper that targets the detail view of a single BigQuery job."""

    # ``key`` is also the XCom key under which ``persist`` stores the link
    # parameters; ``format_str`` is the URL template those parameters fill.
    name = "BigQuery Job Detail"
    key = "bigquery_job_detail"
    format_str = BIGQUERY_JOB_DETAIL_LINK

    @staticmethod
    def persist(
        context: Context,
        task_instance: BaseOperator,
        project_id: str,
        location: str,
        job_id: str,
    ):
        """
        Store the values needed to build the job detail link in XCom.

        :param context: Context forwarded as the first argument to ``xcom_push``.
        :param task_instance: Object whose ``xcom_push`` method is called.
        :param project_id: Value stored under the ``project_id`` key.
        :param location: Value stored under the ``location`` key.
        :param job_id: Value stored under the ``job_id`` key.
        """
        # The dict keys match the placeholders in BIGQUERY_JOB_DETAIL_LINK.
        link_params = {
            "project_id": project_id,
            "location": location,
            "job_id": job_id,
        }
        task_instance.xcom_push(context, key=BigQueryJobDetailLink.key, value=link_params)
17 changes: 15 additions & 2 deletions providers/src/airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@
)
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url
from airflow.providers.google.cloud.links.bigquery import BigQueryDatasetLink, BigQueryTableLink
from airflow.providers.google.cloud.links.bigquery import (
BigQueryDatasetLink,
BigQueryJobDetailLink,
BigQueryTableLink,
)
from airflow.providers.google.cloud.openlineage.mixins import _BigQueryOpenLineageMixin
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
from airflow.providers.google.cloud.triggers.bigquery import (
Expand Down Expand Up @@ -2554,7 +2558,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
)
template_fields_renderers = {"configuration": "json", "configuration.query.query": "sql"}
ui_color = BigQueryUIColors.QUERY.value
operator_extra_links = (BigQueryTableLink(),)
operator_extra_links = (BigQueryTableLink(), BigQueryJobDetailLink())

def __init__(
self,
Expand Down Expand Up @@ -2726,6 +2730,15 @@ def execute(self, context: Any):
)
context["ti"].xcom_push(key="job_id_path", value=job_id_path)

persist_kwargs = {
"context": context,
"task_instance": self,
"project_id": self.project_id,
"location": self.location,
"job_id": self.job_id,
}
BigQueryJobDetailLink.persist(**persist_kwargs)

# Wait for the job to complete
if not self.deferrable:
job.result(timeout=self.result_timeout, retry=self.result_retry)
Expand Down
1 change: 1 addition & 0 deletions providers/src/airflow/providers/google/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,7 @@ extra-links:
- airflow.providers.google.cloud.links.dataplex.DataplexLakeLink
- airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink
- airflow.providers.google.cloud.links.bigquery.BigQueryTableLink
- airflow.providers.google.cloud.links.bigquery.BigQueryJobDetailLink
- airflow.providers.google.cloud.links.bigquery_dts.BigQueryDataTransferConfigLink
- airflow.providers.google.cloud.links.compute.ComputeInstanceDetailsLink
- airflow.providers.google.cloud.links.compute.ComputeInstanceTemplateDetailsLink
Expand Down

0 comments on commit 3ad1cf5

Please sign in to comment.