diff --git a/cognite/client/_api/data_modeling/__init__.py b/cognite/client/_api/data_modeling/__init__.py
index 2c0ed3782a..4ec09dfb17 100644
--- a/cognite/client/_api/data_modeling/__init__.py
+++ b/cognite/client/_api/data_modeling/__init__.py
@@ -8,6 +8,7 @@
 from cognite.client._api.data_modeling.instances import InstancesAPI
 from cognite.client._api.data_modeling.spaces import SpacesAPI
 from cognite.client._api.data_modeling.statistics import StatisticsAPI
+from cognite.client._api.data_modeling.streams import StreamsAPI
 from cognite.client._api.data_modeling.views import ViewsAPI
 from cognite.client._api_client import APIClient
@@ -26,3 +27,4 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
         self.instances = InstancesAPI(config, api_version, cognite_client)
         self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client)
         self.statistics = StatisticsAPI(config, api_version, cognite_client)
+        self.streams = StreamsAPI(config, api_version, cognite_client)
diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py
new file mode 100644
index 0000000000..40727e7275
--- /dev/null
+++ b/cognite/client/_api/data_modeling/streams.py
@@ -0,0 +1,132 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from cognite.client._api_client import APIClient
+from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite
+from cognite.client.exceptions import CogniteAPIError
+from cognite.client.utils._experimental import FeaturePreviewWarning
+
+if TYPE_CHECKING:
+    from cognite.client import CogniteClient
+    from cognite.client.config import ClientConfig
+
+
+class StreamsAPI(APIClient):
+    _RESOURCE_PATH = "/streams"
+
+    def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
+        super().__init__(config, api_version, cognite_client)
+        self._CREATE_LIMIT = 1
+        self._warning = FeaturePreviewWarning(api_maturity="beta", sdk_maturity="alpha", feature_name="Streams API")
+
+    def retrieve(self, external_id: str, include_statistics: bool = False) -> Stream | None:
+        """`Retrieve a stream by external ID. `_
+
+        Args:
+            external_id (str): Stream external ID
+            include_statistics (bool): If set to True, usage statistics will be returned together with stream settings. Defaults to False.
+
+        Returns:
+            Stream | None: Requested stream or None if it does not exist.
+
+        Examples:
+
+            Retrieve a single stream by external id:
+
+                >>> from cognite.client import CogniteClient
+                >>> client = CogniteClient()
+                >>> res = client.data_modeling.streams.retrieve(external_id='myStream')
+
+            Retrieve a stream with statistics:
+
+                >>> res = client.data_modeling.streams.retrieve(external_id='myStream', include_statistics=True)
+
+        """
+        self._warning.warn()
+        params = {"includeStatistics": "true"} if include_statistics else None
+        try:
+            result = self._get(url_path=f"{self._RESOURCE_PATH}/{external_id}", params=params)
+            return Stream._load(result.json(), cognite_client=self._cognite_client)
+        except CogniteAPIError as e:
+            if e.code == 404:
+                return None
+            raise
+
+    def delete(self, external_id: str) -> None:
+        """`Delete a stream `_
+
+        Args:
+            external_id (str): External ID of stream.
+
+        Examples:
+
+            Delete a stream by external id:
+
+                >>> from cognite.client import CogniteClient
+                >>> client = CogniteClient()
+                >>> client.data_modeling.streams.delete(external_id="myStream")
+        """
+        self._warning.warn()
+        self._post(url_path=f"{self._RESOURCE_PATH}/delete", json={"items": [{"externalId": external_id}]})
+
+    def list(self) -> StreamList:
+        """`List streams `_
+
+        Returns all streams available in the project.
+
+        Returns:
+            StreamList: List of all streams in the project
+
+        Examples:
+
+            List all streams:
+
+                >>> from cognite.client import CogniteClient
+                >>> client = CogniteClient()
+                >>> stream_list = client.data_modeling.streams.list()
+
+            Iterate over the returned list:
+
+                >>> for stream in stream_list:
+                ...     stream  # do something with the stream
+        """
+        self._warning.warn()
+        return self._list(
+            list_cls=StreamList,
+            resource_cls=Stream,
+            method="GET",
+            limit=None,
+        )
+
+    def create(self, stream: StreamWrite) -> Stream:
+        """`Create a stream. `_
+
+        Args:
+            stream (StreamWrite): Stream to create.
+
+        Returns:
+            Stream: Created stream
+
+        Examples:
+
+            Create a new stream:
+
+                >>> from cognite.client import CogniteClient
+                >>> from cognite.client.data_classes.data_modeling import StreamWrite, StreamWriteSettings
+                >>> client = CogniteClient()
+                >>> stream = StreamWrite(
+                ...     external_id="myStream",
+                ...     settings=StreamWriteSettings(template_name="ImmutableTestStream")
+                ... )
+                >>> res = client.data_modeling.streams.create(stream)
+        """
+        self._warning.warn()
+        return (
+            self._create_multiple(  # using create multiple to leverage existing code, but only allowing one at a time
+                list_cls=StreamList,
+                resource_cls=Stream,
+                items=stream,
+                input_resource_cls=StreamWrite,
+            )
+        )
diff --git a/cognite/client/_api_client.py b/cognite/client/_api_client.py
index 5156e37c07..99a9322814 100644
--- a/cognite/client/_api_client.py
+++ b/cognite/client/_api_client.py
@@ -109,6 +109,7 @@ class APIClient:
         "simulators/models/revisions",
         "simulators/models/routines",
         "simulators/models/routines/revisions",
