From 124c53f33c8792d8c41a0ac99c3c63c347923c6e Mon Sep 17 00:00:00 2001 From: anders-albert Date: Fri, 17 Oct 2025 08:14:34 +0200 Subject: [PATCH 1/7] feat: added DataSetSelect --- .../_cdf_tk/utils/interactive_select.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/cognite_toolkit/_cdf_tk/utils/interactive_select.py b/cognite_toolkit/_cdf_tk/utils/interactive_select.py index 1a5c886d59..7aa9360211 100644 --- a/cognite_toolkit/_cdf_tk/utils/interactive_select.py +++ b/cognite_toolkit/_cdf_tk/utils/interactive_select.py @@ -10,6 +10,7 @@ from cognite.client.data_classes import ( Asset, DataSet, + DataSetList, UserProfileList, filters, ) @@ -773,3 +774,45 @@ def _get_available_spaces(self, include_global: bool = False) -> SpaceList: if include_global: return self._available_spaces return SpaceList([space for space in self._available_spaces if not space.is_global]) + + +class DataSetSelect: + def __init__(self, client: ToolkitClient, operation: str) -> None: + self.client = client + self.operation = operation + + def select_data_set( + self, multi: bool, allow_empty: bool = False, include_resource_counts: bool = True + ) -> DataSet | DataSetList | None: + datasets = self.client.data_sets.list(limit=-1) + if not datasets and not allow_empty: + raise ToolkitValueError("No data sets available to select.") + choices: list[questionary.Choice] = [] + for dataset in datasets: + title = ( + f"{dataset.name} ({dataset.external_id})" if dataset.name != dataset.external_id else f"{dataset.name}" + ) + if include_resource_counts and dataset.external_id: + asset_count = AssetAggregator(self.client).count(data_set_external_id=dataset.external_id) + event_count = EventAggregator(self.client).count(data_set_external_id=dataset.external_id) + file_count = FileAggregator(self.client).count(data_set_external_id=dataset.external_id) + time_series_count = TimeSeriesAggregator(self.client).count(data_set_external_id=dataset.external_id) + title += f"[Assets: {asset_count:,}, Events: {event_count:,}, Files: {file_count:,}, Time Series: {time_series_count:,}]" + choices.append(questionary.Choice(title=title, value=dataset)) + + message = f"Select a data set to {self.operation} listed as 'name (external_id)'" + if include_resource_counts: + message += " [Assets: x, Events: x, Files: x, Time Series: x]" + if multi: + selected = questionary.checkbox(message, choices=choices).ask() + else: + selected = questionary.select(message, choices=choices).ask() + if selected is None: + raise ToolkitValueError("No data set selected. Aborting.") + + if multi: + if not selected and not allow_empty: + raise ToolkitValueError("No data sets selected. Aborting.") + return DataSetList(selected) + else: + return selected From 8d122e9653ae928978887d2e13af5e75d124761c Mon Sep 17 00:00:00 2001 From: anders-albert Date: Fri, 17 Oct 2025 23:41:33 +0200 Subject: [PATCH 2/7] refactor: support asst hierarchy --- .../_cdf_tk/utils/interactive_select.py | 103 ++++++++++++++---- 1 file changed, 79 insertions(+), 24 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/utils/interactive_select.py b/cognite_toolkit/_cdf_tk/utils/interactive_select.py index 7aa9360211..4eee50612d 100644 --- a/cognite_toolkit/_cdf_tk/utils/interactive_select.py +++ b/cognite_toolkit/_cdf_tk/utils/interactive_select.py @@ -9,6 +9,7 @@ import questionary from cognite.client.data_classes import ( Asset, + AssetList, DataSet, DataSetList, UserProfileList, @@ -776,31 +777,91 @@ def _get_available_spaces(self, include_global: bool = False) -> SpaceList: return SpaceList([space for space in self._available_spaces if not space.is_global]) -class DataSetSelect: +T_Option = TypeVar("T_Option", bound=DataSet | Asset) + + +class AssetCentricInteractive: def __init__(self, client: ToolkitClient, operation: str) -> None: self.client = client self.operation = operation + @overload + def select_data_set( + self, multi: Literal[False], allow_empty: Literal[True] = True, include_resource_counts: bool = True + ) -> DataSet | None: ... + + @overload + def select_data_set( + self, multi: Literal[False], allow_empty: Literal[False] = False, include_resource_counts: bool = True + ) -> DataSet: ... + + @overload + def select_data_set( + self, multi: Literal[True], allow_empty: bool, include_resource_counts: bool = True + ) -> DataSetList: ... + def select_data_set( self, multi: bool, allow_empty: bool = False, include_resource_counts: bool = True ) -> DataSet | DataSetList | None: datasets = self.client.data_sets.list(limit=-1) - if not datasets and not allow_empty: - raise ToolkitValueError("No data sets available to select.") + selected = self._select( + datasets, "data set", "data_set_external_id", multi, allow_empty, include_resource_counts + ) + if isinstance(selected, list): + return DataSetList(selected) + return selected + + @overload + def select_hierarchy( + self, multi: Literal[False], allow_empty: Literal[True] = True, include_resource_counts: bool = True + ) -> Asset | None: ... + + @overload + def select_hierarchy( + self, multi: Literal[False], allow_empty: Literal[False] = False, include_resource_counts: bool = True + ) -> Asset: ... + + @overload + def select_hierarchy(self, multi: Literal[True], allow_empty: bool, include_resource_counts: bool) -> AssetList: ... + + def select_hierarchy( + self, multi: bool, allow_empty: bool = False, include_resource_counts: bool = True + ) -> Asset | AssetList | None: + hierarchies = self.client.assets.list(root=True, limit=-1) + selected = self._select(hierarchies, "hierarchy", "hierarchy", multi, allow_empty, include_resource_counts) + if isinstance(selected, list): + return AssetList(selected) + return selected + + def _select( + self, + options: Sequence[T_Option], + display_name: str, + count_arg: Literal["data_set_external_id", "hierarchy"], + multi: bool, + allow_empty: bool = False, + include_resource_counts: bool = True, + ) -> T_Option | list[T_Option] | None: + if not options and not allow_empty: + raise ToolkitValueError(f"No {display_name} is available to select.") choices: list[questionary.Choice] = [] - for dataset in datasets: - title = ( - f"{dataset.name} ({dataset.external_id})" if dataset.name != dataset.external_id else f"{dataset.name}" - ) - if include_resource_counts and dataset.external_id: - asset_count = AssetAggregator(self.client).count(data_set_external_id=dataset.external_id) - event_count = EventAggregator(self.client).count(data_set_external_id=dataset.external_id) - file_count = FileAggregator(self.client).count(data_set_external_id=dataset.external_id) - time_series_count = TimeSeriesAggregator(self.client).count(data_set_external_id=dataset.external_id) - title += f"[Assets: {asset_count:,}, Events: {event_count:,}, Files: {file_count:,}, Time Series: {time_series_count:,}]" - choices.append(questionary.Choice(title=title, value=dataset)) - - message = f"Select a data set to {self.operation} listed as 'name (external_id)'" + for option in options: + title = f"{option.name} ({option.external_id})" if option.name != option.external_id else f"{option.name}" + if include_resource_counts and option.external_id: + kwargs = {count_arg: option.external_id} + + # MyPy fails to recognize that kwargs has string keys here. + asset_count = AssetAggregator(self.client).count(**kwargs) # type: ignore[misc] + event_count = EventAggregator(self.client).count(**kwargs) # type: ignore[misc] + file_count = FileAggregator(self.client).count(**kwargs) # type: ignore[misc] + time_series_count = TimeSeriesAggregator(self.client).count(**kwargs) # type: ignore[misc] + title += ( + f"[Assets: {asset_count:,}, Events: {event_count:,}, " + f"Files: {file_count:,}, Time Series: {time_series_count:,}]" + ) + choices.append(questionary.Choice(title=title, value=option)) + + message = f"Select a {display_name} to {self.operation} listed as 'name (external_id)'" if include_resource_counts: message += " [Assets: x, Events: x, Files: x, Time Series: x]" if multi: @@ -808,11 +869,5 @@ def select_data_set( else: selected = questionary.select(message, choices=choices).ask() if selected is None: - raise ToolkitValueError("No data set selected. Aborting.") - - if multi: - if not selected and not allow_empty: - raise ToolkitValueError("No data sets selected. Aborting.") - return DataSetList(selected) - else: - return selected + raise ToolkitValueError(f"No {display_name} selected. Aborting.") + return selected From 2ddfb6e3d821a84a95bdf2bf8835dcd40ce97e45 Mon Sep 17 00:00:00 2001 From: anders-albert Date: Fri, 17 Oct 2025 23:42:46 +0200 Subject: [PATCH 3/7] refactro: rename for clarity --- cognite_toolkit/_cdf_tk/utils/interactive_select.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cognite_toolkit/_cdf_tk/utils/interactive_select.py b/cognite_toolkit/_cdf_tk/utils/interactive_select.py index 4eee50612d..c6e76b13d2 100644 --- a/cognite_toolkit/_cdf_tk/utils/interactive_select.py +++ b/cognite_toolkit/_cdf_tk/utils/interactive_select.py @@ -44,7 +44,7 @@ T_Type = TypeVar("T_Type", bound=Asset | DataSet) -class AssetCentricInteractiveSelect(ABC): +class AssetCentricBaseSelect(ABC): def __init__(self, client: ToolkitClient, operation: str) -> None: self.client = client self.operation = operation @@ -222,22 +222,22 @@ def _select( return selected -class AssetInteractiveSelect(AssetCentricInteractiveSelect): +class AssetInteractiveSelect(AssetCentricBaseSelect): def _get_aggregator(self, client: ToolkitClient) -> AssetCentricAggregator: return AssetAggregator(self.client) -class FileMetadataInteractiveSelect(AssetCentricInteractiveSelect): +class FileMetadataInteractiveSelect(AssetCentricBaseSelect): def _get_aggregator(self, client: ToolkitClient) -> AssetCentricAggregator: return FileAggregator(self.client) -class TimeSeriesInteractiveSelect(AssetCentricInteractiveSelect): +class TimeSeriesInteractiveSelect(AssetCentricBaseSelect): def _get_aggregator(self, client: ToolkitClient) -> AssetCentricAggregator: return TimeSeriesAggregator(self.client) -class EventInteractiveSelect(AssetCentricInteractiveSelect): +class EventInteractiveSelect(AssetCentricBaseSelect): def _get_aggregator(self, client: ToolkitClient) -> AssetCentricAggregator: return EventAggregator(self.client) From e780a2d46de13fdbb7e2815b46281bddf0c4a8c3 Mon Sep 17 00:00:00 2001 From: anders-albert Date: Fri, 17 Oct 2025 23:46:17 +0200 Subject: [PATCH 4/7] refactor: add sorting --- cognite_toolkit/_cdf_tk/utils/interactive_select.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognite_toolkit/_cdf_tk/utils/interactive_select.py b/cognite_toolkit/_cdf_tk/utils/interactive_select.py index c6e76b13d2..115064d66e 100644 --- a/cognite_toolkit/_cdf_tk/utils/interactive_select.py +++ b/cognite_toolkit/_cdf_tk/utils/interactive_select.py @@ -845,7 +845,7 @@ def _select( if not options and not allow_empty: raise ToolkitValueError(f"No {display_name} is available to select.") choices: list[questionary.Choice] = [] - for option in options: + for option in sorted(options, key=lambda o: o.name or o.external_id or ""): title = f"{option.name} ({option.external_id})" if option.name != option.external_id else f"{option.name}" if include_resource_counts and option.external_id: kwargs = {count_arg: option.external_id} From ddebab672a86d42e2fe5886c44f0adbfa21c9f6f Mon Sep 17 00:00:00 2001 From: anders-albert Date: Sat, 18 Oct 2025 00:23:57 +0200 Subject: [PATCH 5/7] fix: bug --- cognite_toolkit/_cdf_tk/utils/interactive_select.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognite_toolkit/_cdf_tk/utils/interactive_select.py b/cognite_toolkit/_cdf_tk/utils/interactive_select.py index 115064d66e..1559ff8d9d 100644 --- a/cognite_toolkit/_cdf_tk/utils/interactive_select.py +++ b/cognite_toolkit/_cdf_tk/utils/interactive_select.py @@ -868,6 +868,6 @@ def _select( selected = questionary.checkbox(message, choices=choices).ask() else: selected = questionary.select(message, choices=choices).ask() - if selected is None: + if selected is None and not allow_empty: raise ToolkitValueError(f"No {display_name} selected. Aborting.") return selected From 354f6178294ef9f2048d6ddaa30e742840ca2530 Mon Sep 17 00:00:00 2001 From: anders-albert Date: Sat, 18 Oct 2025 00:24:18 +0200 Subject: [PATCH 6/7] tests: added first test. --- .../test_utils/test_interactive_select.py | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py b/tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py index 8409e312ab..ecbcfb177a 100644 --- a/tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py +++ b/tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py @@ -26,6 +26,7 @@ from cognite_toolkit._cdf_tk.utils.aggregators import AssetCentricAggregator from cognite_toolkit._cdf_tk.utils.interactive_select import ( AssetCentricDestinationSelect, + AssetCentricInteractive, AssetInteractiveSelect, DataModelingSelect, EventInteractiveSelect, @@ -967,3 +968,47 @@ def test_select_instance_spaces_without_view_or_instance_type_no_instances(self, selector.select_instance_space() assert "No instances found in any space" in str(exc_info.value) + + +class TestAssetCentricInteractive: + @pytest.mark.parametrize( + "multi,allow_empty,expected", + [ + pytest.param(False, False, DataSet("B"), id="Single select, no empty"), + pytest.param(True, False, [DataSet("B")], id="Multi select, no empty"), + pytest.param(False, True, None, id="Single select, allow empty"), + pytest.param(True, True, [], id="Multi select, allow empty"), + ], + ) + def test_select_dataset( + self, multi: bool, allow_empty: bool, expected: DataSet | list[DataSet] | None, monkeypatch + ) -> None: + data_sets = [DataSet(external_id=letter) for letter in "ABC"] + + def select_data_set(choices: list[Choice]) -> DataSet | list[DataSet] | None: + assert len(choices) == 3 + if allow_empty and multi: + return [] + elif allow_empty: + return None + elif multi: + return [choices[1].value] # Select "B" + else: + return choices[1].value # Select "B" + + answers = [select_data_set] + + with ( + monkeypatch_toolkit_client() as client, + MockQuestionary(AssetInteractiveSelect.__module__, monkeypatch, answers), + ): + client.data_sets.list.return_value = data_sets + client.assets.aggregate_count.return_value = 10 + client.events.aggregate_count.return_value = 20 + client.files.aggregate.return_value = [CountAggregate(30)] + client.time_series.aggregate_count.return_value = 40 + + selector = AssetCentricInteractive(client, "test_operation") + result = selector.select_data_set(multi, allow_empty, include_resource_counts=True) + + assert result == expected From 582b2f84c8460e02bfb4cc69e764444afdb5393d Mon Sep 17 00:00:00 2001 From: anders-albert Date: Sat, 18 Oct 2025 09:16:51 +0200 Subject: [PATCH 7/7] tests: added test --- .../test_utils/test_interactive_select.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py b/tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py index ecbcfb177a..78b79bdd33 100644 --- a/tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py +++ b/tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py @@ -1012,3 +1012,45 @@ def select_data_set(choices: list[Choice]) -> DataSet | list[DataSet] | None: result = selector.select_data_set(multi, allow_empty, include_resource_counts=True) assert result == expected + + @pytest.mark.parametrize( + "multi,allow_empty,expected", + [ + pytest.param(False, False, Asset(external_id="B"), id="Single select, no empty"), + pytest.param(True, False, [Asset(external_id="B")], id="Multi select, no empty"), + pytest.param(False, True, None, id="Single select, allow empty"), + pytest.param(True, True, [], id="Multi select, allow empty"), + ], + ) + def test_select_hierarchy( + self, multi: bool, allow_empty: bool, expected: Asset | list[Asset] | None, monkeypatch + ) -> None: + hierarchies = [Asset(external_id=letter) for letter in "ABC"] + + def select_hierarchy(choices: list[Choice]) -> Asset | list[Asset] | None: + assert len(choices) == 3 + if allow_empty and multi: + return [] + elif allow_empty: + return None + elif multi: + return [choices[1].value] # Select "B" + else: + return choices[1].value # Select "B" + + answers = [select_hierarchy] + + with ( + monkeypatch_toolkit_client() as client, + MockQuestionary(AssetInteractiveSelect.__module__, monkeypatch, answers), + ): + client.assets.list.return_value = hierarchies + client.assets.aggregate_count.return_value = 10 + client.events.aggregate_count.return_value = 20 + client.files.aggregate.return_value = [CountAggregate(30)] + client.time_series.aggregate_count.return_value = 40 + + selector = AssetCentricInteractive(client, "test_operation") + result = selector.select_hierarchy(multi, allow_empty, include_resource_counts=True) + + assert result == expected