diff --git a/cognite_toolkit/_cdf_tk/utils/interactive_select.py b/cognite_toolkit/_cdf_tk/utils/interactive_select.py index 1a5c886d59..1559ff8d9d 100644 --- a/cognite_toolkit/_cdf_tk/utils/interactive_select.py +++ b/cognite_toolkit/_cdf_tk/utils/interactive_select.py @@ -9,7 +9,9 @@ import questionary from cognite.client.data_classes import ( Asset, + AssetList, DataSet, + DataSetList, UserProfileList, filters, ) @@ -42,7 +44,7 @@ T_Type = TypeVar("T_Type", bound=Asset | DataSet) -class AssetCentricInteractiveSelect(ABC): +class AssetCentricBaseSelect(ABC): def __init__(self, client: ToolkitClient, operation: str) -> None: self.client = client self.operation = operation @@ -220,22 +222,22 @@ def _select( return selected -class AssetInteractiveSelect(AssetCentricInteractiveSelect): +class AssetInteractiveSelect(AssetCentricBaseSelect): def _get_aggregator(self, client: ToolkitClient) -> AssetCentricAggregator: return AssetAggregator(self.client) -class FileMetadataInteractiveSelect(AssetCentricInteractiveSelect): +class FileMetadataInteractiveSelect(AssetCentricBaseSelect): def _get_aggregator(self, client: ToolkitClient) -> AssetCentricAggregator: return FileAggregator(self.client) -class TimeSeriesInteractiveSelect(AssetCentricInteractiveSelect): +class TimeSeriesInteractiveSelect(AssetCentricBaseSelect): def _get_aggregator(self, client: ToolkitClient) -> AssetCentricAggregator: return TimeSeriesAggregator(self.client) -class EventInteractiveSelect(AssetCentricInteractiveSelect): +class EventInteractiveSelect(AssetCentricBaseSelect): def _get_aggregator(self, client: ToolkitClient) -> AssetCentricAggregator: return EventAggregator(self.client) @@ -773,3 +775,99 @@ def _get_available_spaces(self, include_global: bool = False) -> SpaceList: if include_global: return self._available_spaces return SpaceList([space for space in self._available_spaces if not space.is_global]) + + +T_Option = TypeVar("T_Option", bound=DataSet | Asset) + + +class AssetCentricInteractive: + def __init__(self, client: ToolkitClient, operation: str) -> None: + self.client = client + self.operation = operation + + @overload + def select_data_set( + self, multi: Literal[False], allow_empty: Literal[True] = True, include_resource_counts: bool = True + ) -> DataSet | None: ... + + @overload + def select_data_set( + self, multi: Literal[False], allow_empty: Literal[False] = False, include_resource_counts: bool = True + ) -> DataSet: ... + + @overload + def select_data_set( + self, multi: Literal[True], allow_empty: bool, include_resource_counts: bool = True + ) -> DataSetList: ... + + def select_data_set( + self, multi: bool, allow_empty: bool = False, include_resource_counts: bool = True + ) -> DataSet | DataSetList | None: + datasets = self.client.data_sets.list(limit=-1) + selected = self._select( + datasets, "data set", "data_set_external_id", multi, allow_empty, include_resource_counts + ) + if isinstance(selected, list): + return DataSetList(selected) + return selected + + @overload + def select_hierarchy( + self, multi: Literal[False], allow_empty: Literal[True] = True, include_resource_counts: bool = True + ) -> Asset | None: ... + + @overload + def select_hierarchy( + self, multi: Literal[False], allow_empty: Literal[False] = False, include_resource_counts: bool = True + ) -> Asset: ... + + @overload + def select_hierarchy(self, multi: Literal[True], allow_empty: bool, include_resource_counts: bool) -> AssetList: ... + + def select_hierarchy( + self, multi: bool, allow_empty: bool = False, include_resource_counts: bool = True + ) -> Asset | AssetList | None: + hierarchies = self.client.assets.list(root=True, limit=-1) + selected = self._select(hierarchies, "hierarchy", "hierarchy", multi, allow_empty, include_resource_counts) + if isinstance(selected, list): + return AssetList(selected) + return selected + + def _select( + self, + options: Sequence[T_Option], + display_name: str, + count_arg: Literal["data_set_external_id", "hierarchy"], + multi: bool, + allow_empty: bool = False, + include_resource_counts: bool = True, + ) -> T_Option | list[T_Option] | None: + if not options and not allow_empty: + raise ToolkitValueError(f"No {display_name} is available to select.") + choices: list[questionary.Choice] = [] + for option in sorted(options, key=lambda o: o.name or o.external_id or ""): + title = f"{option.name} ({option.external_id})" if option.name != option.external_id else f"{option.name}" + if include_resource_counts and option.external_id: + kwargs = {count_arg: option.external_id} + + # MyPy fails to recognize that kwargs has string keys here. + asset_count = AssetAggregator(self.client).count(**kwargs) # type: ignore[misc] + event_count = EventAggregator(self.client).count(**kwargs) # type: ignore[misc] + file_count = FileAggregator(self.client).count(**kwargs) # type: ignore[misc] + time_series_count = TimeSeriesAggregator(self.client).count(**kwargs) # type: ignore[misc] + title += ( + f"[Assets: {asset_count:,}, Events: {event_count:,}, " + f"Files: {file_count:,}, Time Series: {time_series_count:,}]" + ) + choices.append(questionary.Choice(title=title, value=option)) + + message = f"Select a {display_name} to {self.operation} listed as 'name (external_id)'" + if include_resource_counts: + message += " [Assets: x, Events: x, Files: x, Time Series: x]" + if multi: + selected = questionary.checkbox(message, choices=choices).ask() + else: + selected = questionary.select(message, choices=choices).ask() + if selected is None and not allow_empty: + raise ToolkitValueError(f"No {display_name} selected. Aborting.") + return selected diff --git a/tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py b/tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py index 8409e312ab..78b79bdd33 100644 --- a/tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py +++ b/tests/test_unit/test_cdf_tk/test_utils/test_interactive_select.py @@ -26,6 +26,7 @@ from cognite_toolkit._cdf_tk.utils.aggregators import AssetCentricAggregator from cognite_toolkit._cdf_tk.utils.interactive_select import ( AssetCentricDestinationSelect, + AssetCentricInteractive, AssetInteractiveSelect, DataModelingSelect, EventInteractiveSelect, @@ -967,3 +968,89 @@ def test_select_instance_spaces_without_view_or_instance_type_no_instances(self, selector.select_instance_space() assert "No instances found in any space" in str(exc_info.value) + + +class TestAssetCentricInteractive: + @pytest.mark.parametrize( + "multi,allow_empty,expected", + [ + pytest.param(False, False, DataSet("B"), id="Single select, no empty"), + pytest.param(True, False, [DataSet("B")], id="Multi select, no empty"), + pytest.param(False, True, None, id="Single select, allow empty"), + pytest.param(True, True, [], id="Multi select, allow empty"), + ], + ) + def test_select_dataset( + self, multi: bool, allow_empty: bool, expected: DataSet | list[DataSet] | None, monkeypatch + ) -> None: + data_sets = [DataSet(external_id=letter) for letter in "ABC"] + + def select_data_set(choices: list[Choice]) -> DataSet | list[DataSet] | None: + assert len(choices) == 3 + if allow_empty and multi: + return [] + elif allow_empty: + return None + elif multi: + return [choices[1].value] # Select "B" + else: + return choices[1].value # Select "B" + + answers = [select_data_set] + + with ( + monkeypatch_toolkit_client() as client, + MockQuestionary(AssetInteractiveSelect.__module__, monkeypatch, answers), + ): + client.data_sets.list.return_value = data_sets + client.assets.aggregate_count.return_value = 10 + client.events.aggregate_count.return_value = 20 + client.files.aggregate.return_value = [CountAggregate(30)] + client.time_series.aggregate_count.return_value = 40 + + selector = AssetCentricInteractive(client, "test_operation") + result = selector.select_data_set(multi, allow_empty, include_resource_counts=True) + + assert result == expected + + @pytest.mark.parametrize( + "multi,allow_empty,expected", + [ + pytest.param(False, False, Asset(external_id="B"), id="Single select, no empty"), + pytest.param(True, False, [Asset(external_id="B")], id="Multi select, no empty"), + pytest.param(False, True, None, id="Single select, allow empty"), + pytest.param(True, True, [], id="Multi select, allow empty"), + ], + ) + def test_select_hierarchy( + self, multi: bool, allow_empty: bool, expected: Asset | list[Asset] | None, monkeypatch + ) -> None: + hierarchies = [Asset(external_id=letter) for letter in "ABC"] + + def select_hierarchy(choices: list[Choice]) -> Asset | list[Asset] | None: + assert len(choices) == 3 + if allow_empty and multi: + return [] + elif allow_empty: + return None + elif multi: + return [choices[1].value] # Select "B" + else: + return choices[1].value # Select "B" + + answers = [select_hierarchy] + + with ( + monkeypatch_toolkit_client() as client, + MockQuestionary(AssetInteractiveSelect.__module__, monkeypatch, answers), + ): + client.assets.list.return_value = hierarchies + client.assets.aggregate_count.return_value = 10 + client.events.aggregate_count.return_value = 20 + client.files.aggregate.return_value = [CountAggregate(30)] + client.time_series.aggregate_count.return_value = 40 + + selector = AssetCentricInteractive(client, "test_operation") + result = selector.select_hierarchy(multi, allow_empty, include_resource_counts=True) + + assert result == expected