diff --git a/docs/guides/LIF_Add_Data_Source.md b/docs/guides/LIF_Add_Data_Source.md index aa0662b..38bcc48 100644 --- a/docs/guides/LIF_Add_Data_Source.md +++ b/docs/guides/LIF_Add_Data_Source.md @@ -1,5 +1,7 @@ # Add a New Data Source +> **Related:** For the adapter class contract, return types, and design guidelines, see [`creating_a_data_source_adapter.md`](creating_a_data_source_adapter.md). This guide is the end-to-end tutorial; that doc is the reference. + Data sources are used by the **Orchestrator** to fulfill LIF queries. These sources can be open or require authN/authZ, and return data in a variety of formats. Data sources are configured through an adapter so you can have multiple data sources that use the same adapter. Reference implementations for 2 adapter flows are provided in the repository: - LIF to LIF - Example Data Source to LIF @@ -54,7 +56,7 @@ The `example-data-source-rest-api-to-lif` adapter is the reference implementatio 4. Adjust the import code in `components/lif/data_source_adapters/sis_data_source_to_lif_adapter/__init__.py` to reflect the new adapter name of `SisDataSourceToLIFAdapter` -4. Add the adapter id (`sis-data-source-to-lif`) into the `components/lif/data_source_adapters/__init__.py::_EXTERNAL_ADAPTERS` map and add the adapter import: +5. Add the adapter id (`sis-data-source-to-lif`) into the `components/lif/data_source_adapters/__init__.py::_EXTERNAL_ADAPTERS` map and add the adapter import: ```python ... from .sis_data_source_to_lif_adapter.adapter import SisDataSourceToLIFAdapter @@ -65,7 +67,7 @@ The `example-data-source-rest-api-to-lif` adapter is the reference implementatio } ``` -4. In the docker compose file for `dagster-code-location`, add the following environment variables with the appropriate configuration for the data source: +6. 
In the docker compose file for `dagster-code-location`, add the following environment variables with the appropriate configuration for the data source: ``` ADAPTERS__SIS_DATA_SOURCE_TO_LIF__ORG1_ACME_SIS_DATA_SOURCE__CREDENTIALS__HOST ADAPTERS__SIS_DATA_SOURCE_TO_LIF__ORG1_ACME_SIS_DATA_SOURCE__CREDENTIALS__SCHEME @@ -74,13 +76,13 @@ The `example-data-source-rest-api-to-lif` adapter is the reference implementatio - Note the format is `ADAPTERS__[[ADAPTER_ID]]__[[ORG]][[DATA_SOURCE_ID]]__CREDENTIALS__...` -4. Rebuild and start docker compose with `deployments/advisor-demo-docker` (from the root of the repo, you can run `docker-compose -f deployments/advisor-demo-docker/docker-compose.yml up --build`) +7. Rebuild and start docker compose with `deployments/advisor-demo-docker` (from the root of the repo, you can run `docker-compose -f deployments/advisor-demo-docker/docker-compose.yml up --build`) -5. In the **MDR** > `Data Models` tab, add a new `SourceSchema` Data Model that describes how the data will be returned from the data source. It does not need to be exhaustive, just enough to cover the data that will be mapped into the _Org LIF_ schema paths. Take note of the **MDR** data source ID (in the context path of the **MDR** URL and at the top of the right hand panel when the data model itself is selected). This ID will be used to configure the translation flow later on. +8. In the **MDR** > `Data Models` tab, add a new `SourceSchema` Data Model that describes how the data will be returned from the data source. It does not need to be exhaustive, just enough to cover the data that will be mapped into the _Org LIF_ schema paths. Take note of the **MDR** data source ID (in the context path of the **MDR** URL and at the top of the right-hand panel when the data model itself is selected). This ID will be used to configure the translation flow later on. - The unique name of entities, attributes, etc. should be a 'dot path'. 
For example, if the source schema contains `user > details > address > state`, the `name` for the **MDR** entry should be _state_, and the `unique name` should be _user.details.address.state_. - Only attributes are able to be mapped, so for the above case, _state_ should be an attribute. -6. In the **MDR** > `Mappings` tab, select the new data source. In the center column, click `Create`. Using the built in controls, configure the translations from the new `Source Data Model` into the `Target Data Model` with the sticky lines. +9. In the **MDR** > `Mappings` tab, select the new data source. In the center column, click `Create`. Using the built-in controls, configure the translations from the new `Source Data Model` into the `Target Data Model` with the sticky lines. - Reminder: Only attributes can be mapped. - Due to a bug in the user flow, after mapping an attribute, manually lowercase the JSONata _expression_ by double-clicking on the sticky line and adjusting the field. For example, given the expression: ``` @@ -91,14 +93,14 @@ The `example-data-source-rest-api-to-lif` adapter is the reference implementatio { "person": [{ "contact": [{ "address": [{ "addressCity": user.details.address.state }] }] }] } ``` -7. If target fields in the mappings need to be added into the _Org LIF_ model, first review the `Data Models` > _Base LIF_ data model to see if the field already exists and just needs to be marked as included in _Org LIF_ model (You can review this by accessing `StateU LIF` > `Base LIF Inclusions` > find the field and tick the `Inc` checkbox). If the field does not exist in the _Base LIF_ model, then in the _Org LIF_ model, use the three vertical dots button to create the needed entities and attributes. Please do not modify the _Base LIF_ model. +10. 
If target fields in the mappings need to be added into the _Org LIF_ model, first review the `Data Models` > _Base LIF_ data model to see if the field already exists and just needs to be marked as included in the _Org LIF_ model (you can review this by accessing `StateU LIF` > `Base LIF Inclusions` > find the field and tick the `Inc` checkbox). If the field does not exist in the _Base LIF_ model, then in the _Org LIF_ model, use the three vertical dots button to create the needed entities and attributes. Please do not modify the _Base LIF_ model. - If creating new entities or attributes: - Remember the dot.path for the unique name - Ensure the new fields have `Array` set to `Yes` - If you update your _Org LIF_ data model, you should also update `components/lif/mdr_client/resources/openapi_constrained_with_interactions.json`. This file must be updated from http://localhost:8012/datamodels/open_api_schema/17?include_attr_md=true which is not currently exportable from the **MDR** UI. You will need to include the Bearer token from your **MDR** UI session in an `Authorization` header when retrieving the download, such as `curl 'http://localhost:8012/datamodels/open_api_schema/17?include_attr_md=true' -H 'Authorization: Bearer ...' > components/lif/mdr_client/resources/openapi_constrained_with_interactions.json`. After changing the JSON file, rebuild and start docker compose (the rebuild/start can be done in a later step as well). - If the GraphQL schema isn't validating in the Strawberry GraphQL UI (`localhost:8010`) the way you'd expect, the JSON file needs to be updated (or the _Org LIF_ data model needs adjustment). -8. Add a new block in `deployments/advisor-demo-docker/volumes/lif_query_planner/org1/information_sources_config_org1.yml` and enumerate the _Org LIF_ schema JSON paths the data source will populate (note the population occurs during translation). Only specify 2 nodes deep: for `person.contact.address.addressState`, just add `person.contact`. +11. 
Add a new block in `deployments/advisor-demo-docker/volumes/lif_query_planner/org1/information_sources_config_org1.yml` and enumerate the _Org LIF_ schema JSON paths the data source will populate (note the population occurs during translation). Only specify 2 nodes deep: for `person.contact.address.addressState`, just add `person.contact`. ```yaml - information_source_id: "org1-acme-sis-data-source" information_source_organization: "Org1" @@ -112,7 +114,7 @@ The `example-data-source-rest-api-to-lif` adapter is the reference implementatio target_schema_id: "17" # In the reference implementation, the Org LIF schema ID is constant (17) ``` -9. After a docker compose rebuild and start, you should be able to query LIF via the **LIF API**, which is exposed via the Strawberry GraphQL endpoint http://localhost:8010 with the following payload. Note `employmentPreferences > organizationTypes` is populated from `org1-example-data-source`, and the `custom > ...` and `contact > ...` are populated from `acme-sis-data-source`. +12. After a docker compose rebuild and start, you should be able to query LIF via the **LIF API**, which is exposed via the Strawberry GraphQL endpoint http://localhost:8010 with the following payload. Note `employmentPreferences > organizationTypes` is populated from `org1-example-data-source`, and the `custom > ...` and `contact > ...` are populated from `acme-sis-data-source`. ```graphql query MyQuery { person( @@ -134,7 +136,7 @@ The `example-data-source-rest-api-to-lif` adapter is the reference implementatio } ``` -10. In order for the new data source to be leveraged in the Advisor, additional work needs to occur: +13. In order for the new data source to be leveraged in the Advisor, additional work needs to occur: - The MCP service needs to be aware of additional _Org LIF_ schema changes - Your organization's user IDs need to be available in the Advisor API so the Advisor login details match the appropriate user in the new data source. 
Currently, there are only six static users for demo purposes. In the future, this should be a configurable effort with robust authN and the LIF **Identity Mapper**. diff --git a/docs/guides/creating_a_data_source_adapter.md b/docs/guides/creating_a_data_source_adapter.md new file mode 100644 index 0000000..cd824f7 --- /dev/null +++ b/docs/guides/creating_a_data_source_adapter.md @@ -0,0 +1,271 @@ +# Creating a Data Source Adapter + +This guide is the **reference** for the data source adapter contract: what adapters are, what they receive, what they return, and how to write one. It is aimed at developers adapting their own code to the LIF system or writing a new adapter from scratch. + +> **Looking for an end-to-end walkthrough?** [`LIF_Add_Data_Source.md`](LIF_Add_Data_Source.md) is the tutorial. It walks through a concrete scenario — building an SIS-style adapter, setting up the MDR source schema and JSONata mappings, wiring up Docker Compose, and verifying via GraphQL. Use that guide when you want step-by-step instructions; use this one when you need to understand the adapter contract. + +## How Adapters Fit In + +The **Orchestrator** (Dagster) executes adapters to fetch person data from external sources. Each adapter is a Python class that knows how to talk to one kind of data source. The orchestrator calls adapters based on a **query plan** — a list of instructions that says "for this person, fetch these fields from this source using this adapter." + +``` +Query Planner Orchestrator (Dagster) + │ │ + │ query plan parts │ + └───────────────────────────────>│ + │ + ┌────────────────┼────────────────┐ + │ │ │ + Adapter A Adapter B Adapter C + (LIF-to-LIF) (REST API) (Your adapter) + │ │ │ + │ │ ┌────────────┘ + │ │ │ + │ Translator Service + │ │ │ + └────────┬───────┘────┘ + │ + Query Planner + (stores results) +``` + +There are two flows: + +1. **LIF-to-LIF** — The source already returns data in LIF schema format. 
The adapter returns structured `OrchestratorJobQueryPlanPartResults` directly. No translation needed. + +2. **Pipeline-integrated** — The source returns data in its own format. The adapter returns raw JSON (`dict`). The orchestrator then sends this to the **Translator** service, which uses MDR-defined transformation rules to convert it into LIF schema format. + +Most custom adapters will use the pipeline-integrated flow. + +## What the Adapter Receives + +When the orchestrator instantiates your adapter, it passes two arguments: + +### `lif_query_plan_part: LIFQueryPlanPart` + +Contains everything the adapter needs to know about what data to fetch: + +| Field | Type | Description | +|-------|------|-------------| +| `person_id` | `LIFPersonIdentifier` | The person to look up. Has `.identifier` (e.g., `"100001"`) and `.identifierType` (e.g., `"School-assigned number"`) | +| `information_source_id` | `str` | Which configured data source this is (e.g., `"org1-acme-sis"`) | +| `adapter_id` | `str` | Your adapter's registered ID (e.g., `"acme-sis-to-lif"`) | +| `lif_fragment_paths` | `List[str]` | Which LIF data fields are needed (e.g., `["Person.Contact", "Person.Name"]`) | +| `translation` | `LIFQueryPlanPartTranslation \| None` | If set, has `source_schema_id` and `target_schema_id` for the translator | + +### `credentials: dict` + +Key-value pairs loaded from environment variables. The keys come from your adapter's `credential_keys` class variable. Common keys: `host`, `scheme`, `token`. + +## What the Adapter Returns + +### Pipeline-integrated adapters (most custom adapters) + +Return the raw JSON response from your data source as a Python `dict`. The orchestrator passes this to the Translator service, which applies MDR-defined JSONata transformation rules to map it into LIF schema format. 
+ +```python +def run(self) -> dict: + response = requests.get(url, headers=headers, timeout=30) + response.raise_for_status() + return response.json() +``` + +The translator will: +1. Fetch the source and target schemas from the MDR (using the IDs in `translation`) +2. Fetch the JSONata transformation expressions from the MDR +3. Apply the transformations to convert your source data into LIF-formatted fragments +4. Return `OrchestratorJobQueryPlanPartResults` with the translated data + +This means your adapter does not need to know anything about the LIF schema. It just fetches data from the source in whatever format the source provides. The schema mapping is handled entirely in the MDR configuration. + +### LIF-to-LIF adapters + +If your source already returns LIF-formatted data, return `OrchestratorJobQueryPlanPartResults` directly: + +```python +def run(self) -> OrchestratorJobQueryPlanPartResults: + # ... fetch data ... + return OrchestratorJobQueryPlanPartResults( + information_source_id=self.lif_query_plan_part.information_source_id, + adapter_id=self.lif_query_plan_part.adapter_id, + data_timestamp=dt.datetime.now(dt.timezone.utc).isoformat(), + person_id=self.lif_query_plan_part.person_id, + fragments=[LIFFragment(fragment_path="person.all", fragment=[data])], + error=None, + ) +``` + +## Writing Your Adapter + +### Step 1: Create the adapter directory + +``` +components/lif/data_source_adapters/ +└── my_source_adapter/ + ├── __init__.py + └── adapter.py +``` + +### Step 2: Implement the adapter class + +Your adapter must subclass `LIFDataSourceAdapter` and define three class variables: + +| Variable | Required | Description | +|----------|----------|-------------| +| `adapter_id` | Yes | Unique string ID (e.g., `"my-source-to-lif"`). Used in config files and env vars. 
| +| `adapter_type` | Yes | One of: `LIFAdapterType.PIPELINE_INTEGRATED`, `LIF_TO_LIF`, `STANDALONE`, `AI_WRITE` | +| `credential_keys` | No | List of credential keys your adapter needs (defaults to `[]`) | + +You must also implement `__init__` (accepting `lif_query_plan_part` and `credentials`) and `run()`. + +Here is a complete example for a REST API data source: + +```python +# components/lif/data_source_adapters/my_source_adapter/adapter.py + +import requests + +from lif.datatypes import LIFQueryPlanPart +from lif.logging import get_logger +from ..core import LIFAdapterType, LIFDataSourceAdapter + +logger = get_logger(__name__) + + +class MySourceAdapter(LIFDataSourceAdapter): + adapter_id = "my-source-to-lif" + adapter_type = LIFAdapterType.PIPELINE_INTEGRATED + credential_keys = ["host", "scheme", "token"] + + def __init__(self, lif_query_plan_part: LIFQueryPlanPart, credentials: dict): + self.lif_query_plan_part = lif_query_plan_part + self.host = credentials.get("host") + self.scheme = credentials.get("scheme") or "https" + self.token = credentials.get("token") + + def run(self) -> dict: + identifier = self.lif_query_plan_part.person_id.identifier or "" + url = f"{self.scheme}://{self.host}/api/people/{identifier}" + + headers = {"Authorization": f"Bearer {self.token}"} + + logger.info(f"Fetching from {url}") + + response = requests.get(url, headers=headers, timeout=30) + response.raise_for_status() + result = response.json() + + if "errors" in result: + error_msg = f"Source API errors: {result['errors']}" + logger.error(error_msg) + raise Exception(error_msg) + + logger.info("Source query executed successfully") + return result +``` + +And the `__init__.py`: + +```python +# components/lif/data_source_adapters/my_source_adapter/__init__.py + +from .adapter import MySourceAdapter + +__all__ = ["MySourceAdapter"] +``` + +### Step 3: Register the adapter + +Add your adapter to the registry in `components/lif/data_source_adapters/__init__.py`: + +```python 
+from .my_source_adapter import MySourceAdapter + +_EXTERNAL_ADAPTERS = { + "example-data-source-rest-api-to-lif": ExampleDataSourceRestAPIToLIFAdapter, + "my-source-to-lif": MySourceAdapter, # <-- add this +} +``` + +The registry key must match your adapter's `adapter_id`. + +### Step 4: Wire it up + +Once the adapter class is written and registered, three more things need to happen before it runs: + +1. **Credentials** — set environment variables on the `dagster-code-location` container using the naming convention `ADAPTERS__[[ADAPTER_ID]]__[[ORG]][[DATA_SOURCE_ID]]__CREDENTIALS__...` (uppercased, dashes converted to underscores). Missing credentials produce a warning at startup but do not block initialization, so handle absent values gracefully in `__init__`. + +2. **Information source config** — add an entry to the query planner's `information_sources_config_*.yml` referencing your `adapter_id`, the `lif_fragment_paths` your source provides, and a `translation` block with the MDR schema IDs (for pipeline-integrated adapters). + +3. **MDR schemas and mappings** — create a source schema describing your API response and JSONata mappings to the target LIF schema. Only attributes (leaf fields) can be mapped. + +[`LIF_Add_Data_Source.md`](LIF_Add_Data_Source.md) walks through each of these with a concrete example — use it as the step-by-step companion when you are ready to wire your adapter into a running environment. + +## Adapter Design Guidelines + +### Error handling + +- **Raise exceptions on failure.** The orchestrator has built-in retry logic (3 retries with exponential backoff and jitter). Let exceptions propagate so retries can kick in. +- **Check for error payloads.** Many APIs return 200 with an `errors` field in the body. Check for this and raise if present. +- **Use timeouts.** Always set a timeout on HTTP requests (30 seconds is a reasonable default). 
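Taken together, these guidelines reduce to a small pattern. The helper below is a hypothetical sketch (the name `check_payload` is not part of the codebase) of the shape a `run()` body might take:

```python
def check_payload(result: dict) -> dict:
    """Raise when an API responds 200 but embeds an error payload.

    Raising (rather than returning a partial result) lets the
    orchestrator's retry logic kick in.
    """
    if "errors" in result:
        raise RuntimeError(f"Source API errors: {result['errors']}")
    return result


# Inside run(), the pattern would then look like:
#     response = requests.get(url, headers=headers, timeout=30)  # always set a timeout
#     response.raise_for_status()  # surface HTTP-level failures
#     return check_payload(response.json())
```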
+ +### Logging + +Use the LIF logger: + +```python +from lif.logging import get_logger +logger = get_logger(__name__) +``` + +Log at `info` level for key milestones (request URL, success) and `debug` for response payloads. The orchestrator logs are visible in Dagster's run view. + +### Statelessness + +Adapters are instantiated fresh for each query plan part execution. Do not store state between calls. Caching is handled upstream by the LIF Query Cache service. + +### Network access + +The adapter runs inside the `dagster-code-location` container. If your data source is on the host machine's localhost, use `host.docker.internal` as the hostname. + +### Credential validation + +You can override `validate_credentials` for custom checks: + +```python +@classmethod +def validate_credentials(cls, credentials: dict) -> None: + super().validate_credentials(credentials) + if not credentials.get("token"): + raise ValueError("Token is required for MySource adapter") +``` + +## Reference Implementations + +The repository includes two adapters you can study or clone as a starting point: + +| Adapter | Type | Returns | Path | +|---------|------|---------|------| +| `lif-to-lif` | `LIF_TO_LIF` | `OrchestratorJobQueryPlanPartResults` | `components/lif/data_source_adapters/lif_to_lif_adapter/` | +| `example-data-source-rest-api-to-lif` | `PIPELINE_INTEGRATED` | `dict` | `components/lif/data_source_adapters/example_data_source_rest_api_to_lif_adapter/` | + +The `example-data-source-rest-api-to-lif` adapter is the simplest starting point for most custom adapters. It demonstrates the full pipeline-integrated flow in under 45 lines of code. + +## Troubleshooting + +For MDR mapping issues, empty fragments, cache invalidation, and Dagster run inspection, see the troubleshooting section of [`LIF_Add_Data_Source.md`](LIF_Add_Data_Source.md#troubleshooting). The items below are specific to adapter development. 
+ +### Adapter not found + +If the orchestrator raises `Unknown adapter_id`, the adapter class is not in `ADAPTER_REGISTRY`. Verify the import and `_EXTERNAL_ADAPTERS` entry in `components/lif/data_source_adapters/__init__.py`, and that the registry key matches the `adapter_id` class variable exactly. + +### Empty credentials + +If your adapter receives an empty `credentials` dict, check: +- The env var naming matches the convention exactly (uppercased, dashes to underscores in both the adapter ID and information source ID) +- The env vars are set on the `dagster-code-location` container, not another service +- Your adapter's `credential_keys` list includes every key you read in `__init__` — only declared keys are loaded from the environment + +### Exceptions that don't retry + +The orchestrator retries adapter failures up to 3 times with exponential backoff — but only if your `run()` method raises. If you catch exceptions and return a malformed result instead, the orchestrator has nothing to retry on. Let exceptions propagate.
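When debugging the empty-credentials case, it can help to compute the exact variable name the loader expects. A throwaway sketch of the naming convention (hypothetical helper, not part of the codebase; uppercase everything and convert dashes to underscores):

```python
def credential_env_var(adapter_id: str, information_source_id: str, key: str) -> str:
    """Derive the expected env var name for one credential key."""
    def norm(part: str) -> str:
        return part.replace("-", "_").upper()

    return (
        f"ADAPTERS__{norm(adapter_id)}"
        f"__{norm(information_source_id)}"
        f"__CREDENTIALS__{norm(key)}"
    )


print(credential_env_var("my-source-to-lif", "org1-acme-sis", "host"))
# ADAPTERS__MY_SOURCE_TO_LIF__ORG1_ACME_SIS__CREDENTIALS__HOST
```

Compare the derived names against the environment actually set on the `dagster-code-location` container to spot mismatches.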