|
| 1 | +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. |
| 2 | +# All rights reserved. |
| 3 | +# SPDX-License-Identifier: Apache-2.0 |
| 4 | + |
| 5 | +"""Graph operator: extract relational DB schema and store it in Neo4j.""" |
| 6 | + |
| 7 | +from __future__ import annotations |
| 8 | + |
| 9 | +from typing import Any |
| 10 | + |
| 11 | +import pandas as pd |
| 12 | + |
| 13 | +from nemo_retriever.graph.abstract_operator import AbstractOperator |
| 14 | +from nemo_retriever.graph.cpu_operator import CPUOperator |
| 15 | +from nemo_retriever.params import TabularExtractParams |
| 16 | + |
| 17 | + |
| 18 | +class TabularSchemaExtractOp(AbstractOperator, CPUOperator): |
| 19 | + """Extract schema entities from a relational DB and write them to Neo4j. |
| 20 | +
|
| 21 | + Combines two steps: |
| 22 | + 1. Pull schema metadata (tables, columns, views, PKs, FKs) from the |
| 23 | + database via the :class:`~nemo_retriever.tabular_data.sql_database.SQLDatabase` |
| 24 | + connector stored in *tabular_params*. |
| 25 | + 2. Write the extracted entities as graph nodes and relationships into Neo4j. |
| 26 | +
|
| 27 | + The operator produces an empty DataFrame as output so it can be chained |
| 28 | + with downstream operators (e.g. :class:`TabularFetchEmbeddingsOp`) via |
| 29 | + ``>>``. All meaningful state lives in Neo4j after this step. |
| 30 | + """ |
| 31 | + |
| 32 | + def __init__( |
| 33 | + self, |
| 34 | + *, |
| 35 | + tabular_params: TabularExtractParams | None = None, |
| 36 | + **kwargs: Any, |
| 37 | + ) -> None: |
| 38 | + super().__init__(tabular_params=tabular_params, **kwargs) |
| 39 | + self._tabular_params = tabular_params |
| 40 | + |
| 41 | + def preprocess(self, data: Any, **kwargs: Any) -> TabularExtractParams | None: |
| 42 | + if isinstance(data, TabularExtractParams): |
| 43 | + return data |
| 44 | + return self._tabular_params |
| 45 | + |
| 46 | + def process(self, data: TabularExtractParams | None, **kwargs: Any) -> pd.DataFrame: |
| 47 | + from nemo_retriever.tabular_data.ingestion.extract_data import ( |
| 48 | + extract_tabular_db_data, |
| 49 | + store_relational_db_in_neo4j, |
| 50 | + ) |
| 51 | + |
| 52 | + schema_data = extract_tabular_db_data(params=data) |
| 53 | + store_relational_db_in_neo4j(data=schema_data) |
| 54 | + return pd.DataFrame() |
| 55 | + |
| 56 | + def postprocess(self, data: Any, **kwargs: Any) -> Any: |
| 57 | + return data |
0 commit comments