Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cognite_toolkit/_cdf_tk/client/_toolkit_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from .api.migration import MigrationAPI
from .api.project import ProjectAPI
from .api.raw import RawAPI
from .api.records import RecordsAPI
from .api.relationships import RelationshipsAPI
from .api.robotics import RoboticsAPI
from .api.search import SearchAPI
Expand Down Expand Up @@ -115,6 +116,7 @@ def __init__(
self.project = ProjectAPI(config=toolkit_config, cognite_client=self)
self.infield = InfieldAPI(http_client)
self.streams = StreamsAPI(http_client)
self.records = RecordsAPI(http_client)

@property
def config(self) -> ToolkitClientConfig:
Expand Down
59 changes: 59 additions & 0 deletions cognite_toolkit/_cdf_tk/client/api/records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from collections.abc import Sequence
from urllib.parse import quote

from cognite_toolkit._cdf_tk.client.cdf_client import CDFResourceAPI, PagedResponse
from cognite_toolkit._cdf_tk.client.cdf_client.api import Endpoint
from cognite_toolkit._cdf_tk.client.http_client import HTTPClient, ItemsSuccessResponse, SuccessResponse
from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordIdentifier, RecordRequest, RecordResponse


class RecordsAPI(CDFResourceAPI[RecordIdentifier, RecordRequest, RecordResponse]):
    """API for managing records in CDF streams.

    Records are scoped to a stream, so the stream_id is passed to each method,
    following the same pattern as RawTablesAPI with db_name.
    """

    def __init__(self, http_client: HTTPClient) -> None:
        super().__init__(
            http_client=http_client,
            method_endpoint_map={
                "create": Endpoint(method="POST", path="/streams/{streamId}/records", item_limit=1000),
                "upsert": Endpoint(method="POST", path="/streams/{streamId}/records/upsert", item_limit=1000),
                "delete": Endpoint(method="POST", path="/streams/{streamId}/records/delete", item_limit=1000),
            },
        )

    @staticmethod
    def _records_path(stream_id: str, suffix: str = "") -> str:
        """Build the records endpoint path for a stream.

        The stream_id is URL-encoded (with no safe characters) so it is always
        treated as a single path segment; characters such as "/" or "../" in a
        malformed id cannot redirect the request to a different endpoint.
        """
        return f"/streams/{quote(stream_id, safe='')}/records{suffix}"

    def _validate_page_response(
        self, response: SuccessResponse | ItemsSuccessResponse
    ) -> PagedResponse[RecordResponse]:
        # Records pages parse directly into the generic paged model.
        return PagedResponse[RecordResponse].model_validate_json(response.body)

    def ingest(self, stream_id: str, items: Sequence[RecordRequest]) -> None:
        """Ingest records into a stream.

        Args:
            stream_id: The external ID of the stream to ingest records into.
            items: Sequence of RecordRequest items to ingest.
        """
        self._request_no_response(items, "create", endpoint=self._records_path(stream_id))

    def upsert(self, stream_id: str, items: Sequence[RecordRequest]) -> None:
        """Upsert records into a stream (create or update).

        Args:
            stream_id: The external ID of the stream to upsert records into.
            items: Sequence of RecordRequest items to upsert.
        """
        self._request_no_response(items, "upsert", endpoint=self._records_path(stream_id, "/upsert"))

    def delete(self, stream_id: str, items: Sequence[RecordIdentifier]) -> None:
        """Delete records from a stream.

        Args:
            stream_id: The external ID of the stream to delete records from.
            items: Sequence of RecordIdentifier objects to delete.
        """
        self._request_no_response(items, "delete", endpoint=self._records_path(stream_id, "/delete"))
40 changes: 40 additions & 0 deletions cognite_toolkit/_cdf_tk/client/resource_classes/records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from pydantic import JsonValue

from cognite_toolkit._cdf_tk.client._resource_base import BaseModelObject, Identifier, RequestResource, ResponseResource

from .data_modeling._references import ContainerReference


class RecordIdentifier(Identifier):
    """Uniquely names a record by its (space, external_id) pair."""

    space: str
    external_id: str

    def __str__(self) -> str:
        # Human-readable form used in logs and error messages.
        return "Record({}, {})".format(self.space, self.external_id)


class RecordSource(BaseModelObject):
    """Property values for a record, grouped under the container that defines them."""

    # Reference to the container whose properties are being populated.
    source: ContainerReference
    # Property values keyed by property identifier; values are arbitrary JSON.
    properties: dict[str, JsonValue]


class RecordRequest(RequestResource):
    """A record to be written to a stream: its identity plus per-container properties."""

    space: str
    external_id: str
    sources: list[RecordSource]

    def as_id(self) -> RecordIdentifier:
        """Return the identifier (space, external_id) naming this record."""
        identity = {"space": self.space, "external_id": self.external_id}
        return RecordIdentifier(**identity)


class RecordResponse(ResponseResource[RecordRequest]):
    """A record as returned by the API."""

    space: str
    external_id: str
    # Nested property values; presumably keyed by source then property — verify against API docs.
    properties: dict[str, dict[str, JsonValue]] | None = None

    def as_request_resource(self) -> RecordRequest:
        """Always raises: the reverse conversion is not implemented yet."""
        message = "Converting RecordResponse to RecordRequest is not yet supported."
        raise NotImplementedError(message)
42 changes: 42 additions & 0 deletions tests/test_unit/test_cdf_tk/test_client/test_cdf_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from cognite_toolkit._cdf_tk.client.api.graphql_data_models import GraphQLDataModelsAPI
from cognite_toolkit._cdf_tk.client.api.location_filters import LocationFiltersAPI
from cognite_toolkit._cdf_tk.client.api.raw import RawTablesAPI
from cognite_toolkit._cdf_tk.client.api.records import RecordsAPI
from cognite_toolkit._cdf_tk.client.api.search_config import SearchConfigurationsAPI
from cognite_toolkit._cdf_tk.client.api.streams import StreamsAPI
from cognite_toolkit._cdf_tk.client.api.workflow_triggers import WorkflowTriggersAPI
Expand All @@ -21,6 +22,7 @@
from cognite_toolkit._cdf_tk.client.http_client import HTTPClient
from cognite_toolkit._cdf_tk.client.request_classes.filters import AnnotationFilter
from cognite_toolkit._cdf_tk.client.resource_classes.annotation import AnnotationResponse
from cognite_toolkit._cdf_tk.client.resource_classes.data_modeling._references import ContainerReference
from cognite_toolkit._cdf_tk.client.resource_classes.filemetadata import FileMetadataResponse
from cognite_toolkit._cdf_tk.client.resource_classes.function_schedule import (
FunctionScheduleRequest,
Expand All @@ -32,6 +34,7 @@
from cognite_toolkit._cdf_tk.client.resource_classes.identifiers import ExternalId
from cognite_toolkit._cdf_tk.client.resource_classes.location_filter import LocationFilterResponse
from cognite_toolkit._cdf_tk.client.resource_classes.raw import RAWTableResponse
from cognite_toolkit._cdf_tk.client.resource_classes.records import RecordRequest, RecordSource
from cognite_toolkit._cdf_tk.client.resource_classes.search_config import SearchConfigResponse
from cognite_toolkit._cdf_tk.client.resource_classes.streams import StreamResponse
from cognite_toolkit._cdf_tk.client.resource_classes.workflow import WorkflowResponse
Expand Down Expand Up @@ -171,6 +174,45 @@ def test_raw_table_api_crud(self, toolkit_config: ToolkitClientConfig, respx_moc
assert len(page.items) == 1
assert page.items[0] == instance

def test_records_api_crud(self, toolkit_config: ToolkitClientConfig, respx_mock: respx.MockRouter) -> None:
    """Test RecordsAPI ingest, upsert, and delete methods.

    This cannot be tested using the generic test as it requires custom mocking of the
    api endpoints as stream_id is part of the URL.
    """
    api = RecordsAPI(HTTPClient(toolkit_config))

    stream_id = "my_stream"
    record = RecordRequest(
        space="my_space",
        external_id="record_001",
        sources=[
            RecordSource(
                source=ContainerReference(space="my_space", external_id="my_container"),
                properties={"name": "Example Record"},
            )
        ],
    )

    # Register one mock per endpoint with the status the API returns:
    # 202 Accepted for the write endpoints, 200 OK for delete.
    base_path = f"/streams/{stream_id}/records"
    for suffix, status in (("", 202), ("/upsert", 202), ("/delete", 200)):
        respx_mock.post(toolkit_config.create_api_url(base_path + suffix)).mock(
            return_value=httpx.Response(status_code=status)
        )

    api.ingest(stream_id, [record])
    api.upsert(stream_id, [record])
    api.delete(stream_id, [record.as_id()])

def test_metadataapi_crud_iterate(self, toolkit_config: ToolkitClientConfig, respx_mock: respx.MockRouter) -> None:
resource = get_example_minimum_responses(FileMetadataResponse)
instance = FileMetadataResponse.model_validate(resource)
Expand Down
Loading