+        "streams",
         "timeseries",
         "transformations",
         "transformations/schedules",
diff --git a/cognite/client/data_classes/data_modeling/__init__.py b/cognite/client/data_classes/data_modeling/__init__.py
index 43207cb1c5..3e40947de7 100644
--- a/cognite/client/data_classes/data_modeling/__init__.py
+++ b/cognite/client/data_classes/data_modeling/__init__.py
@@ -91,6 +91,16 @@
TypedNodeApply, ) from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList +from cognite.client.data_classes.data_modeling.streams import ( + Stream, + StreamLifecycleSettings, + StreamLimit, + StreamLimitSettings, + StreamList, + StreamSettings, + StreamWrite, + StreamWriteSettings, +) from cognite.client.data_classes.data_modeling.views import ( ConnectionDefinition, EdgeConnection, @@ -191,6 +201,14 @@ "SpaceApply", "SpaceApplyList", "SpaceList", + "Stream", + "StreamLifecycleSettings", + "StreamLimit", + "StreamLimitSettings", + "StreamList", + "StreamSettings", + "StreamWrite", + "StreamWriteSettings", "Text", "TimeSeriesReference", "Timestamp", diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py new file mode 100644 index 0000000000..1fe60057c8 --- /dev/null +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -0,0 +1,251 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Literal + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteObject, + CogniteResourceList, + WriteableCogniteResource, +) + +if TYPE_CHECKING: + from cognite.client import CogniteClient + + +class StreamWriteSettings(CogniteObject): + """Settings for creating a stream. + + Args: + template_name (str): The name of the stream template. Templates are project-specific and should be retrieved from your CDF project configuration. + """ + + def __init__(self, template_name: str) -> None: + self.template_name = template_name + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(template_name=resource["template"]["name"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"template": {"name": self.template_name}} + + +class StreamLimit(CogniteObject): + """Limit settings for a stream resource. 
+ + Args: + provisioned (float): Amount of resource provisioned. + consumed (float | None): Amount of resource consumed. Only returned when include_statistics=True. + """ + + def __init__(self, provisioned: float, consumed: float | None = None) -> None: + self.provisioned = provisioned + self.consumed = consumed + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(provisioned=resource["provisioned"], consumed=resource.get("consumed")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"provisioned": self.provisioned, "consumed": self.consumed} + + +class StreamLifecycleSettings(CogniteObject): + """Lifecycle settings for a stream. + + These settings are populated from the stream creation template and cannot be changed. + + Args: + retained_after_soft_delete (str): Time until soft-deleted stream is actually deleted, in ISO-8601 format. + data_deleted_after (str | None): Time after which records are scheduled for deletion, in ISO-8601 format. Only for immutable streams. + """ + + def __init__( + self, + retained_after_soft_delete: str, + data_deleted_after: str | None = None, + ) -> None: + self.retained_after_soft_delete = retained_after_soft_delete + self.data_deleted_after = data_deleted_after + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + retained_after_soft_delete=resource["retainedAfterSoftDelete"], + data_deleted_after=resource.get("dataDeletedAfter"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "retainedAfterSoftDelete": self.retained_after_soft_delete, + "dataDeletedAfter": self.data_deleted_after, + } + + +class StreamLimitSettings(CogniteObject): + """Limit settings for a stream. + + Args: + max_records_total (StreamLimit): Maximum number of records that can be stored. + max_giga_bytes_total (StreamLimit): Maximum amount of data in gigabytes. 
+ max_filtering_interval (str | None): Maximum time range for lastUpdatedTime filter, in ISO-8601 format. Only for immutable streams. + """ + + def __init__( + self, + max_records_total: StreamLimit, + max_giga_bytes_total: StreamLimit, + max_filtering_interval: str | None = None, + ) -> None: + self.max_records_total = max_records_total + self.max_giga_bytes_total = max_giga_bytes_total + self.max_filtering_interval = max_filtering_interval + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + max_records_total=StreamLimit._load(resource["maxRecordsTotal"]), + max_giga_bytes_total=StreamLimit._load(resource["maxGigaBytesTotal"]), + max_filtering_interval=resource.get("maxFilteringInterval"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "maxRecordsTotal": self.max_records_total.dump(camel_case), + "maxGigaBytesTotal": self.max_giga_bytes_total.dump(camel_case), + "maxFilteringInterval": self.max_filtering_interval, + } + + +class StreamSettings(CogniteObject): + """Response settings for a stream. + + These are read-only settings returned when retrieving a stream. + + Args: + lifecycle (StreamLifecycleSettings): Lifecycle settings. + limits (StreamLimitSettings): Limit settings. + """ + + def __init__(self, lifecycle: StreamLifecycleSettings, limits: StreamLimitSettings) -> None: + self.lifecycle = lifecycle + self.limits = limits + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + lifecycle=StreamLifecycleSettings._load(resource["lifecycle"]), + limits=StreamLimitSettings._load(resource["limits"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "lifecycle": self.lifecycle.dump(camel_case), + "limits": self.limits.dump(camel_case), + } + + +class StreamWrite(WriteableCogniteResource): + """A stream for data ingestion. This is the write version. 
+ + Args: + external_id (str): A unique identifier for the stream. + settings (StreamWriteSettings): The settings for the stream, including the template. + """ + + def __init__(self, external_id: str, settings: StreamWriteSettings) -> None: + self.external_id = external_id + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + external_id=resource["externalId"], + settings=StreamWriteSettings._load(resource["settings"], cognite_client), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "externalId": self.external_id, + "settings": self.settings.dump(camel_case=camel_case), + } + + def as_write(self) -> StreamWrite: + """Returns this StreamWrite instance.""" + return self + + +class Stream(WriteableCogniteResource["StreamWrite"]): + """A stream for data ingestion. Streams are immutable after creation and cannot be updated. + + Args: + external_id (str): A unique identifier for the stream. + created_time (int): The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds. + type (Literal['Immutable', 'Mutable']): The type of the stream. + created_from_template (str): The template the stream was created from. + settings (StreamSettings): The stream settings including lifecycle and limits. 
+ """ + + def __init__( + self, + external_id: str, + created_time: int, + type: Literal["Immutable", "Mutable"], + created_from_template: str, + settings: StreamSettings, + ) -> None: + self.external_id = external_id + self.created_time = created_time + self.type = type + self.created_from_template = created_from_template + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + external_id=resource["externalId"], + created_time=resource["createdTime"], + type=resource["type"], + created_from_template=resource["createdFromTemplate"], + settings=StreamSettings._load(resource["settings"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "externalId": self.external_id, + "createdTime": self.created_time, + "type": self.type, + "createdFromTemplate": self.created_from_template, + "settings": self.settings.dump(camel_case), + } + + def as_write(self) -> StreamWrite: + """Returns a StreamWrite object with the same external_id and template. + + Note: Since streams are immutable and cannot be updated, this method creates + a StreamWrite suitable for creating a NEW stream with the same template settings, + not for updating the existing stream. To create a new stream, you must provide + a different external_id. + + Returns: + StreamWrite: A write object based on this stream's template. + """ + return StreamWrite( + external_id=self.external_id, + settings=StreamWriteSettings(template_name=self.created_from_template), + ) + + +class StreamList(CogniteResourceList[Stream]): + _RESOURCE = Stream + + def as_ids(self) -> list[str]: + """ + Converts all the streams to a stream id list. + + Returns: + list[str]: A list of stream ids. 
+ """ + return [item.external_id for item in self] diff --git a/cognite/client/testing.py b/cognite/client/testing.py index e964576b8c..7464fd54c4 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -20,6 +20,7 @@ from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI +from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api.data_sets import DataSetsAPI from cognite.client._api.datapoints import DatapointsAPI @@ -135,6 +136,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.data_modeling.graphql = MagicMock(spec_set=DataModelingGraphQLAPI) self.data_modeling.statistics = MagicMock(spec=StatisticsAPI) self.data_modeling.statistics.spaces = MagicMock(spec_set=SpaceStatisticsAPI) + self.data_modeling.streams = MagicMock(spec_set=StreamsAPI) self.data_sets = MagicMock(spec_set=DataSetsAPI) diff --git a/tests/tests_unit/test_api/test_data_modeling/test_streams.py b/tests/tests_unit/test_api/test_data_modeling/test_streams.py new file mode 100644 index 0000000000..dafad5a668 --- /dev/null +++ b/tests/tests_unit/test_api/test_data_modeling/test_streams.py @@ -0,0 +1,225 @@ +import re +from collections.abc import Iterator + +import pytest +from responses import RequestsMock + +from cognite.client import CogniteClient +from cognite.client.data_classes.data_modeling import Stream, StreamWrite, StreamWriteSettings +from cognite.client.exceptions import CogniteAPIError + + +class TestStreams: + @pytest.fixture + def mock_streams_create_response(self, rsps: RequestsMock, cognite_client: CogniteClient) -> Iterator[RequestsMock]: + response_body = { + "items": [ + { + "externalId": "test_stream", + "createdTime": 1761319720908, + "createdFromTemplate": "ImmutableTestStream", + "type": 
"Immutable",
+                    "settings": {
+                        "lifecycle": {
+                            "retainedAfterSoftDelete": "PT24H",
+                            "dataDeletedAfter": "PT168H",
+                        },
+                        "limits": {
+                            "maxFilteringInterval": "PT168H",
+                            "maxRecordsTotal": {"provisioned": 500000000},
+                            "maxGigaBytesTotal": {"provisioned": 500},
+                        },
+                    },
+                }
+            ]
+        }
+        url_pattern = re.compile(
+            re.escape(cognite_client.data_modeling.streams._get_base_url_with_base_path()) + "/streams"
+        )
+        rsps.add(rsps.POST, url_pattern, status=201, json=response_body)
+        yield rsps
+
+    @pytest.fixture
+    def mock_streams_list_response(self, rsps: RequestsMock, cognite_client: CogniteClient) -> Iterator[RequestsMock]:
+        response_body = {
+            "items": [
+                {
+                    "externalId": "test_stream_1",
+                    "createdTime": 1760201842000,
+                    "createdFromTemplate": "ImmutableTestStream",
+                    "type": "Immutable",
+                    "settings": {
+                        "lifecycle": {
+                            "retainedAfterSoftDelete": "PT24H",
+                            "dataDeletedAfter": "PT168H",
+                        },
+                        "limits": {
+                            "maxFilteringInterval": "PT168H",
+                            "maxRecordsTotal": {"provisioned": 500000000},
+                            "maxGigaBytesTotal": {"provisioned": 500},
+                        },
+                    },
+                },
+                {
+                    "externalId": "test_stream_2",
+                    "createdTime": 1760201817980,
+                    "createdFromTemplate": "MutableTestStream",
+                    "type": "Mutable",
+                    "settings": {
+                        "lifecycle": {"retainedAfterSoftDelete": "PT24H"},
+                        "limits": {
+                            "maxRecordsTotal": {"provisioned": 10000000},
+                            "maxGigaBytesTotal": {"provisioned": 20},
+                        },
+                    },
+                },
+            ]
+        }
+        url_pattern = re.compile(
+            re.escape(cognite_client.data_modeling.streams._get_base_url_with_base_path()) + "/streams"
+        )
+        rsps.add(rsps.GET, url_pattern, status=200, json=response_body)
+        yield rsps
+
+    @pytest.fixture
+    def mock_streams_retrieve_response(
+        self, rsps: RequestsMock, cognite_client: CogniteClient
+    ) -> Iterator[RequestsMock]:
+        response_body = {
+            "externalId": "test_stream",
+            "createdTime": 1760201842000,
+            "createdFromTemplate": "ImmutableTestStream",
+            "type": "Immutable",
+            "settings": {
+                "lifecycle": {
+                    "retainedAfterSoftDelete": "PT24H",
+                    "hotPhaseDuration": "PT72H",
+                    "dataDeletedAfter": "PT168H",
+                },
+                "limits": {
+                    "maxFilteringInterval": "PT168H",
+                    "maxRecordsTotal": {"provisioned": 500000000},
+                    "maxGigaBytesTotal": {"provisioned": 500},
+                },
+            },
+        }
+        url_pattern = re.compile(
+            re.escape(cognite_client.data_modeling.streams._get_base_url_with_base_path())
+            + r"/streams/test_stream(\?.*)?$"
+        )
+        rsps.add(rsps.GET, url_pattern, status=200, json=response_body)
+        yield rsps
+
+    @pytest.fixture
+    def mock_streams_retrieve_with_stats_response(
+        self, rsps: RequestsMock, cognite_client: CogniteClient
+    ) -> Iterator[RequestsMock]:
+        response_body = {
+            "externalId": "test_stream",
+            "createdTime": 1760201842000,
+            "createdFromTemplate": "ImmutableTestStream",
+            "type": "Immutable",
+            "settings": {
+                "lifecycle": {
+                    "retainedAfterSoftDelete": "PT24H",
+                    "hotPhaseDuration": "PT72H",
+                    "dataDeletedAfter": "PT168H",
+                },
+                "limits": {
+                    "maxFilteringInterval": "PT168H",
+                    "maxRecordsTotal": {"provisioned": 500000000, "consumed": 25000000},
+                    "maxGigaBytesTotal": {"provisioned": 500, "consumed": 100},
+                },
+            },
+        }
+        url_pattern = re.compile(
+            re.escape(cognite_client.data_modeling.streams._get_base_url_with_base_path())
+            + r"/streams/test_stream\?includeStatistics=true$"
+        )
+        rsps.add(rsps.GET, url_pattern, status=200, json=response_body)
+        yield rsps
+
+    @pytest.fixture
+    def mock_streams_delete_response(self, rsps: RequestsMock, cognite_client: CogniteClient) -> Iterator[RequestsMock]:
+        url_pattern = re.compile(
+            re.escape(cognite_client.data_modeling.streams._get_base_url_with_base_path()) + "/streams/delete"
+        )
+        rsps.add(rsps.POST, url_pattern, status=200, json={})
+        yield rsps
+
+    @pytest.fixture
+    def mock_streams_delete_raise_error(
+        self, rsps: RequestsMock, cognite_client: CogniteClient
+    ) -> Iterator[RequestsMock]:
+        response_body = {"error": {"message": "Stream not found", "code": 404}}
+        url_pattern = re.compile(
+            re.escape(cognite_client.data_modeling.streams._get_base_url_with_base_path()) + "/streams/delete"
+        )
+        rsps.add(rsps.POST, url_pattern, status=404, json=response_body)
+        yield rsps
+
+    def test_create_stream(self, cognite_client: CogniteClient, mock_streams_create_response: RequestsMock) -> None:
+        stream = StreamWrite(
+            external_id="test_stream",
+            settings=StreamWriteSettings(template_name="ImmutableTestStream"),
+        )
+        result = cognite_client.data_modeling.streams.create(stream)
+        assert isinstance(result, Stream)
+        assert result.external_id == "test_stream"
+        assert result.type == "Immutable"
+
+    def test_list_streams(self, cognite_client: CogniteClient, mock_streams_list_response: RequestsMock) -> None:
+        result = cognite_client.data_modeling.streams.list()
+        assert len(result) == 2
+        assert result[0].external_id == "test_stream_1"
+        assert result[1].external_id == "test_stream_2"
+
+    def test_retrieve_stream(self, cognite_client: CogniteClient, mock_streams_retrieve_response: RequestsMock) -> None:
+        result = cognite_client.data_modeling.streams.retrieve(external_id="test_stream")
+        assert result is not None
+        assert result.external_id == "test_stream"
+        assert result.type == "Immutable"
+
+    def test_retrieve_stream_with_statistics(
+        self, cognite_client: CogniteClient, mock_streams_retrieve_with_stats_response: RequestsMock
+    ) -> None:
+        result = cognite_client.data_modeling.streams.retrieve(external_id="test_stream", include_statistics=True)
+        assert result is not None
+        assert result.external_id == "test_stream"
+        assert result.type == "Immutable"
+        assert result.settings.limits.max_filtering_interval == "PT168H"
+        assert result.settings.limits.max_records_total.provisioned == 500000000
+        assert result.settings.limits.max_records_total.consumed == 25000000
+        assert result.settings.limits.max_giga_bytes_total.provisioned == 500
+        assert result.settings.limits.max_giga_bytes_total.consumed == 100
+
+    def test_delete_stream(self, cognite_client: CogniteClient, mock_streams_delete_response: RequestsMock) -> None:
+        cognite_client.data_modeling.streams.delete(external_id="test_stream")
+
+    def test_failed_delete_stream(
+        self, cognite_client: CogniteClient, mock_streams_delete_raise_error: RequestsMock
+    ) -> None:
+        with pytest.raises(CogniteAPIError) as error:
+            cognite_client.data_modeling.streams.delete(external_id="i-dont-actually-exist")
+        assert error.value.code == 404
+
+    def test_stream_as_write(self, cognite_client: CogniteClient, mock_streams_retrieve_response: RequestsMock) -> None:
+        stream = cognite_client.data_modeling.streams.retrieve(external_id="test_stream")
+        assert stream is not None
+
+        # Convert to write object
+        stream_write = stream.as_write()
+        assert stream_write.external_id == stream.external_id
+        assert stream_write.settings.template_name == stream.created_from_template
+        assert stream_write.settings.template_name == "ImmutableTestStream"
+
+    def test_retrieve_non_existent_stream_returns_none(self, rsps: RequestsMock, cognite_client: CogniteClient) -> None:
+        response_body = {"error": {"message": "Stream not found", "code": 404}}
+        url_pattern = re.compile(
+            re.escape(cognite_client.data_modeling.streams._get_base_url_with_base_path())
+            + r"/streams/non_existent_stream(\?.*)?$"
+        )
+        rsps.add(rsps.GET, url_pattern, status=404, json=response_body)
+
+        result = cognite_client.data_modeling.streams.retrieve(external_id="non_existent_stream")
+        assert result is None