diff --git a/homeassistant/helpers/intent.py b/homeassistant/helpers/intent.py index 5b21c12d755330..39fb327cff6ad4 100644 --- a/homeassistant/helpers/intent.py +++ b/homeassistant/helpers/intent.py @@ -528,6 +528,9 @@ def async_match_targets( # noqa: C901 ] = _default_area_candidate_filter, ) -> MatchTargetsResult: """Match entities based on constraints in order to handle an intent.""" + """1. Initialize candidates → 2. Filter by domain → 3. Filter by state → 4. Early-exit check → 5. Populate metadata → +6. Filter by name → 7. Filter by features → 8. Filter by device classes → 9. Filter by floor → 10. Filter by area → +11. Exposure check → 12. Deduplicate by name → 13. Enforce single-target constraint → 14. Return result""" preferences = preferences or MatchTargetsPreferences() filtered_by_domain = False @@ -550,15 +553,12 @@ def async_match_targets( # noqa: C901 for state in states ] - if constraints.domains and (not filtered_by_domain): - # Filter by domain (if we didn't already do it) - candidates = [c for c in candidates if c.state.domain in constraints.domains] - if not candidates: + candidates, filtered_by_domain = _filter_candidates_by_domain(candidates, constraints.domains) + if not candidates: return MatchTargetsResult(False, MatchFailedReason.DOMAIN) if constraints.states: - # Filter by state - candidates = [c for c in candidates if c.state.state in constraints.states] + candidates = _filter_candidates_by_state(candidates, constraints.states) if not candidates: return MatchTargetsResult(False, MatchFailedReason.STATE) @@ -579,10 +579,7 @@ def async_match_targets( # noqa: C901 return MatchTargetsResult(True, states=[c.state for c in candidates]) - # We need entity registry entries now - er = entity_registry.async_get(hass) - for candidate in candidates: - candidate.entity = er.async_get(candidate.state.entity_id) + _populate_candidate_metadata(hass, candidates)##一次补全## if constraints.name: # Filter by entity name or alias @@ -610,183 +607,210 @@ def async_match_targets( # noqa: C901 # True when area information has been added to candidates areas_added = False - - if constraints.floor_name or constraints.area_name: - ar = area_registry.async_get(hass) - dr = device_registry.async_get(hass) - _add_areas(ar, dr, candidates) - areas_added = True - - if constraints.floor_name: - # Filter by areas associated with floor - fr = floor_registry.async_get(hass) - targeted_floors = list(find_floors(constraints.floor_name, fr)) - if not targeted_floors: - return MatchTargetsResult( - False, - MatchFailedReason.INVALID_FLOOR, - no_match_name=constraints.floor_name, - ) - - possible_floor_ids = {floor.floor_id for floor in targeted_floors} - possible_area_ids = { - area.id - for area in ar.async_list_areas() - if area.floor_id in possible_floor_ids - } - - candidates = [ - c for c in candidates if area_candidate_filter(c, possible_area_ids) - ] - if not candidates: - return MatchTargetsResult( - False, MatchFailedReason.FLOOR, floors=targeted_floors - ) - else: - # All areas are possible - possible_area_ids = {area.id for area in ar.async_list_areas()} - - if constraints.area_name: - targeted_areas = list(find_areas(constraints.area_name, ar)) - if not targeted_areas: - return MatchTargetsResult( - False, - MatchFailedReason.INVALID_AREA, - no_match_name=constraints.area_name, - ) - - matching_area_ids = {area.id for area in targeted_areas} - - # May be constrained by floors above - possible_area_ids.intersection_update(matching_area_ids) - candidates = [ - c for c in candidates if area_candidate_filter(c, possible_area_ids) - ] - if not candidates: - return MatchTargetsResult( - False, MatchFailedReason.AREA, areas=targeted_areas - ) - + if constraints.floor_name: + candidates, targeted_floors, floor_fail_result = _filter_candidates_by_floor( + hass, candidates, constraints.floor_name, area_candidate_filter + ) + if floor_fail_result: + return floor_fail_result + if not candidates: + return MatchTargetsResult(False, MatchFailedReason.FLOOR, floors=targeted_floors) + + area_registry = area_registry.async_get(hass) + possible_area_ids = {area.id for area in area_registry.async_list_areas()} + if targeted_floors: + possible_area_ids = { + area.id for area in area_registry.async_list_areas() + if area.floor_id in {f.floor_id for f in targeted_floors} + } + if constraints.assistant: # Check exposure candidates = [c for c in candidates if c.is_exposed] if not candidates: return MatchTargetsResult(False, MatchFailedReason.ASSISTANT) - if constraints.name and (not constraints.allow_duplicate_names): - # Check for duplicates - if not areas_added: - ar = area_registry.async_get(hass) - dr = device_registry.async_get(hass) - _add_areas(ar, dr, candidates) - areas_added = True - - sorted_candidates = sorted( - [c for c in candidates if c.matched_name], - key=lambda c: c.matched_name or "", + if constraints.name and not constraints.allow_duplicate_names: + candidates, duplicate_fail_result = _disambiguate_duplicate_names( + hass, candidates, preferences, area_candidate_filter ) - final_candidates: list[MatchTargetsCandidate] = [] - for name, group in groupby(sorted_candidates, key=lambda c: c.matched_name): - group_candidates = list(group) - if len(group_candidates) < 2: - # No duplicates for name - final_candidates.extend(group_candidates) - continue + if duplicate_fail_result: + return duplicate_fail_result + if not candidates: + return MatchTargetsResult(False, MatchFailedReason.NAME) - # Try to disambiguate by preferences - if preferences.floor_id: - group_candidates = [ - c - for c in group_candidates - if (c.area is not None) - and (c.area.floor_id == preferences.floor_id) - ] - if len(group_candidates) < 2: - # Disambiguated by floor - final_candidates.extend(group_candidates) - continue - - if preferences.area_id: - group_candidates = [ - c - for c in group_candidates - if area_candidate_filter(c, {preferences.area_id}) - ] - if len(group_candidates) < 2: - # Disambiguated by area - final_candidates.extend(group_candidates) - continue - - # Couldn't disambiguate duplicate names - return MatchTargetsResult( - False, - MatchFailedReason.DUPLICATE_NAME, - no_match_name=name, - areas=targeted_areas or [], - floors=targeted_floors or [], - ) + if constraints.single_target: + candidates, single_target_fail_result = _enforce_single_target( + hass, candidates, preferences, area_candidate_filter + ) + if single_target_fail_result: + return single_target_fail_result + if not candidates: + return MatchTargetsResult(False, MatchFailedReason.MULTIPLE_TARGETS) - if not final_candidates: - return MatchTargetsResult( - False, - MatchFailedReason.NAME, - areas=targeted_areas or [], - floors=targeted_floors or [], - ) + return MatchTargetsResult( + True, + None, + states=[c.state for c in candidates], + areas=targeted_areas or [], + floors=targeted_floors or [], + ) - candidates = final_candidates - if constraints.single_target and len(candidates) > 1: - # Find best match using preferences - if not (preferences.area_id or preferences.floor_id): - # No preferences - return MatchTargetsResult( - False, - MatchFailedReason.MULTIPLE_TARGETS, - states=[c.state for c in candidates], - ) - if not areas_added: - ar = area_registry.async_get(hass) - dr = device_registry.async_get(hass) - _add_areas(ar, dr, candidates) - areas_added = True +def _populate_candidate_metadata(hass: HomeAssistant, candidates: list[MatchTargetsCandidate]): + """“Populate candidate devices’ entity, device, and area metadata all at once to avoid repeated operation”""" + entity_registry = entity_registry.async_get(hass) + device_registry = device_registry.async_get(hass) + area_registry = area_registry.async_get(hass) - filtered_candidates: list[MatchTargetsCandidate] = candidates - if preferences.area_id: - # Filter by area - filtered_candidates = [ - c for c in candidates if area_candidate_filter(c, {preferences.area_id}) - ] + for candidate in candidates: + # Populate entity registration information + candidate.entity = entity_registry.async_get(candidate.state.entity_id) + if not candidate.entity: + continue - if (len(filtered_candidates) > 1) and preferences.floor_id: - # Filter by floor - filtered_candidates = [ - c - for c in candidates - if c.area and (c.area.floor_id == preferences.floor_id) - ] + # Populate device information + if candidate.entity.device_id: + candidate.device = device_registry.async_get(candidate.entity.device_id) - if len(filtered_candidates) != 1: - # Filtering could not restrict to a single target - return MatchTargetsResult( - False, - MatchFailedReason.MULTIPLE_TARGETS, - states=[c.state for c in candidates], - ) + # Populate area information + if candidate.entity.area_id: + candidate.area = area_registry.async_get_area(candidate.entity.area_id) + elif candidate.device and candidate.device.area_id: + candidate.area = area_registry.async_get_area(candidate.device.area_id) + +def _filter_candidates_by_domain( + candidates: list[MatchTargetsCandidate], + domains: list[str] | None, +) -> tuple[list[MatchTargetsCandidate], bool]: + """Filter by domain""" + if not domains: + return candidates, False + filtered = [c for c in candidates if c.state.domain in domains] + return filtered, len(filtered) == len(candidates) + +def _filter_candidates_by_state( + candidates: list[MatchTargetsCandidate], + states: list[str] | None, +) -> list[MatchTargetsCandidate]: + """Filter by device state""" + if not states: + return candidates + return [c for c in candidates if c.state.state in states] - # Filtering succeeded - candidates = filtered_candidates +def _filter_candidates_by_floor( + hass: HomeAssistant, + candidates: list[MatchTargetsCandidate], + floor_name: str | None, + area_candidate_filter: Callable[[MatchTargetsCandidate, Collection[str]], bool], +) -> tuple[list[MatchTargetsCandidate], list[floor_registry.FloorEntry] | None, MatchTargetsResult | None]: + """Filter candidates based on floor""" + if not floor_name: + return candidates, None, None + fr = floor_registry.async_get(hass) + targeted_floors = list(find_floors(floor_name, fr)) + if not targeted_floors: + return [], None, MatchTargetsResult( + False, MatchFailedReason.INVALID_FLOOR, no_match_name=floor_name + ) + ar = area_registry.async_get(hass) + possible_floor_ids = {floor.floor_id for floor in targeted_floors} + possible_area_ids = { + area.id for area in ar.async_list_areas() if area.floor_id in possible_floor_ids + } + filtered_candidates = [c for c in candidates if area_candidate_filter(c, possible_area_ids)] + return filtered_candidates, targeted_floors, None + +def _filter_candidates_by_area( + hass: HomeAssistant, + candidates: list[MatchTargetsCandidate], + area_name: str | None, + possible_area_ids: Collection[str], + area_candidate_filter: Callable[[MatchTargetsCandidate, Collection[str]], bool], +) -> tuple[list[MatchTargetsCandidate], list[area_registry.AreaEntry] | None, MatchTargetsResult | None]: + """Filter candidate devices by area""" + if not area_name: + return candidates, None, None + ar = area_registry.async_get(hass) + targeted_areas = list(find_areas(area_name, ar)) + if not targeted_areas: + return [], None, MatchTargetsResult( + False, MatchFailedReason.INVALID_AREA, no_match_name=area_name + ) + matching_area_ids = {area.id for area in targeted_areas} + possible_area_ids.intersection_update(matching_area_ids) + filtered_candidates = [c for c in candidates if area_candidate_filter(c, possible_area_ids)] + return filtered_candidates, targeted_areas, None - return MatchTargetsResult( - True, - None, - states=[c.state for c in candidates], - areas=targeted_areas or [], - floors=targeted_floors or [], +def _disambiguate_duplicate_names( + hass: HomeAssistant, + candidates: list[MatchTargetsCandidate], + preferences: MatchTargetsPreferences, + area_candidate_filter: Callable[[MatchTargetsCandidate, Collection[str]], bool], +) -> tuple[list[MatchTargetsCandidate], MatchTargetsResult | None]: + """Resolve duplicate device names""" + sorted_candidates = sorted( + [c for c in candidates if c.matched_name], + key=lambda c: c.matched_name or "", ) + final_candidates = [] + for name, group in groupby(sorted_candidates, key=lambda c: c.matched_name): + group_candidates = list(group) + if len(group_candidates) < 2: + final_candidates.extend(group_candidates) + continue + # Filter duplicates according to user preferences + if preferences.floor_id: + group_candidates = [ + c for c in group_candidates + if c.area and c.area.floor_id == preferences.floor_id + ] + if len(group_candidates) < 2: + final_candidates.extend(group_candidates) + continue + if preferences.area_id: + group_candidates = [ + c for c in group_candidates + if area_candidate_filter(c, {preferences.area_id}) + ] + if len(group_candidates) < 2: + final_candidates.extend(group_candidates) + continue + # cannot duplicates + return [], MatchTargetsResult( + False, MatchFailedReason.DUPLICATE_NAME, no_match_name=name + ) + return final_candidates, None - +def _enforce_single_target( + hass: HomeAssistant, + candidates: list[MatchTargetsCandidate], + preferences: MatchTargetsPreferences, + area_candidate_filter: Callable[[MatchTargetsCandidate, Collection[str]], bool], +) -> tuple[list[MatchTargetsCandidate], MatchTargetsResult | None]: + """(Validate single-target constraint""" + if len(candidates) == 1: + return candidates, None + if not (preferences.area_id or preferences.floor_id): + return [], MatchTargetsResult( + False, MatchFailedReason.MULTIPLE_TARGETS, states=[c.state for c in candidates] + ) + filtered_candidates = candidates + if preferences.area_id: + filtered_candidates = [ + c for c in candidates if area_candidate_filter(c, {preferences.area_id}) + ] + if len(filtered_candidates) > 1 and preferences.floor_id: + filtered_candidates = [ + c for c in filtered_candidates + if c.area and c.area.floor_id == preferences.floor_id + ] + if len(filtered_candidates) != 1: + return [], MatchTargetsResult( + False, MatchFailedReason.MULTIPLE_TARGETS, states=[c.state for c in candidates] + ) + return filtered_candidates, None @callback @bind_hass def async_match_states(