Skip to content

Commit

Permalink
fix: Properly apply regex match for Airbyte connection filter
Browse files Browse the repository at this point in the history
  • Loading branch information
blarghmatey committed Feb 10, 2025
1 parent 8a65214 commit c1f8ed8
Showing 1 changed file with 5 additions and 16 deletions.
21 changes: 5 additions & 16 deletions src/ol_orchestrate/definitions/lakehouse/elt.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ol_orchestrate.resources.secrets.vault import Vault

airbyte_host_map = {
"dev": "config-airbyte-qa.odl.mit.edu",
"dev": "api-airbyte-qa.odl.mit.edu",
"qa": "api-airbyte-qa.odl.mit.edu",
"production": "api-airbyte.odl.mit.edu",
}
Expand Down Expand Up @@ -50,9 +50,6 @@
path="dagster-http-auth-password", mount_point="secret-data"
)["data"]["dagster_unhashed_password"],
"request_timeout": 60, # Allow up to a minute for Airbyte requests
"request_additional_params": {
"verify": False,
},
}
)

Expand All @@ -69,17 +66,8 @@
# sources, since they are defined as ol_warehouse_raw_data in the
# sources.yml files. (TMM 2023-01-18)
key_prefix="ol_warehouse_raw_data",
connection_filter=lambda conn: re.match(r"S3 (Glue )?Data Lake", conn.name),
connection_to_group_fn=(
# Airbyte uses the unicode "right arrow" (U+2192) in the connection names for
# separating the source and destination. This selects the source name specifier
# and converts it to a lowercased, underscore separated string.
lambda conn_name: re.sub(
r"[^A-Za-z0-9_]", "", re.sub(r"[-\s]+", "_", conn_name)
)
.strip("_")
.lower()
),
connection_filter=lambda conn: re.search(r"S3 (Glue )?Data Lake", conn.name)
is not None,
)


Expand All @@ -88,7 +76,8 @@
# those tables. The eager auto materialize policy will then take effect for any
# downstream dbt models that are dependent on those staging models being completed.
group_names = set()
for asset in airbyte_assets.compute_cacheable_data():
computed_assets = airbyte_assets.compute_cacheable_data()
for asset in computed_assets:
group_names.add(asset.group_name)

airbyte_asset_jobs = []
Expand Down

0 comments on commit c1f8ed8

Please sign in to comment.