Skip to content

Commit

Permalink
Ingest sloan courses via api (#1487)
Browse files Browse the repository at this point in the history
* create sloan courses and course offerings data assets extracted from API

# Conflicts:
#	src/ol_orchestrate/resources/openedx.py

* add sloan api definition and code location

* Create a new OAuth resource, refactor OpenEdxApiClient to inherit from it

* remove unused code

* remove sloan API method from OAuthApiClient and add them to assets file
  • Loading branch information
rachellougee authored Feb 25, 2025
1 parent daf2fd9 commit c9f04c2
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 117 deletions.
120 changes: 120 additions & 0 deletions src/ol_orchestrate/assets/sloan_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Call MIT Sloan Executive Education APIs to get courses and course-offerings data
# Model the different asset objects according to their type and data structure
import hashlib
import json
from datetime import UTC, datetime
from pathlib import Path

import jsonlines
from dagster import (
AssetExecutionContext,
AssetKey,
AssetOut,
DataVersion,
Output,
multi_asset,
)

from ol_orchestrate.resources.oauth import OAuthApiClientFactory


@multi_asset(
group_name="sloan_executive_education",
outs={
"course_metadata": AssetOut(
description="The metadata for courses extracted from sloan course API",
io_manager_key="s3file_io_manager",
key=AssetKey(("sloan_executive_education", "course_metadata")),
),
"course_offering_metadata": AssetOut(
description="The metadata for course offerings extracted from sloan course "
"offering API",
io_manager_key="s3file_io_manager",
key=AssetKey(("sloan_executive_education", "course_offering_metadata")),
),
},
)
def sloan_course_metadata(
context: AssetExecutionContext, sloan_api: OAuthApiClientFactory
):
data_retrieval_timestamp = datetime.now(tz=UTC).isoformat()

sloan_courses = sloan_api.client.fetch_with_auth(
"https://mit-unified-portal-prod-78eeds.43d8q2.usa-e2.cloudhub.io/api/courses"
)
courses = [
{
"course_id": course["Course_Id"],
"title": course["Title"],
"description": course["Description"],
"course_url": course["URL"],
"certification_type": course["Certification_Type"],
"topics": course["Topics"],
"image_url": course["Image_Src"],
"created": course["SourceCreateDate"],
"modified": course["SourceLastModifiedDate"],
"retrieved_at": data_retrieval_timestamp,
}
for course in sloan_courses
]

context.log.info("Total extracted %d Sloan courses....", len(courses))

sloan_course_offerings = sloan_api.client.fetch_with_auth(
"https://mit-unified-portal-prod-78eeds.43d8q2.usa-e2.cloudhub.io/api/course-offerings"
)
course_offerings = [
{
"title": course_offering["CO_Title"],
"course_id": course_offering["Course_Id"],
"start_date": course_offering["Start_Date"],
"end_date": course_offering["End_Date"],
"delivery": course_offering["Delivery"],
"duration": course_offering["Duration"],
"price": course_offering["Price"],
"continuing_ed_credits": course_offering["Continuing_Ed_Credits"],
"time_commitment": course_offering["Time_Commitment"],
"location": course_offering["Location"],
"tuition_cost_non_usd": course_offering["Tuition_Cost(non-USD)"],
"currency": course_offering["Currency"],
"faculty": course_offering["Faculty_Name"],
"retrieved_at": data_retrieval_timestamp,
}
for course_offering in sloan_course_offerings
]
context.log.info(
"Total extracted %d Sloan course offerings....", len(course_offerings)
)

course_data_version = hashlib.sha256(
json.dumps(courses).encode("utf-8")
).hexdigest()
course_offering_data_version = hashlib.sha256(
json.dumps(course_offerings).encode("utf-8")
).hexdigest()

course_file = Path(f"course_{course_data_version}.json")
course_offering_file = Path(f"course_offering_{course_offering_data_version}.json")
course_object_key = f"{'/'.join(context.asset_key_for_output('course_metadata').path)}/{course_data_version}.json" # noqa: E501
course_offering_object_key = f"{'/'.join(context.asset_key_for_output('course_offering_metadata').path)}/{course_offering_data_version}.json" # noqa: E501

with (
jsonlines.open(course_file, mode="w") as course,
jsonlines.open(course_offering_file, mode="w") as offering,
):
course.write_all(courses)
offering.write_all(course_offerings)

yield Output(
(course_file, course_object_key),
output_name="course_metadata",
data_version=DataVersion(course_data_version),
metadata={"object_key": course_object_key},
)

yield Output(
(course_offering_file, course_offering_object_key),
output_name="course_offering_metadata",
data_version=DataVersion(course_offering_data_version),
metadata={"object_key": course_offering_object_key},
)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from dagster import (
AssetSelection,
Definitions,
ScheduleDefinition,
)
from dagster_aws.s3 import S3Resource

from ol_orchestrate.assets.sloan_api import sloan_course_metadata
from ol_orchestrate.io_managers.filepath import S3FileObjectIOManager
from ol_orchestrate.lib.constants import DAGSTER_ENV, VAULT_ADDRESS
from ol_orchestrate.lib.dagster_helpers import default_io_manager
from ol_orchestrate.lib.utils import authenticate_vault, s3_uploads_bucket
from ol_orchestrate.resources.oauth import OAuthApiClientFactory

vault = authenticate_vault(DAGSTER_ENV, VAULT_ADDRESS)

extract_api_daily_schedule = ScheduleDefinition(
name="learning_resource_api_schedule",
target=AssetSelection.assets(sloan_course_metadata),
cron_schedule="@daily",
execution_timezone="Etc/UTC",
)

extract_api_data = Definitions(
resources={
"io_manager": default_io_manager(DAGSTER_ENV),
"s3file_io_manager": S3FileObjectIOManager(
bucket=s3_uploads_bucket(DAGSTER_ENV)["bucket"],
path_prefix=s3_uploads_bucket(DAGSTER_ENV)["prefix"],
),
"vault": vault,
"s3": S3Resource(),
"sloan_api": OAuthApiClientFactory(deployment="sloan", vault=vault),
},
assets=[sloan_course_metadata],
schedules=[extract_api_daily_schedule],
)
49 changes: 49 additions & 0 deletions src/ol_orchestrate/lib/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import Any, Literal

from ol_orchestrate.resources.secrets.vault import Vault


def authenticate_vault(dagster_env: str, vault_address: str) -> Vault:
"""
Authenticate with Vault based on the dagster environment and authentication method.
Parameters:
dagster_env (str): The environment in which the Dagster service is running.
vault_address (str): The address of the Vault server.
Returns:
Vault: An authenticated Vault client.
"""
if dagster_env == "dev":
vault = Vault(vault_addr=vault_address, vault_auth_type="github")
vault._auth_github() # noqa: SLF001
else:
vault = Vault(
vault_addr=vault_address, vault_role="dagster-server", aws_auth_mount="aws"
)
vault._auth_aws_iam() # noqa: SLF001

return vault


def s3_uploads_bucket(
dagster_env: Literal["dev", "qa", "production"],
) -> dict[str, Any]:
"""
Return the S3 bucket configuration based on the environment.
Parameters:
dagster_env (Literal): Environment name, one of "dev", "qa", or "production".
Returns:
dict: A dictionary with the S3 bucket and prefix for the specified environment.
"""
bucket_map = {
"dev": {"bucket": "ol-devops-sandbox", "prefix": "pipeline-storage"},
"qa": {"bucket": "ol-data-lake-landing-zone-qa", "prefix": ""},
"production": {
"bucket": "ol-data-lake-landing-zone-production",
"prefix": "",
},
}
return bucket_map[dagster_env]
2 changes: 1 addition & 1 deletion src/ol_orchestrate/repositories/open_edx.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def open_edx_export_irx_job_config(
edx_resource_config = {
"client_id": edx_creds["id"],
"client_secret": edx_creds["secret"],
"lms_url": edx_creds["url"],
"base_url": edx_creds["url"],
"studio_url": edx_creds["studio_url"],
"token_type": "JWT",
"token_url": f"{edx_creds['url']}/oauth2/access_token",
Expand Down
140 changes: 140 additions & 0 deletions src/ol_orchestrate/resources/oauth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import time
from collections.abc import Generator
from contextlib import contextmanager
from datetime import UTC, datetime, timedelta
from typing import Any, Optional, Self

import httpx
from dagster import ConfigurableResource, InitResourceContext, ResourceDependency
from pydantic import Field, PrivateAttr, ValidationError, validator

from ol_orchestrate.resources.secrets.vault import Vault

TOO_MANY_REQUESTS = 429


class OAuthApiClient(ConfigurableResource):
client_id: str = Field(description="OAUTH2 client ID")
client_secret: str = Field(description="OAUTH2 client secret")
token_type: str = Field(
default="JWT",
description="Token type to generate for use with authenticated requests",
)
token_url: str = Field(
description="URL to request token. e.g. https://lms.mitx.mit.edu/oauth2/access_token",
)
base_url: str = Field(
description="Base URL of OAuth API client being queries. e.g. https://lms.mitx.mit.edu/",
)
http_timeout: int = Field(
default=60,
description=(
"Time (in seconds) to allow for requests to complete before timing out."
),
)
_access_token: Optional[str] = PrivateAttr(default=None)
_access_token_expires: Optional[datetime] = PrivateAttr(default=None)
_http_client: httpx.Client = PrivateAttr(default=None)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._initialize_client()

def _initialize_client(self) -> None:
if self._http_client is not None:
return
timeout = httpx.Timeout(self.http_timeout, connect=10)
self._http_client = httpx.Client(timeout=timeout)

@validator("token_type")
def validate_token_type(cls, token_type): # noqa: N805
if token_type.lower() not in ["jwt", "bearer"]:
raise ValidationError
return token_type

def _fetch_access_token(self) -> Optional[str]:
now = datetime.now(tz=UTC)
if self._access_token is None or (self._access_token_expires or now) <= now:
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
"token_type": self.token_type,
}
response = self._http_client.post(self.token_url, data=payload)
response.raise_for_status()
self._access_token = response.json()["access_token"]
self._access_token_expires = now + timedelta(
seconds=response.json()["expires_in"]
)
return self._access_token

@property
def _username(self) -> str:
response = self._http_client.get(
f"{self.base_url}/api/user/v1/me",
headers={"Authorization": f"JWT {self._fetch_access_token()}"},
)
response.raise_for_status()
return response.json()["username"]

def fetch_with_auth(
self,
request_url: str,
page_size: int = 100,
extra_params: dict[str, Any] | None = None,
) -> dict[Any, Any]:
if self.token_url == f"{self.base_url}/oauth2/access_token":
request_params = {"username": self._username, "page_size": page_size}
else:
request_params = {}

response = self._http_client.get(
request_url,
headers={"Authorization": f"JWT {self._fetch_access_token()}"},
params=httpx.QueryParams(**request_params),
)

try:
response.raise_for_status()
except httpx.HTTPStatusError as error_response:
if error_response.response.status_code == TOO_MANY_REQUESTS:
retry_after = error_response.response.headers.get("Retry-After", 60)
delay = int(retry_after) if retry_after.isdigit() else 60
time.sleep(delay)
return self.fetch_with_auth(
request_url, page_size=page_size, extra_params=extra_params
)
raise
return response.json()


class OAuthApiClientFactory(ConfigurableResource):
deployment: str = Field(description="The name of the deployment")
_client: OAuthApiClient = PrivateAttr()
vault: ResourceDependency[Vault]

def _initialize_client(self) -> OAuthApiClient:
client_secrets = self.vault.client.secrets.kv.v1.read_secret(
mount_point="secret-data",
path=f"pipelines/{self.deployment}/oauth-client",
)["data"]

self._client = OAuthApiClient(
client_id=client_secrets["id"],
client_secret=client_secrets["secret"],
base_url=client_secrets["url"],
token_url=client_secrets.get(
"token_url", f"{client_secrets['url']}/oauth2/access_token"
),
)
return self._client

@property
def client(self) -> OAuthApiClient:
return self._client

@contextmanager
def yield_for_execution(self, context: InitResourceContext) -> Generator[Self]: # noqa: ARG002
self._initialize_client()
yield self
Loading

0 comments on commit c9f04c2

Please sign in to comment.