feat: automatically inject OL info into spark job in DataprocCreateBatchOperator (#44612)

Signed-off-by: Kacper Muda <[email protected]>
kacpermuda authored Dec 30, 2024
1 parent 1c3d2f3 commit e9412bf
Showing 6 changed files with 625 additions and 2 deletions.
11 changes: 9 additions & 2 deletions docs/apache-airflow-providers-openlineage/guides/user.rst
@@ -413,6 +413,7 @@ This is because each emitting task sends a `ParentRunFacet <https://openlineage.
which requires the DAG-level lineage to be enabled in some OpenLineage backend systems.
Disabling DAG-level lineage while enabling task-level lineage might cause errors or inconsistencies.

.. _options:spark_inject_parent_job_info:

Passing parent job information to Spark jobs
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -425,9 +426,15 @@ It allows Spark integration to automatically include ``parentRunFacet`` in appli
creating a parent-child relationship between tasks from different integrations.
See `Scheduling from Airflow <https://openlineage.io/docs/integrations/spark/configuration/airflow>`_.

.. warning::

    This configuration serves as the default behavior for all Operators that support automatic Spark properties injection,
    unless it is explicitly overridden at the Operator level.
    To prevent a specific Operator from injecting the parent job information while
    allowing all other supported Operators to do so by default, ``openlineage_inject_parent_job_info=False``
    can be explicitly provided to that specific Operator.

.. note::

    If any of the ``spark.openlineage.parent*`` properties are manually specified in the Spark job configuration, the integration will refrain from injecting parent job properties to ensure that manually provided values are preserved.

You can enable this automation by setting the ``spark_inject_parent_job_info`` option to ``true`` in the Airflow configuration.
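
For example, with ``spark_inject_parent_job_info = true`` set in the ``[openlineage]`` section of the Airflow configuration, a specific Operator can still opt out. The following is an illustrative sketch; the task, project, and bucket names are hypothetical:

.. code-block:: python

    from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

    # Inherits the configuration default, so parent job info is injected.
    create_batch = DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id="my-project",
        region="us-central1",
        batch={"pyspark_batch": {"main_python_file_uri": "gs://my-bucket/app.py"}},
    )

    # Explicitly opts out, overriding the configuration default.
    create_batch_no_parent_info = DataprocCreateBatchOperator(
        task_id="create_batch_no_parent_info",
        project_id="my-project",
        region="us-central1",
        batch={"pyspark_batch": {"main_python_file_uri": "gs://my-bucket/app.py"}},
        openlineage_inject_parent_job_info=False,
    )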

3 changes: 3 additions & 0 deletions docs/exts/templates/openlineage.rst.jinja2
@@ -29,12 +29,15 @@ Spark operators
===============
The OpenLineage integration can automatically inject information into Spark application properties when it is being submitted from Airflow.
The following is a list of supported operators along with the corresponding information that can be injected.
See :ref:`automatic injection of parent job information <options:spark_inject_parent_job_info>` for more details.

apache-airflow-providers-google
"""""""""""""""""""""""""""""""

- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`
- Parent Job Information
- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator`
- Parent Job Information


:class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`
135 changes: 135 additions & 0 deletions providers/src/airflow/providers/google/cloud/openlineage/utils.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import copy
import logging
import os
import pathlib
@@ -30,6 +31,8 @@
from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.utils.context import Context

from google.cloud.dataproc_v1 import Batch, RuntimeConfig

from airflow.providers.common.compat.openlineage.facet import (
BaseFacet,
ColumnLineageDatasetFacet,
@@ -386,3 +389,135 @@ def inject_openlineage_properties_into_dataproc_job(
job=job, job_type=job_type, new_properties=properties
)
return job_with_ol_config


def _is_dataproc_batch_of_supported_type(batch: dict | Batch) -> bool:
"""
Check if a Dataproc batch is of a supported type for Openlineage automatic injection.
This function determines if the given batch is of a supported type
by checking for specific job type attributes or keys in the batch.
Args:
batch: The Dataproc batch to check.
Returns:
True if the batch is of a supported type (`spark_batch` or
`pyspark_batch`), otherwise False.
"""
supported_job_types = ("spark_batch", "pyspark_batch")
if isinstance(batch, Batch):
if any(getattr(batch, job_type) for job_type in supported_job_types):
return True
return False

# For dictionary-based batch
if any(job_type in batch for job_type in supported_job_types):
return True
return False
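
A quick sketch of how this check behaves (an editor's illustration, not part of the commit; assumes google-cloud-dataproc is installed):

from google.cloud.dataproc_v1 import Batch, PySparkBatch

# Dictionary-based batch with a supported job type key.
assert _is_dataproc_batch_of_supported_type({"pyspark_batch": {"main_python_file_uri": "gs://bucket/app.py"}})
# Batch object with a supported job type attribute set.
assert _is_dataproc_batch_of_supported_type(Batch(pyspark_batch=PySparkBatch(main_python_file_uri="gs://bucket/app.py")))
# Unsupported batch types (here, an illustrative spark_sql_batch) are rejected.
assert not _is_dataproc_batch_of_supported_type({"spark_sql_batch": {}})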


def _extract_dataproc_batch_properties(batch: dict | Batch) -> dict:
"""
Extract Dataproc batch properties from a Batch object or dictionary.
This function retrieves the `properties` from the `runtime_config` of a
Dataproc `Batch` object or a dictionary representation of a batch.
Args:
batch: The Dataproc batch to extract properties from.
Returns:
Extracted `properties` if found, otherwise an empty dictionary.
"""
if isinstance(batch, Batch):
return dict(batch.runtime_config.properties)

# For dictionary-based batch
run_time_config = batch.get("runtime_config", {})
if isinstance(run_time_config, RuntimeConfig):
return dict(run_time_config.properties)
return run_time_config.get("properties", {})
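
A companion sketch covering the three accepted shapes (an editor's illustration, not part of the commit):

from google.cloud.dataproc_v1 import Batch, RuntimeConfig

props = {"spark.executor.memory": "4g"}
# Batch object: properties live under runtime_config.properties.
assert _extract_dataproc_batch_properties(Batch(runtime_config=RuntimeConfig(properties=props))) == props
# Dictionary-based batch, with runtime_config as a plain dict ...
assert _extract_dataproc_batch_properties({"runtime_config": {"properties": props}}) == props
# ... or as a RuntimeConfig object nested inside the dict.
assert _extract_dataproc_batch_properties({"runtime_config": RuntimeConfig(properties=props)}) == props
# Missing configuration falls back to an empty dict.
assert _extract_dataproc_batch_properties({}) == {}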


def _replace_dataproc_batch_properties(batch: dict | Batch, new_properties: dict) -> dict | Batch:
"""
Replace the properties of a Dataproc batch.
Args:
batch: The original Dataproc batch definition.
new_properties: The new properties to replace the existing ones.
Returns:
A modified copy of the Dataproc batch definition with updated properties.
"""
batch = copy.deepcopy(batch)
if isinstance(batch, Batch):
if not batch.runtime_config:
batch.runtime_config = RuntimeConfig(properties=new_properties)
elif isinstance(batch.runtime_config, dict):
batch.runtime_config["properties"] = new_properties
else:
batch.runtime_config.properties = new_properties
return batch

# For dictionary-based batch
run_time_config = batch.get("runtime_config")
if not run_time_config:
batch["runtime_config"] = {"properties": new_properties}
elif isinstance(run_time_config, dict):
run_time_config["properties"] = new_properties
else:
run_time_config.properties = new_properties
return batch
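
Because of the copy.deepcopy above, the caller's batch is never mutated; a minimal sketch (an editor's illustration, not part of the commit):

original = {"pyspark_batch": {}, "runtime_config": {"properties": {"spark.executor.memory": "4g"}}}
updated = _replace_dataproc_batch_properties(original, {"spark.executor.memory": "8g"})
assert original["runtime_config"]["properties"] == {"spark.executor.memory": "4g"}  # input left untouched
assert updated["runtime_config"]["properties"] == {"spark.executor.memory": "8g"}  # copy carries the new properties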


def inject_openlineage_properties_into_dataproc_batch(
batch: dict | Batch, context: Context, inject_parent_job_info: bool
) -> dict | Batch:
"""
Inject OpenLineage properties into Dataproc batch definition.
It's not removing any configuration or modifying the batch in any other way.
This function add desired OpenLineage properties to Dataproc batch configuration.
Note:
Any modification to job will be skipped if:
- OpenLineage provider is not accessible.
- The batch type is not supported.
- Automatic parent job information injection is disabled.
- Any OpenLineage properties with parent job information are already present
in the Spark job configuration.
Args:
batch: The original Dataproc batch definition.
context: The Airflow context in which the job is running.
inject_parent_job_info: Flag indicating whether to inject parent job information.
Returns:
The modified batch definition with OpenLineage properties injected, if applicable.
"""
if not inject_parent_job_info:
log.debug("Automatic injection of OpenLineage information is disabled.")
return batch

if not _is_openlineage_provider_accessible():
log.warning(
"Could not access OpenLineage provider for automatic OpenLineage "
"properties injection. No action will be performed."
)
return batch

if not _is_dataproc_batch_of_supported_type(batch):
log.warning(
"Could not find a supported Dataproc batch type for automatic OpenLineage "
"properties injection. No action will be performed.",
)
return batch

properties = _extract_dataproc_batch_properties(batch)

properties = inject_parent_job_information_into_spark_properties(properties=properties, context=context)

batch_with_ol_config = _replace_dataproc_batch_properties(batch=batch, new_properties=properties)
return batch_with_ol_config
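
An end-to-end sketch of this entry point (an editor's illustration, not part of the commit). The context object is only available inside a running task, and the injected keys come from inject_parent_job_information_into_spark_properties: the spark.openlineage.parent* family mentioned in the docs above (e.g. spark.openlineage.parentJobName, an assumption based on the OpenLineage Spark integration):

batch = {"pyspark_batch": {"main_python_file_uri": "gs://my-bucket/app.py"}}  # illustrative batch
batch = inject_openlineage_properties_into_dataproc_batch(
    batch=batch,
    context=context,  # provided by Airflow at task runtime
    inject_parent_job_info=True,
)
# batch["runtime_config"]["properties"] now carries the parent job properties,
# unless any spark.openlineage.parent* key was already present.
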
13 changes: 13 additions & 0 deletions providers/src/airflow/providers/google/cloud/operators/dataproc.py
@@ -55,6 +55,7 @@
DataprocWorkflowTemplateLink,
)
from airflow.providers.google.cloud.openlineage.utils import (
inject_openlineage_properties_into_dataproc_batch,
inject_openlineage_properties_into_dataproc_job,
)
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
@@ -2425,6 +2426,9 @@ def __init__(
asynchronous: bool = False,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
polling_interval_seconds: int = 5,
openlineage_inject_parent_job_info: bool = conf.getboolean(
"openlineage", "spark_inject_parent_job_info", fallback=False
),
**kwargs,
):
super().__init__(**kwargs)
@@ -2446,6 +2450,7 @@ def __init__(
self.asynchronous = asynchronous
self.deferrable = deferrable
self.polling_interval_seconds = polling_interval_seconds
self.openlineage_inject_parent_job_info = openlineage_inject_parent_job_info

def execute(self, context: Context):
if self.asynchronous and self.deferrable:
@@ -2468,6 +2473,14 @@ def execute(self, context: Context):
else:
self.log.info("Starting batch. The batch ID will be generated since it was not provided.")

if self.openlineage_inject_parent_job_info:
self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.")
self.batch = inject_openlineage_properties_into_dataproc_batch(
batch=self.batch,
context=context,
inject_parent_job_info=self.openlineage_inject_parent_job_info,
)

try:
self.operation = self.hook.create_batch(
region=self.region,
