63 changes: 61 additions & 2 deletions airflow/dags/download_gtfs_schedule_v2/README.md
@@ -2,8 +2,67 @@

Type: [Now / Scheduled](https://docs.calitp.org/data-infra/airflow/dags-maintenance.html)

This DAG orchestrates raw data capture for GTFS schedule data.
It reads GTFS data configuration files generated by the [`airtable_loader_2` DAG](../airtable_loader_v2/README.md) to determine the list of GTFS schedule URLs to scrape
(this DAG simply uses the latest such configuration file, so there is no formal dependency between the two DAGs on a daily run basis).


## Secrets

You may need to change authentication information in [Secret Manager](https://console.cloud.google.com/security/secret-manager);
auth keys are loaded from Secret Manager at the start of DAG executions.
You may create new versions of existing secrets, or add entirely new secrets.
Secrets must be tagged with `gtfs_schedule: true` to be loaded and are referenced by `url_secret_key_name` or `header_secret_key_name` in Airtable's GTFS dataset records.
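
As a sketch only, a new secret could be created and populated with gcloud; the secret name below is hypothetical, and this assumes the `gtfs_schedule: true` tag is applied as a Secret Manager label:

```bash
# Hypothetical secret name; follow the existing naming conventions in Secret Manager.
gcloud secrets create example_agency_api_key --labels=gtfs_schedule=true

# Add the auth key value as a new secret version.
echo -n "example-auth-key-value" | gcloud secrets versions add example_agency_api_key --data-file=-
```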


## Running from the command line

To download the GTFS schedule data manually, you can also run the script from the command line with `poetry run python download_schedule_feeds.py`.

Follow these steps:


1. Log in with gcloud.

```bash
$ gcloud auth application-default login --login-config=../../../iac/login.json
```

2. Define the correct values for the required environment variables.

* `GOOGLE_CLOUD_PROJECT`: The project where the secret keys can be found.
* `CALITP_BUCKET__GTFS_DOWNLOAD_CONFIG`: The source bucket where the configuration files are located.
* `CALITP_BUCKET__GTFS_SCHEDULE_RAW`: The destination bucket where the schedule result files will be saved.


> [!NOTE]
> Bucket names may change; make sure these buckets are still correct before you run.
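
If you prefer, you can export these variables once in your shell instead of passing them inline as in the next step; the values below are the Staging ones from step 3 (bucket names may differ over time):

```bash
# Illustrative: export the Staging values before running the script.
export GOOGLE_CLOUD_PROJECT=cal-itp-data-infra-staging
export CALITP_BUCKET__GTFS_DOWNLOAD_CONFIG="gs://calitp-staging-gtfs-download-config"
export CALITP_BUCKET__GTFS_SCHEDULE_RAW="gs://calitp-staging-gtfs-schedule-raw-v2"
```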

3. Run the command with the environment variables set.

To run on Staging, the full command should look like this:

```bash
$ GOOGLE_CLOUD_PROJECT=cal-itp-data-infra-staging CALITP_BUCKET__GTFS_DOWNLOAD_CONFIG="gs://calitp-staging-gtfs-download-config" CALITP_BUCKET__GTFS_SCHEDULE_RAW="gs://calitp-staging-gtfs-schedule-raw-v2" poetry run python download_schedule_feeds.py
```


To run on Production, the full command should look like this:

```bash
$ GOOGLE_CLOUD_PROJECT=cal-itp-data-infra CALITP_BUCKET__GTFS_DOWNLOAD_CONFIG="gs://calitp-gtfs-download-config" CALITP_BUCKET__GTFS_SCHEDULE_RAW="gs://calitp-gtfs-schedule-raw-v2" poetry run python download_schedule_feeds.py
```

4. Check the timestamps of the result files.

Go to Google Cloud Storage and check that the destination bucket (`CALITP_BUCKET__GTFS_SCHEDULE_RAW`) contains the new files with timestamps in UTC.

For example:

The schedule file and the Download Schedule Feed Results were created with `ts=2025-10-29T03:00:23.941260+00:00`, where `+00:00` means that the time is in UTC.

* `gs://calitp-gtfs-schedule-raw-v2/schedule/dt=2025-10-29/ts=2025-10-29T03:00:23.941260+00:00/base64_url=XXXXX`
* `gs://calitp-gtfs-schedule-raw-v2/download_schedule_feed_results/dt=2025-10-29/ts=2025-10-29T03:00:23.941260+00:00/results.jsonl`


If the timestamp is in Pacific time or another time zone, the next process, `Unzip and Validate GTFS Schedule Hourly`, may not process those files.
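
For a quick command-line check, something along these lines (using the Production bucket and date from the example above) lists the day's result files so you can confirm that the `ts=` values end in `+00:00`:

```bash
# List the day's download results; the ts= prefixes should end in +00:00 (UTC).
gsutil ls "gs://calitp-gtfs-schedule-raw-v2/download_schedule_feed_results/dt=2025-10-29/"
```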
@@ -92,14 +92,16 @@ def download_all(task_instance, execution_date, **kwargs):
]
outcomes: List[GTFSDownloadOutcome] = []

logging.info(f"processing {len(configs)} configs")
print(f"processing {len(configs)} configs")

for i, config in enumerate(configs, start=1):
with sentry_sdk.push_scope() as scope:
logging.info(f"attempting to fetch {i}/{len(configs)} {config.url}")
print(f"attempting to fetch {i}/{len(configs)} {config.url}")

scope.set_tag("config_name", config.name)
scope.set_tag("config_url", config.url)
scope.set_context("config", config.dict())

try:
extract, content = download_feed(
config=config,
@@ -137,7 +139,7 @@ def download_all(task_instance, execution_date, **kwargs):
)

print(
f"took {humanize.naturaltime(pendulum.now() - start)} to process {len(configs)} configs"
f"took {humanize.naturaldelta(pendulum.now() - start)} to process {len(configs)} configs"
)

result = DownloadFeedsResult(
@@ -162,12 +164,13 @@
str(f.exception) or str(type(f.exception)) for f in result.failures
),
)
task_instance.xcom_push(
key="download_failures",
value=[
json.loads(f.json()) for f in result.failures
], # use the Pydantic serializer
)
# Commented out since it is only used by email_download_failures.py (temporarily disabled)
# task_instance.xcom_push(
# key="download_failures",
# value=[
# json.loads(f.json()) for f in result.failures
# ], # use the Pydantic serializer
# )

success_rate = len(result.successes) / len(configs)
if success_rate < GTFS_FEED_LIST_ERROR_THRESHOLD: