diff --git a/cognite_toolkit/_cdf_tk/client/_toolkit_client.py b/cognite_toolkit/_cdf_tk/client/_toolkit_client.py index 93ba4f4a0b..26f6fe69e3 100644 --- a/cognite_toolkit/_cdf_tk/client/_toolkit_client.py +++ b/cognite_toolkit/_cdf_tk/client/_toolkit_client.py @@ -31,6 +31,7 @@ from .api.migration import MigrationAPI from .api.project import ProjectAPI from .api.raw import RawAPI +from .api.records import RecordsAPI from .api.relationships import RelationshipsAPI from .api.robotics import RoboticsAPI from .api.search import SearchAPI @@ -115,6 +116,7 @@ def __init__( self.project = ProjectAPI(config=toolkit_config, cognite_client=self) self.infield = InfieldAPI(http_client) self.streams = StreamsAPI(http_client) + self.records = RecordsAPI(http_client) @property def config(self) -> ToolkitClientConfig: diff --git a/cognite_toolkit/_cdf_tk/client/api/records.py b/cognite_toolkit/_cdf_tk/client/api/records.py new file mode 100644 index 0000000000..48e5ebfce8 --- /dev/null +++ b/cognite_toolkit/_cdf_tk/client/api/records.py @@ -0,0 +1,59 @@ +from collections.abc import Sequence + +from cognite_toolkit._cdf_tk.client.cdf_client import CDFResourceAPI, PagedResponse +from cognite_toolkit._cdf_tk.client.cdf_client.api import Endpoint +from cognite_toolkit._cdf_tk.client.http_client import HTTPClient, ItemsSuccessResponse, SuccessResponse +from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordIdentifier, RecordRequest, RecordResponse + + +class RecordsAPI(CDFResourceAPI[RecordIdentifier, RecordRequest, RecordResponse]): + """API for managing records in CDF streams. + + Records are scoped to a stream, so the stream_id is passed to each method, + following the same pattern as RawTablesAPI with db_name. + """ + + def __init__(self, http_client: HTTPClient) -> None: + super().__init__( + http_client=http_client, + method_endpoint_map={ + "create": Endpoint(method="POST", path="/streams/{streamId}/records", item_limit=1000), + "upsert": Endpoint(method="POST", path="/streams/{streamId}/records/upsert", item_limit=1000), + "delete": Endpoint(method="POST", path="/streams/{streamId}/records/delete", item_limit=1000), + }, + ) + + def _validate_page_response( + self, response: SuccessResponse | ItemsSuccessResponse + ) -> PagedResponse[RecordResponse]: + return PagedResponse[RecordResponse].model_validate_json(response.body) + + def ingest(self, stream_id: str, items: Sequence[RecordRequest]) -> None: + """Ingest records into a stream. + + Args: + stream_id: The external ID of the stream to ingest records into. + items: Sequence of RecordRequest items to ingest. + """ + endpoint = f"/streams/{stream_id}/records" + self._request_no_response(items, "create", endpoint=endpoint) + + def upsert(self, stream_id: str, items: Sequence[RecordRequest]) -> None: + """Upsert records into a stream (create or update). + + Args: + stream_id: The external ID of the stream to upsert records into. + items: Sequence of RecordRequest items to upsert. + """ + endpoint = f"/streams/{stream_id}/records/upsert" + self._request_no_response(items, "upsert", endpoint=endpoint) + + def delete(self, stream_id: str, items: Sequence[RecordIdentifier]) -> None: + """Delete records from a stream. + + Args: + stream_id: The external ID of the stream to delete records from. + items: Sequence of RecordIdentifier objects to delete. + """ + endpoint = f"/streams/{stream_id}/records/delete" + self._request_no_response(items, "delete", endpoint=endpoint) diff --git a/cognite_toolkit/_cdf_tk/client/resource_classes/records.py b/cognite_toolkit/_cdf_tk/client/resource_classes/records.py new file mode 100644 index 0000000000..3f05fedd76 --- /dev/null +++ b/cognite_toolkit/_cdf_tk/client/resource_classes/records.py @@ -0,0 +1,40 @@ +from pydantic import JsonValue + +from cognite_toolkit._cdf_tk.client._resource_base import BaseModelObject, Identifier, RequestResource, ResponseResource + +from .data_modeling._references import ContainerReference + + +class RecordIdentifier(Identifier): + space: str + external_id: str + + def __str__(self) -> str: + return f"Record({self.space}, {self.external_id})" + + +class RecordSource(BaseModelObject): + source: ContainerReference + properties: dict[str, JsonValue] + + +class RecordRequest(RequestResource): + """A record request resource for ingesting into a stream.""" + + space: str + external_id: str + sources: list[RecordSource] + + def as_id(self) -> RecordIdentifier: + return RecordIdentifier(space=self.space, external_id=self.external_id) + + +class RecordResponse(ResponseResource[RecordRequest]): + """A record response from the API.""" + + space: str + external_id: str + properties: dict[str, dict[str, JsonValue]] | None = None + + def as_request_resource(self) -> RecordRequest: + raise NotImplementedError("Converting RecordResponse to RecordRequest is not yet supported.") diff --git a/tests/test_unit/test_cdf_tk/test_client/test_cdf_apis.py b/tests/test_unit/test_cdf_tk/test_client/test_cdf_apis.py index a8d43af2ea..7c9323c67b 100644 --- a/tests/test_unit/test_cdf_tk/test_client/test_cdf_apis.py +++ b/tests/test_unit/test_cdf_tk/test_client/test_cdf_apis.py @@ -11,6 +11,7 @@ from cognite_toolkit._cdf_tk.client.api.graphql_data_models import GraphQLDataModelsAPI from cognite_toolkit._cdf_tk.client.api.location_filters import LocationFiltersAPI from cognite_toolkit._cdf_tk.client.api.raw import RawTablesAPI +from cognite_toolkit._cdf_tk.client.api.records import RecordsAPI from cognite_toolkit._cdf_tk.client.api.search_config import SearchConfigurationsAPI from cognite_toolkit._cdf_tk.client.api.streams import StreamsAPI from cognite_toolkit._cdf_tk.client.api.workflow_triggers import WorkflowTriggersAPI @@ -21,6 +22,7 @@ from cognite_toolkit._cdf_tk.client.http_client import HTTPClient from cognite_toolkit._cdf_tk.client.request_classes.filters import AnnotationFilter from cognite_toolkit._cdf_tk.client.resource_classes.annotation import AnnotationResponse +from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling._references import ContainerReference from cognite_toolkit._cdf_tk.client.resource_classes.filemetadata import FileMetadataResponse from cognite_toolkit._cdf_tk.client.resource_classes.function_schedule import ( FunctionScheduleRequest, @@ -32,6 +34,7 @@ from cognite_toolkit._cdf_tk.client.resource_classes.identifiers import ExternalId from cognite_toolkit._cdf_tk.client.resource_classes.location_filter import LocationFilterResponse from cognite_toolkit._cdf_tk.client.resource_classes.raw import RAWTableResponse +from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordRequest, RecordSource from cognite_toolkit._cdf_tk.client.resource_classes.search_config import SearchConfigResponse from cognite_toolkit._cdf_tk.client.resource_classes.streams import StreamResponse from cognite_toolkit._cdf_tk.client.resource_classes.workflow import WorkflowResponse @@ -171,6 +174,45 @@ def test_raw_table_api_crud(self, toolkit_config: ToolkitClientConfig, respx_moc assert len(page.items) == 1 assert page.items[0] == instance + def test_records_api_crud(self, toolkit_config: ToolkitClientConfig, respx_mock: respx.MockRouter) -> None: + """Test RecordsAPI ingest, upsert, and delete methods. + + This cannot be tested using the generic test as it requires custom mocking of the + api endpoints as stream_id is part of the URL. + """ + config = toolkit_config + api = RecordsAPI(HTTPClient(config)) + + stream_id = "my_stream" + request = RecordRequest( + space="my_space", + external_id="record_001", + sources=[ + RecordSource( + source=ContainerReference(space="my_space", external_id="my_container"), + properties={"name": "Example Record"}, + ) + ], + ) + + # Test ingest (API returns 202 Accepted) + respx_mock.post(config.create_api_url(f"/streams/{stream_id}/records")).mock( + return_value=httpx.Response(status_code=202) + ) + api.ingest(stream_id, [request]) + + # Test upsert (API returns 202 Accepted) + respx_mock.post(config.create_api_url(f"/streams/{stream_id}/records/upsert")).mock( + return_value=httpx.Response(status_code=202) + ) + api.upsert(stream_id, [request]) + + # Test delete + respx_mock.post(config.create_api_url(f"/streams/{stream_id}/records/delete")).mock( + return_value=httpx.Response(status_code=200) + ) + api.delete(stream_id, [request.as_id()]) + def test_metadataapi_crud_iterate(self, toolkit_config: ToolkitClientConfig, respx_mock: respx.MockRouter) -> None: resource = get_example_minimum_responses(FileMetadataResponse) instance = FileMetadataResponse.model_validate(resource)