Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
bf6d4ae
[CDF-27106] ✨ Add Data Product Versions (Alpha)
ronpal Feb 18, 2026
1a8bb52
trigger CI
ronpal Feb 18, 2026
3c7c4b8
Skip DataProductVersionCRUD in integration tests
ronpal Feb 18, 2026
8619445
Use bare list[] instead of builtins.list[] to match codebase style
ronpal Feb 18, 2026
71d5dd4
Switch data product versions from auto-assigned versionId to semantic…
ronpal Feb 18, 2026
1880b11
Refactor data product versions API: add _resolve() helper, use base c…
ronpal Feb 18, 2026
1a97a04
Merge branch 'main' of https://github.com/cognitedata/toolkit into cd…
ronpal Feb 18, 2026
8bcc440
Fix FakeCogniteResourceGenerator: generate valid semantic versions fo…
ronpal Feb 18, 2026
62453d6
Update cognite_toolkit/_cdf_tk/client/api/data_product_versions.py
ronpal Feb 18, 2026
0c53d86
Address PR review: refactor API signatures, add space validation, imp…
ronpal Feb 19, 2026
da2d42a
Merge branch 'main' of https://github.com/cognitedata/toolkit into cd…
ronpal Feb 19, 2026
8d967c3
Merge branch 'cdf-27106/add-data-product-versions' of https://github.…
ronpal Feb 19, 2026
3fec250
Reorder iterate/list methods and remove type: ignore comment
ronpal Feb 19, 2026
e56dd59
Fix CI: handle Annotated pattern constraints in fake generator, add v…
ronpal Feb 19, 2026
1b61ae5
Update snapshot for DataProductVersionResponse in complete_org_alpha_…
ronpal Feb 19, 2026
5a1663a
Add comment explaining why as_update builds the payload manually
ronpal Feb 19, 2026
d92b077
Refactor FakeCogniteResourceGenerator: extract constraint helpers for…
ronpal Feb 19, 2026
5c5fd2b
Apply suggestion from @doctrino
ronpal Feb 19, 2026
1bc39b9
Inline URL construction in DataProductVersionsAPI, remove _resolve he…
ronpal Feb 19, 2026
bddc651
Merge branch 'main' into cdf-27106/add-data-product-versions
ronpal Feb 19, 2026
027073c
Fix ruff formatting issues
ronpal Feb 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions cognite_toolkit/_cdf_tk/client/api/data_product_versions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""Data Product Versions API for managing versions of CDF data products."""

from collections import defaultdict
from collections.abc import Iterable, Sequence
from typing import Literal

from cognite_toolkit._cdf_tk.client.cdf_client import CDFResourceAPI, Endpoint, PagedResponse
from cognite_toolkit._cdf_tk.client.http_client import (
HTTPClient,
ItemsSuccessResponse,
RequestMessage,
SuccessResponse,
)
from cognite_toolkit._cdf_tk.client.resource_classes.data_product_version import (
DataProductVersionRequest,
DataProductVersionResponse,
)
from cognite_toolkit._cdf_tk.client.resource_classes.identifiers import DataProductVersionId


class DataProductVersionsAPI(
    CDFResourceAPI[DataProductVersionId, DataProductVersionRequest, DataProductVersionResponse]
):
    """API for managing data product versions.

    All endpoints are scoped under a parent data product: /dataproducts/{externalId}/versions.
    Versions are identified by a user-specified semantic version string (major.minor.patch).

    The parent external ID is carried in the URL path; every method sets
    ``data_product_external_id`` on the parsed responses from the ID that was
    used to build the URL.
    """

    def __init__(self, http_client: HTTPClient) -> None:
        """Register the version endpoints (all on the ``alpha`` API version).

        Args:
            http_client: Client used to send the requests.
        """
        super().__init__(
            http_client=http_client,
            method_endpoint_map={
                "create": Endpoint(method="POST", path="/dataproducts/{externalId}/versions", item_limit=1),
                "retrieve": Endpoint(method="GET", path="/dataproducts/{externalId}/versions/{version}", item_limit=1),
                "update": Endpoint(method="POST", path="/dataproducts/{externalId}/versions/update", item_limit=1),
                "delete": Endpoint(method="POST", path="/dataproducts/{externalId}/versions/delete", item_limit=10),
                "list": Endpoint(method="GET", path="/dataproducts/{externalId}/versions", item_limit=10),
            },
            api_version="alpha",
        )

    def _validate_page_response(
        self, response: SuccessResponse | ItemsSuccessResponse
    ) -> PagedResponse[DataProductVersionResponse]:
        """Parse the JSON body of ``response`` into a page of version responses."""
        return PagedResponse[DataProductVersionResponse].model_validate_json(response.body)

    @staticmethod
    def _group_by_parent(
        items: Sequence[DataProductVersionRequest],
    ) -> dict[str, list[DataProductVersionRequest]]:
        """Group requests by the external ID of their parent data product.

        Parents appear in first-seen order and items keep their input order
        within each parent.
        """
        by_parent: dict[str, list[DataProductVersionRequest]] = {}
        for item in items:
            by_parent.setdefault(item.data_product_external_id, []).append(item)
        return by_parent

    def create(self, items: Sequence[DataProductVersionRequest]) -> list[DataProductVersionResponse]:
        """Create versions, sending one request per item.

        Args:
            items: Versions to create. They are grouped by parent data product,
                so the result order follows that grouping rather than the
                input order when parents are interleaved.

        Returns:
            The created versions with ``data_product_external_id`` set.

        Raises:
            Whatever ``get_success_or_raise`` raises for an unsuccessful request.
        """
        results: list[DataProductVersionResponse] = []
        for dp_ext_id, group in self._group_by_parent(items).items():
            url = self._make_url(self._method_endpoint_map["create"].path.format(externalId=dp_ext_id))
            for item in group:
                response = self._http_client.request_single_retries(
                    RequestMessage(
                        endpoint_url=url,
                        method="POST",
                        body_content={"items": [item.dump()]},
                        api_version=self._api_version,
                    )
                )
                page = self._validate_page_response(response.get_success_or_raise())
                for version in page.items:
                    version.data_product_external_id = dp_ext_id
                results.extend(page.items)
        return results

    def retrieve(
        self,
        items: Sequence[DataProductVersionId],
        ignore_unknown_ids: bool = False,
    ) -> list[DataProductVersionResponse]:
        """Retrieve versions one by one through the single-version GET endpoint.

        Args:
            items: Identifiers of the versions to retrieve.
            ignore_unknown_ids: If True, identifiers whose request did not
                succeed are skipped instead of raising.

        Returns:
            The versions that were retrieved, in input order, with
            ``data_product_external_id`` set.

        Raises:
            Whatever ``get_success_or_raise`` raises for an unsuccessful
            request when ``ignore_unknown_ids`` is False.
        """
        results: list[DataProductVersionResponse] = []
        for item in items:
            url = self._make_url(
                self._method_endpoint_map["retrieve"].path.format(
                    externalId=item.data_product_external_id, version=item.version
                )
            )
            response = self._http_client.request_single_retries(
                RequestMessage(
                    endpoint_url=url,
                    method="GET",
                    api_version=self._api_version,
                )
            )
            if isinstance(response, SuccessResponse):
                ver = DataProductVersionResponse.model_validate_json(response.body)
                ver.data_product_external_id = item.data_product_external_id
                results.append(ver)
            elif ignore_unknown_ids:
                # NOTE(review): every non-success response is skipped here, not only
                # "not found" ones - confirm that other failures should be ignored too.
                continue
            else:
                _ = response.get_success_or_raise()
        return results

    def update(
        self,
        items: Sequence[DataProductVersionRequest],
        mode: Literal["patch", "replace"] = "replace",
    ) -> list[DataProductVersionResponse]:
        """Update versions, sending one request per item.

        Args:
            items: Versions to update. They are grouped by parent data product,
                so the result order follows that grouping rather than the
                input order when parents are interleaved.
            mode: Forwarded to ``as_update`` of each item to build its payload.

        Returns:
            The updated versions with ``data_product_external_id`` set.

        Raises:
            Whatever ``get_success_or_raise`` raises for an unsuccessful request.
        """
        results: list[DataProductVersionResponse] = []
        for dp_ext_id, group in self._group_by_parent(items).items():
            url = self._make_url(self._method_endpoint_map["update"].path.format(externalId=dp_ext_id))
            for item in group:
                response = self._http_client.request_single_retries(
                    RequestMessage(
                        endpoint_url=url,
                        method="POST",
                        body_content={"items": [item.as_update(mode=mode)]},
                        api_version=self._api_version,
                    )
                )
                page = self._validate_page_response(response.get_success_or_raise())
                for ver in page.items:
                    ver.data_product_external_id = dp_ext_id
                results.extend(page.items)
        return results

    def delete(self, ids: Sequence[DataProductVersionId]) -> None:
        """Delete versions, batching the requests per parent data product.

        Each request carries at most ``item_limit`` versions of the delete
        endpoint, so a parent with more versions than the limit is deleted
        through several requests.

        Args:
            ids: Identifiers of the versions to delete.

        Raises:
            Whatever ``get_success_or_raise`` raises for an unsuccessful
            request. Batches sent before the failing one are not rolled back.
        """
        by_parent: defaultdict[str, list[str]] = defaultdict(list)
        for id_ in ids:
            by_parent[id_.data_product_external_id].append(id_.version)
        endpoint = self._method_endpoint_map["delete"]
        for dp_ext_id, versions in by_parent.items():
            url = self._make_url(endpoint.path.format(externalId=dp_ext_id))
            # Chunk so that a single request never exceeds the endpoint's item limit.
            for start in range(0, len(versions), endpoint.item_limit):
                chunk = versions[start : start + endpoint.item_limit]
                self._http_client.request_single_retries(
                    RequestMessage(
                        endpoint_url=url,
                        method="POST",
                        body_content={"items": [{"version": v} for v in chunk]},
                        api_version=self._api_version,
                    )
                ).get_success_or_raise()

    def iterate(
        self, data_product_external_id: str, limit: int | None = 10
    ) -> Iterable[list[DataProductVersionResponse]]:
        """Yield batches of versions for one data product.

        Batches come from the base-class ``_iterate`` helper called on the
        list endpoint; ``data_product_external_id`` is set on every item
        before the batch is yielded.

        Args:
            data_product_external_id: External ID of the parent data product.
            limit: Forwarded to ``_iterate``.
        """
        path = self._method_endpoint_map["list"].path.format(externalId=data_product_external_id)
        for batch in self._iterate(limit=limit, endpoint_path=path):
            for item in batch:
                item.data_product_external_id = data_product_external_id
            yield batch

    def list(self, data_product_external_id: str, limit: int | None = 10) -> list[DataProductVersionResponse]:
        """Return the versions of one data product.

        Items come from the base-class ``_list`` helper called on the list
        endpoint; ``data_product_external_id`` is set on every item.

        Args:
            data_product_external_id: External ID of the parent data product.
            limit: Forwarded to ``_list``.
        """
        path = self._method_endpoint_map["list"].path.format(externalId=data_product_external_id)
        items = self._list(limit=limit, endpoint_path=path)
        for item in items:
            item.data_product_external_id = data_product_external_id
        return items
3 changes: 3 additions & 0 deletions cognite_toolkit/_cdf_tk/client/api/data_products.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from cognite_toolkit._cdf_tk.client.resource_classes.data_product import DataProductRequest, DataProductResponse
from cognite_toolkit._cdf_tk.client.resource_classes.identifiers import ExternalId

from .data_product_versions import DataProductVersionsAPI


class DataProductsAPI(CDFResourceAPI[ExternalId, DataProductRequest, DataProductResponse]):
"""API for managing CDF data products."""
Expand All @@ -24,6 +26,7 @@ def __init__(self, http_client: HTTPClient) -> None:
},
api_version="alpha",
)
self.versions = DataProductVersionsAPI(http_client)

def _validate_page_response(
self, response: SuccessResponse | ItemsSuccessResponse
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from typing import Annotated, Any, ClassVar, Literal

from pydantic import Field

from cognite_toolkit._cdf_tk.client._resource_base import (
BaseModelObject,
ResponseResource,
UpdatableRequestResource,
)
from cognite_toolkit._cdf_tk.constants import SPACE_FORMAT_PATTERN

from .identifiers import DataProductVersionId, SemanticVersion

# Space identifier: a string validated against SPACE_FORMAT_PATTERN, at most 43 characters long.
SpaceId = Annotated[str, Field(pattern=SPACE_FORMAT_PATTERN, max_length=43)]


class ViewInstanceSpaces(BaseModelObject):
    """Space identifiers listed under ``read`` and ``write``; both default to an empty list."""

    read: list[SpaceId] = Field(default_factory=list)
    write: list[SpaceId] = Field(default_factory=list)


class DataProductVersionView(BaseModelObject):
    """A view, referenced by external ID, together with its instance spaces."""

    external_id: str
    # Defaults to a ViewInstanceSpaces with empty ``read`` and ``write`` lists.
    instance_spaces: ViewInstanceSpaces = Field(default_factory=ViewInstanceSpaces)


class DataProductVersionDataModel(BaseModelObject):
    """A data model reference (external ID and version) with its list of views."""

    external_id: str
    version: str
    # Defaults to an empty list when no views are given.
    views: list[DataProductVersionView] = Field(default_factory=list)


class DataProductVersionTerms(BaseModelObject):
    """Optional ``usage`` and ``limitations`` strings of a data product version."""

    usage: str | None = None
    limitations: str | None = None


class DataProductVersion(BaseModelObject):
    """Fields shared by the request and response models of a data product version."""

    # Declared with ``exclude=True``, so it is left out when the model is dumped.
    data_product_external_id: str = Field(exclude=True)
    version: SemanticVersion
    data_model: DataProductVersionDataModel
    status: Literal["draft", "published", "deprecated"] = "draft"
    description: str | None = None
    terms: DataProductVersionTerms | None = None

    def as_id(self) -> DataProductVersionId:
        """Return the identifier built from the parent external ID and the version."""
        parent_external_id, version = self.data_product_external_id, self.version
        return DataProductVersionId(data_product_external_id=parent_external_id, version=version)


class DataProductVersionRequest(DataProductVersion, UpdatableRequestResource):
container_fields: ClassVar[frozenset[str]] = frozenset()

def as_update(self, mode: Literal["patch", "replace"]) -> dict[str, Any]:
# The versions update API uses nested {set}/{setNull}/{modify} operators
# instead of a flat body, so we must build the payload manually.
Comment on lines +57 to +58
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is something we can give feedback on to the API team. This update deviates significantly from other updates, with a very nested structure. For example, HostedExtractors have a similar issue and solve it in a more standardized way.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feedback given!

update_item: dict[str, Any] = {"version": self.version}
update: dict[str, Any] = {}
exclude_unset = mode == "patch"
dumped = self.model_dump(mode="json", by_alias=True, exclude_unset=exclude_unset)

for key in ("status", "description"):
if key not in dumped:
continue
if dumped[key] is None:
update[key] = {"setNull": True}
else:
update[key] = {"set": dumped[key]}

if "terms" in dumped and dumped["terms"] is not None:
terms_modify: dict[str, Any] = {}
for sub_key in ("usage", "limitations"):
if sub_key in dumped["terms"]:
val = dumped["terms"][sub_key]
terms_modify[sub_key] = {"setNull": True} if val is None else {"set": val}
if terms_modify:
update["terms"] = {"modify": terms_modify}

if "dataModel" in dumped and dumped["dataModel"] is not None:
views = dumped["dataModel"].get("views")
if views is not None:
update["dataModel"] = {"modify": {"views": {"set": views}}}

update_item["update"] = update
return update_item


class DataProductVersionResponse(DataProductVersion, ResponseResource[DataProductVersionRequest]):
    """Response model of a data product version; adds creation and last-update times."""

    created_time: int
    last_updated_time: int

    def as_request_resource(self) -> DataProductVersionRequest:
        """Convert this response into the corresponding request object."""
        # ``data_product_external_id`` is declared with ``exclude=True``, so it is
        # set explicitly on the dumped data before validating the request model.
        payload = {
            **self.dump(camel_case=False),
            "data_product_external_id": self.data_product_external_id,
        }
        return DataProductVersionRequest.model_validate(payload, extra="ignore")
17 changes: 17 additions & 0 deletions cognite_toolkit/_cdf_tk/client/resource_classes/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@

from cognite_toolkit._cdf_tk.client._resource_base import Identifier

# Semantic version string of the form "major.minor.patch". Each component is
# either 0 or a number without leading zeros of at most four digits, which is
# why valid values are between 5 ("0.0.0") and 14 ("9999.9999.9999") characters.
SemanticVersion = Annotated[
str,
Field(
min_length=5,
max_length=14,
pattern=r"^(0|[1-9]\d{0,3})\.(0|[1-9]\d{0,3})\.(0|[1-9]\d{0,3})$",
),
]


class InternalOrExternalIdDefinition(Identifier):
type: str
Expand Down Expand Up @@ -130,6 +139,14 @@ def __str__(self) -> str:
return f"dataSetId={self.data_set_id}"


class DataProductVersionId(Identifier):
    """Identifier of a data product version: parent external ID plus semantic version."""

    data_product_external_id: str
    version: SemanticVersion

    def __str__(self) -> str:
        """Render both identifier parts as ``key='value'`` pairs separated by a comma."""
        return f"dataProductExternalId='{self.data_product_external_id}', version='{self.version}'"


class TransformationNotificationId(Identifier):
transformation_external_id: str
destination: str
Expand Down
4 changes: 3 additions & 1 deletion cognite_toolkit/_cdf_tk/client/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from ._toolkit_client import ToolAPI
from .api.agents import AgentsAPI
from .api.assets import AssetsAPI
from .api.data_product_versions import DataProductVersionsAPI
from .api.data_products import DataProductsAPI
from .api.datasets import DataSetsAPI
from .api.events import EventsAPI
Expand Down Expand Up @@ -221,7 +222,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.tool.workflows = MagicMock(spec=WorkflowsAPI)
self.tool.workflows.triggers = MagicMock(spec_set=WorkflowTriggersAPI)
self.tool.workflows.versions = MagicMock(spec_set=WorkflowVersionsAPI)
self.tool.data_products = MagicMock(spec_set=DataProductsAPI)
self.tool.data_products = MagicMock(spec=DataProductsAPI)
self.tool.data_products.versions = MagicMock(spec_set=DataProductVersionsAPI)

self.streams = MagicMock(spec=StreamsAPI)

Expand Down
3 changes: 3 additions & 0 deletions cognite_toolkit/_cdf_tk/cruds/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
DataModelCRUD,
DatapointSubscriptionCRUD,
DataProductCRUD,
DataProductVersionCRUD,
DataSetsCRUD,
EdgeCRUD,
EventCRUD,
Expand Down Expand Up @@ -99,6 +100,7 @@
_EXCLUDED_CRUDS.add(SimulatorModelRevisionCRUD)
if not FeatureFlag.is_enabled(Flags.DATA_PRODUCTS):
_EXCLUDED_CRUDS.add(DataProductCRUD)
_EXCLUDED_CRUDS.add(DataProductVersionCRUD)

CRUDS_BY_FOLDER_NAME_INCLUDE_ALPHA: defaultdict[str, list[type[Loader]]] = defaultdict(list)
CRUDS_BY_FOLDER_NAME: defaultdict[str, list[type[Loader]]] = defaultdict(list)
Expand Down Expand Up @@ -194,6 +196,7 @@ def get_crud(resource_dir: str, kind: str) -> type[Loader]:
"DataCRUD",
"DataModelCRUD",
"DataProductCRUD",
"DataProductVersionCRUD",
"DataSetsCRUD",
"DatapointSubscriptionCRUD",
"DatapointsCRUD",
Expand Down
2 changes: 2 additions & 0 deletions cognite_toolkit/_cdf_tk/cruds/_resource_cruds/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .configuration import SearchConfigCRUD
from .data_organization import DataSetsCRUD, LabelCRUD
from .data_product import DataProductCRUD
from .data_product_version import DataProductVersionCRUD
from .datamodel import (
ContainerCRUD,
DataModelCRUD,
Expand Down Expand Up @@ -55,6 +56,7 @@
"ContainerCRUD",
"DataModelCRUD",
"DataProductCRUD",
"DataProductVersionCRUD",
"DataSetsCRUD",
"DatapointSubscriptionCRUD",
"EdgeCRUD",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ def get_required_capability(
cls, items: Sequence[DataProductRequest] | None, read_only: bool
) -> Capability | list[Capability]:

# dataproductsAcl is not yet in the SDK — return empty to skip capability verification.
# TODO: dataproductsAcl is not yet in the SDK — return empty to skip capability verification.
# Cannot use UnknownACL due to bug in the SDK.
# Once available, require: CREATE, READ, UPDATE, DELETE (all four actions).
return []

def dump_resource(self, resource: DataProductResponse, local: dict[str, Any] | None = None) -> dict[str, Any]:
Expand Down
Loading