diff --git a/homeassistant/helpers/entity_registry.py b/homeassistant/helpers/entity_registry.py index 3b0cb67f6a260e..0357b517d2c2e9 100644 --- a/homeassistant/helpers/entity_registry.py +++ b/homeassistant/helpers/entity_registry.py @@ -936,11 +936,9 @@ def async_get_or_create( unit_of_measurement: str | None | UndefinedType = UNDEFINED, ) -> RegistryEntry: """Get entity. Create if it doesn't exist.""" - config_entry_id: str | None | UndefinedType = UNDEFINED - if not config_entry: - config_entry_id = None - elif config_entry is not UNDEFINED: - config_entry_id = config_entry.entry_id + + # Extract config_entry_id from config_entry parameter + config_entry_id = self._extract_config_entry_id(config_entry) supported_features = supported_features or 0 @@ -964,6 +962,7 @@ def async_get_or_create( ) self.hass.verify_event_loop_thread("entity_registry.async_get_or_create") + _validate_item( self.hass, domain, @@ -977,90 +976,262 @@ def async_get_or_create( unique_id=unique_id, ) - entity_registry_id: str | None = None - created_at = utcnow() - deleted_entity = self.deleted_entities.pop((domain, platform, unique_id), None) - options: Mapping[str, Mapping[str, Any]] | None - if deleted_entity is not None: - aliases = deleted_entity.aliases - area_id = deleted_entity.area_id - categories = deleted_entity.categories - created_at = deleted_entity.created_at - device_class = deleted_entity.device_class - if deleted_entity.disabled_by is not UNDEFINED: - disabled_by = deleted_entity.disabled_by - # Adjust disabled_by based on config entry state - if config_entry and config_entry is not UNDEFINED: - if config_entry.disabled_by: - if disabled_by is None: - disabled_by = RegistryEntryDisabler.CONFIG_ENTRY - elif disabled_by == RegistryEntryDisabler.CONFIG_ENTRY: - disabled_by = None - elif disabled_by == RegistryEntryDisabler.CONFIG_ENTRY: - disabled_by = None - # Restore entity_id if it's available - if self._entity_id_available(deleted_entity.entity_id): - entity_id = deleted_entity.entity_id - entity_registry_id = deleted_entity.id - if deleted_entity.hidden_by is not UNDEFINED: - hidden_by = deleted_entity.hidden_by - icon = deleted_entity.icon - labels = deleted_entity.labels - name = deleted_entity.name - if deleted_entity.options is not UNDEFINED: - options = deleted_entity.options - else: - options = get_initial_options() if get_initial_options else None - else: - aliases = set() - area_id = None - categories = {} - device_class = None - icon = None - labels = set() - name = None - options = get_initial_options() if get_initial_options else None - - if not entity_id: - entity_id = self.async_generate_entity_id( + #Restore data from deleted entity if available, otherwise use defaults + entity_data = self._restore_or_initialize_entity_data( + domain, platform, unique_id, disabled_by, hidden_by, + config_entry, get_initial_options + ) + + # Generate entity_id if not restored from deleted entity + if not entity_data["entity_id"]: + entity_data["entity_id"] = self.async_generate_entity_id( domain, suggested_object_id or calculated_object_id or f"{platform}_{unique_id}", ) + # Auto-disable entity if config entry prefers new entities disabled + entity_data["disabled_by"] = self._apply_integration_disable_preference( + entity_data["disabled_by"], config_entry + ) + + # Create and register the new entity + return self._create_and_register_entity( + domain, platform, unique_id, entity_data, suggested_object_id, + capabilities, config_entry_id, config_subentry_id, device_id, + entity_category, has_entity_name, original_device_class, + original_icon, original_name, supported_features, + translation_key, unit_of_measurement + ) + + def _extract_config_entry_id( + self, + config_entry: ConfigEntry | None | UndefinedType, + ) -> str | None | UndefinedType: + """ + Extract config_entry_id from config_entry parameter. + + Returns UNDEFINED if not provided, None if explicitly None, + otherwise returns the entry_id. + """ + if not config_entry: + return None + elif config_entry is not UNDEFINED: + return config_entry.entry_id + return UNDEFINED + + def _restore_or_initialize_entity_data( + self, + domain: str, + platform: str, + unique_id: str, + disabled_by: RegistryEntryDisabler | None, + hidden_by: RegistryEntryHider | None, + config_entry: ConfigEntry | None | UndefinedType, + get_initial_options: Callable[[], EntityOptionsType | None] | None, + ) -> dict[str, Any]: + """ + Restore entity data from deleted entity or initialize with defaults. + + If a previously deleted entity with the same identity exists, + restore its data. Otherwise, return default values for a new entity. + """ + created_at = utcnow() + deleted_entity = self.deleted_entities.pop((domain, platform, unique_id), None) + + if deleted_entity is not None: + # Restore data from previously deleted entity + return self._restore_deleted_entity_data( + deleted_entity, disabled_by, hidden_by, + config_entry, get_initial_options, created_at + ) + else: + # Initialize with default values for new entity + return self._initialize_new_entity_data(get_initial_options, created_at) + + def _restore_deleted_entity_data( + self, + deleted_entity: Any, + disabled_by: RegistryEntryDisabler | None, + hidden_by: RegistryEntryHider | None, + config_entry: ConfigEntry | None | UndefinedType, + get_initial_options: Callable[[], EntityOptionsType | None] | None, + created_at: datetime, + ) -> dict[str, Any]: + """ + Restore entity data from a previously deleted entity. + + Preserves user customizations (aliases, area, categories, etc.) + and restores the original entity_id if available. + """ + # Restore basic entity data + entity_data = { + "entity_registry_id": deleted_entity.id, + "created_at": deleted_entity.created_at, + "aliases": deleted_entity.aliases, + "area_id": deleted_entity.area_id, + "categories": deleted_entity.categories, + "device_class": deleted_entity.device_class, + "icon": deleted_entity.icon, + "labels": deleted_entity.labels, + "name": deleted_entity.name, + "entity_id": None, + } + + # Restore disabled_by state with config entry consideration + if deleted_entity.disabled_by is not UNDEFINED: + entity_data["disabled_by"] = self._adjust_disabled_by_for_config_entry( + deleted_entity.disabled_by, config_entry + ) + else: + entity_data["disabled_by"] = disabled_by + + # Restore entity_id if it's still available + if self._entity_id_available(deleted_entity.entity_id): + entity_data["entity_id"] = deleted_entity.entity_id + + # Restore hidden_by state + if deleted_entity.hidden_by is not UNDEFINED: + entity_data["hidden_by"] = deleted_entity.hidden_by + else: + entity_data["hidden_by"] = hidden_by + + # Restore or initialize options + if deleted_entity.options is not UNDEFINED: + entity_data["options"] = deleted_entity.options + else: + entity_data["options"] = get_initial_options() if get_initial_options else None + + return entity_data + + def _initialize_new_entity_data( + self, + get_initial_options: Callable[[], EntityOptionsType | None] | None, + created_at: datetime, + ) -> dict[str, Any]: + """ + Initialize default values for a new entity. + + Returns a dict with empty/default values for all entity attributes. + """ + return { + "entity_registry_id": None, + "created_at": created_at, + "aliases": set(), + "area_id": None, + "categories": {}, + "device_class": None, + "disabled_by": None, + "hidden_by": None, + "icon": None, + "labels": set(), + "name": None, + "entity_id": None, + "options": get_initial_options() if get_initial_options else None, + } + + def _adjust_disabled_by_for_config_entry( + self, + disabled_by: RegistryEntryDisabler | None, + config_entry: ConfigEntry | None | UndefinedType, + ) -> RegistryEntryDisabler | None: + """ + Adjust disabled_by state based on config entry's disabled state. + + Automatically enables/disables entity to match config entry state: + - If config entry is disabled, entity should be disabled by CONFIG_ENTRY + - If config entry is enabled but entity was disabled by it, re-enable entity + - If no config entry, clear CONFIG_ENTRY disabler + """ + if config_entry and config_entry is not UNDEFINED: + # Config entry exists, match its state + if config_entry.disabled_by: + # Config entry disabled, ensure entity is disabled too + if disabled_by is None: + return RegistryEntryDisabler.CONFIG_ENTRY + elif disabled_by == RegistryEntryDisabler.CONFIG_ENTRY: + # Config entry enabled but entity was disabled by it, re-enable + return None + elif disabled_by == RegistryEntryDisabler.CONFIG_ENTRY: + # No config entry but entity was disabled by one, re-enable + return None + + return disabled_by + + def _apply_integration_disable_preference( + self, + disabled_by: RegistryEntryDisabler | None, + config_entry: ConfigEntry | None | UndefinedType, + ) -> RegistryEntryDisabler | None: + """ + Apply integration's preference to disable new entities by default. + + If config entry has pref_disable_new_entities set and entity is not + already disabled, disable it with INTEGRATION disabler. + """ if ( disabled_by is None and config_entry and config_entry is not UNDEFINED and config_entry.pref_disable_new_entities ): - disabled_by = RegistryEntryDisabler.INTEGRATION + return RegistryEntryDisabler.INTEGRATION + + return disabled_by + def _create_and_register_entity( + self, + domain: str, + platform: str, + unique_id: str, + entity_data: dict[str, Any], + suggested_object_id: str | None, + capabilities: Mapping[str, Any] | None | UndefinedType, + config_entry_id: str | None | UndefinedType, + config_subentry_id: str | None | UndefinedType, + device_id: str | None | UndefinedType, + entity_category: EntityCategory | UndefinedType | None, + has_entity_name: bool | UndefinedType, + original_device_class: str | None | UndefinedType, + original_icon: str | None | UndefinedType, + original_name: str | None | UndefinedType, + supported_features: int | None | UndefinedType, + translation_key: str | None | UndefinedType, + unit_of_measurement: str | None | UndefinedType, + ) -> RegistryEntry: + """ + Create new registry entry and register it in the system. + + Constructs the RegistryEntry object, adds it to entities dict, + schedules persistence, fires creation event, and returns the entry. + """ + # Helper to convert UNDEFINED to None for entry creation def none_if_undefined[_T](value: _T | UndefinedType) -> _T | None: """Return None if value is UNDEFINED, otherwise return value.""" return None if value is UNDEFINED else value + # Create the registry entry with all data entry = RegistryEntry( - aliases=aliases, - area_id=area_id, - categories=categories, + aliases=entity_data["aliases"], + area_id=entity_data["area_id"], + categories=entity_data["categories"], capabilities=none_if_undefined(capabilities), config_entry_id=none_if_undefined(config_entry_id), config_subentry_id=none_if_undefined(config_subentry_id), - created_at=created_at, - device_class=device_class, + created_at=entity_data["created_at"], + device_class=entity_data["device_class"], device_id=none_if_undefined(device_id), - disabled_by=disabled_by, + disabled_by=entity_data["disabled_by"], entity_category=none_if_undefined(entity_category), - entity_id=entity_id, - hidden_by=hidden_by, + entity_id=entity_data["entity_id"], + hidden_by=entity_data["hidden_by"], has_entity_name=none_if_undefined(has_entity_name) or False, - icon=icon, - id=entity_registry_id, - labels=labels, - name=name, - options=options, + icon=entity_data["icon"], + id=entity_data["entity_registry_id"], + labels=entity_data["labels"], + name=entity_data["name"], + options=entity_data["options"], original_device_class=none_if_undefined(original_device_class), original_icon=none_if_undefined(original_icon), original_name=none_if_undefined(original_name), @@ -1071,14 +1242,24 @@ def none_if_undefined[_T](value: _T | UndefinedType) -> _T | None: unique_id=unique_id, unit_of_measurement=none_if_undefined(unit_of_measurement), ) - self.entities[entity_id] = entry - _LOGGER.info("Registered new %s.%s entity: %s", domain, platform, entity_id) + + # Register entity in entities dict + self.entities[entity_data["entity_id"]] = entry + + # Log registration + _LOGGER.info( + "Registered new %s.%s entity: %s", + domain, platform, entity_data["entity_id"] + ) + + # Schedule registry save to persistent storage self.async_schedule_save() + # Fire creation event to notify listeners self.hass.bus.async_fire_internal( EVENT_ENTITY_REGISTRY_UPDATED, _EventEntityRegistryUpdatedData_CreateRemove( - action="create", entity_id=entity_id + action="create", entity_id=entity_data["entity_id"] ), ) @@ -1142,91 +1323,250 @@ def async_device_modified( Disable entities in the registry that are associated to a device when the device is disabled. """ - if event.data["action"] == "remove": - entities = async_entries_for_device( - self, event.data["device_id"], include_disabled_entities=True - ) - removed_device_dict = event.data["device"] - for entity in entities: - config_entry_id = entity.config_entry_id - if ( - config_entry_id in removed_device_dict["config_entries"] - and entity.config_subentry_id - in removed_device_dict["config_entries_subentries"][config_entry_id] - ): - self.async_remove(entity.entity_id) - else: - if entity.entity_id not in self.entities: - # Entity has been removed already, skip it - continue - self.async_update_entity(entity.entity_id, device_id=None) - return + action = event.data["action"] - if event.data["action"] != "update": - # Ignore "create" action + # Handle device removal + if action == "remove": + self._handle_device_removal(event) + return + + # Ignore "create" action, only process "update" + if action != "update": return + # Handle device update + self._handle_device_update(event) + + def _handle_device_removal( + self, event: Event[EventDeviceRegistryUpdatedData] + ) -> None: + """ + Handle device removal by cleaning up associated entities. + + When a device is removed: + - Remove entities that belong to config entries/subentries that were removed + - Unlink entities from the device if they still have valid config entries + """ + device_id = event.data["device_id"] + removed_device_dict = event.data["device"] # type: ignore + + # Get all entities associated with the removed device + entities = async_entries_for_device( + self, device_id, include_disabled_entities=True + ) + + for entity in entities: + if self._should_remove_entity_on_device_removal(entity, removed_device_dict): + # Entity's config entry/subentry was removed with device, remove entity + self.async_remove(entity.entity_id) + else: + # Entity still has valid config entry, just unlink from device + if entity.entity_id not in self.entities: + # Entity has been removed already, skip it + continue + self.async_update_entity(entity.entity_id, device_id=None) + + def _should_remove_entity_on_device_removal( + self, + entity: RegistryEntry, + removed_device_dict: dict[str, Any], + ) -> bool: + """ + Determine if entity should be removed when device is removed. + + Entity should be removed if its config_entry and config_subentry + were both part of the removed device's configuration. + """ + config_entry_id = entity.config_entry_id + + # Check if entity's config entry was part of the removed device + if config_entry_id not in removed_device_dict["config_entries"]: + return False + + # Check if entity's config subentry was part of the removed device + config_subentry_id = entity.config_subentry_id + device_subentries = removed_device_dict["config_entries_subentries"][config_entry_id] + + return config_subentry_id in device_subentries + + def _handle_device_update( + self, event: Event[EventDeviceRegistryUpdatedData] + ) -> None: + """ + Handle device update by managing entity states and associations. + + Processes device updates to: + - Remove entities whose config entries/subentries were removed from device + - Re-enable entities if device was re-enabled + - Disable entities if device was disabled + """ + device_id = event.data["device_id"] + + # Get current device state device_registry = dr.async_get(self.hass) - device = device_registry.async_get(event.data["device_id"]) + device = device_registry.async_get(device_id) - # The device may be deleted already if the event handling is late, do nothing - # in that case. Entities will be removed when we get the "remove" event. + # Device may be deleted if event handling is late, skip processing if not device: return - # Remove entities which belong to config entries no longer associated with the - # device - if old_config_entries := event.data["changes"].get("config_entries"): - entities = async_entries_for_device( - self, event.data["device_id"], include_disabled_entities=True - ) - for entity in entities: - config_entry_id = entity.config_entry_id - if ( - entity.config_entry_id in old_config_entries - and entity.config_entry_id not in device.config_entries - ): - self.async_remove(entity.entity_id) - - # Remove entities which belong to config subentries no longer associated with the - # device - if old_config_entries_subentries := event.data["changes"].get( + # Process config entry changes + self._handle_config_entry_changes(event, device) + + # Process config subentry changes + self._handle_config_subentry_changes(event, device) + + # Handle device enabled/disabled state changes + self._handle_device_disabled_state(event, device) + + def _handle_config_entry_changes( + self, + event: Event[EventDeviceRegistryUpdatedData], + device: Any, + ) -> None: + """ + Remove entities whose config entries are no longer associated with device. + + When a device's config_entries list changes, remove entities that belonged + to config entries that are no longer part of the device. + """ + # Check if config_entries changed + old_config_entries = event.data["changes"].get("config_entries") # type: ignore + if not old_config_entries: + return + + device_id = event.data["device_id"] + entities = async_entries_for_device( + self, device_id, include_disabled_entities=True + ) + + for entity in entities: + # Check if entity's config entry was removed from device + if ( + entity.config_entry_id in old_config_entries + and entity.config_entry_id not in device.config_entries + ): + self.async_remove(entity.entity_id) + + def _handle_config_subentry_changes( + self, + event: Event[EventDeviceRegistryUpdatedData], + device: Any, + ) -> None: + """ + Remove entities whose config subentries are no longer associated with device. + + When a device's config_entries_subentries mapping changes, remove entities + that belonged to subentries that are no longer part of the device. + """ + # Check if config_entries_subentries changed + old_config_entries_subentries = event.data["changes"].get( # type: ignore "config_entries_subentries" - ): - entities = async_entries_for_device( - self, event.data["device_id"], include_disabled_entities=True - ) - for entity in entities: - config_entry_id = entity.config_entry_id - config_subentry_id = entity.config_subentry_id - if ( - config_entry_id in device.config_entries - and config_entry_id in old_config_entries_subentries - and config_subentry_id - in old_config_entries_subentries[config_entry_id] - and config_subentry_id - not in device.config_entries_subentries[config_entry_id] - ): - self.async_remove(entity.entity_id) - - # Re-enable disabled entities if the device is no longer disabled + ) + if not old_config_entries_subentries: + return + + device_id = event.data["device_id"] + entities = async_entries_for_device( + self, device_id, include_disabled_entities=True + ) + + for entity in entities: + if self._should_remove_entity_for_subentry_change( + entity, device, old_config_entries_subentries + ): + self.async_remove(entity.entity_id) + + def _should_remove_entity_for_subentry_change( + self, + entity: RegistryEntry, + device: Any, + old_config_entries_subentries: dict[str, Any], + ) -> bool: + """ + Determine if entity should be removed due to config subentry change. + + Returns True if: + - Entity's config_entry is still on the device + - Entity's config_entry had subentries in the old device state + - Entity's config_subentry was in the old subentries list + - Entity's config_subentry is NOT in the new subentries list + """ + config_entry_id = entity.config_entry_id + config_subentry_id = entity.config_subentry_id + + # Config entry must still be on device + if config_entry_id not in device.config_entries: + return False + + # Config entry must have had subentries in old state + if config_entry_id not in old_config_entries_subentries: + return False + + # Entity's subentry must have been in old list + if config_subentry_id not in old_config_entries_subentries[config_entry_id]: + return False + + # Entity's subentry must NOT be in new list (was removed) + if config_subentry_id in device.config_entries_subentries[config_entry_id]: + return False + + return True + + def _handle_device_disabled_state( + self, + event: Event[EventDeviceRegistryUpdatedData], + device: Any, + ) -> None: + """ + Update entity disabled states based on device's disabled state. + + - If device is enabled: re-enable entities that were disabled by DEVICE + - If device is disabled: disable all entities with DEVICE disabler + - Skip if device disabled by CONFIG_ENTRY (handled separately) + """ + device_id = event.data["device_id"] + + + # Re-enable entities if device is now enabled if not device.disabled: - entities = async_entries_for_device( - self, event.data["device_id"], include_disabled_entities=True - ) - for entity in entities: - if entity.disabled_by is not RegistryEntryDisabler.DEVICE: - continue - self.async_update_entity(entity.entity_id, disabled_by=None) + self._reenable_entities_for_enabled_device(device_id) return - # Ignore device disabled by config entry, this is handled by - # async_config_entry_disabled_by_changed + # Skip if device disabled by config entry (handled elsewhere) if device.disabled_by is dr.DeviceEntryDisabler.CONFIG_ENTRY: return - # Fetch entities which are not already disabled and disable them - entities = async_entries_for_device(self, event.data["device_id"]) + # Disable entities for disabled device + self._disable_entities_for_disabled_device(device_id) + + def _reenable_entities_for_enabled_device(self, device_id: str) -> None: + """ + Re-enable entities that were disabled by DEVICE disabler. + + When a device is re-enabled, re-enable all entities that were + automatically disabled due to the device being disabled. + """ + entities = async_entries_for_device( + self, device_id, include_disabled_entities=True + ) + + for entity in entities: + # Only re-enable entities that were disabled by DEVICE + if entity.disabled_by is not RegistryEntryDisabler.DEVICE: + continue + self.async_update_entity(entity.entity_id, disabled_by=None) + + def _disable_entities_for_disabled_device(self, device_id: str) -> None: + """ + Disable all active entities for a disabled device. + + When a device is disabled (not by CONFIG_ENTRY), automatically + disable all non-disabled entities with DEVICE disabler. + """ + # Only fetch entities that are not already disabled + entities = async_entries_for_device(self, device_id) + for entity in entities: self.async_update_entity( entity.entity_id, disabled_by=RegistryEntryDisabler.DEVICE @@ -1263,42 +1603,109 @@ def _async_update_entity( translation_key: str | None | UndefinedType = UNDEFINED, unit_of_measurement: str | None | UndefinedType = UNDEFINED, ) -> RegistryEntry: - """Private facing update properties method.""" + """Private method to update properties of an entity.""" old = self.entities[entity_id] - new_values: dict[str, Any] = {} # Dict with new key/value pairs - old_values: dict[str, Any] = {} # Dict with old key/value pairs - - for attr_name, value in ( - ("aliases", aliases), - ("area_id", area_id), - ("categories", categories), - ("capabilities", capabilities), - ("config_entry_id", config_entry_id), - ("config_subentry_id", config_subentry_id), - ("device_class", device_class), - ("device_id", device_id), - ("disabled_by", disabled_by), - ("entity_category", entity_category), - ("hidden_by", hidden_by), - ("icon", icon), - ("has_entity_name", has_entity_name), - ("labels", labels), - ("name", name), - ("options", options), - ("original_device_class", original_device_class), - ("original_icon", original_icon), - ("original_name", original_name), - ("platform", platform), - ("supported_features", supported_features), - ("translation_key", translation_key), - ("unit_of_measurement", unit_of_measurement), - ): + # Collect changed attribute values + new_values, old_values = self._collect_changed_attributes( + old, + aliases=aliases, + area_id=area_id, + categories=categories, + capabilities=capabilities, + config_entry_id=config_entry_id, + config_subentry_id=config_subentry_id, + device_class=device_class, + device_id=device_id, + disabled_by=disabled_by, + entity_category=entity_category, + hidden_by=hidden_by, + icon=icon, + has_entity_name=has_entity_name, + labels=labels, + name=name, + options=options, + original_device_class=original_device_class, + original_icon=original_icon, + original_name=original_name, + platform=platform, + supported_features=supported_features, + translation_key=translation_key, + unit_of_measurement=unit_of_measurement, + ) + + + # Validate changes if any exist + self._validate_entity_changes( + old, new_values, new_unique_id, config_entry_id, + config_subentry_id, device_id, disabled_by, + entity_category, hidden_by + ) + + # Handle automatic disabled_by updates based on config_entry state + self._handle_config_entry_disabled_state( + old, new_values, disabled_by, config_entry_id + ) + + # Handle entity ID change if requested + entity_id = self._handle_entity_id_change( + entity_id, old, new_entity_id, new_values, old_values + ) + + # Handle unique ID change if requested + self._handle_unique_id_change( + old, new_unique_id, new_values, old_values + ) + + # Return early if no changes were made + if not new_values: + return old + + # Apply changes and notify system + return self._apply_entity_changes( + entity_id, old, new_values, old_values + ) + + def _collect_changed_attributes( + self, + old: RegistryEntry, + **kwargs: Any, + ) -> tuple[dict[str, Any], dict[str, Any]]: + """ + Collect attribute changes by comparing new values with existing entity. + + Returns tuple of (new_values, old_values) dicts containing only changed attributes. + """ + new_values: dict[str, Any] = {} + old_values: dict[str, Any] = {} + + # Iterate through all provided attributes + for attr_name, value in kwargs.items(): + # Skip undefined values and values that haven't changed if value is not UNDEFINED and value != getattr(old, attr_name): new_values[attr_name] = value old_values[attr_name] = getattr(old, attr_name) - # Only validate if data has changed + return new_values, old_values + + def _validate_entity_changes( + self, + old: RegistryEntry, + new_values: dict[str, Any], + new_unique_id: str | UndefinedType, + config_entry_id: str | None | UndefinedType, + config_subentry_id: str | None | UndefinedType, + device_id: str | None | UndefinedType, + disabled_by: RegistryEntryDisabler | None | UndefinedType, + entity_category: EntityCategory | None | UndefinedType, + hidden_by: RegistryEntryHider | None | UndefinedType, + ) -> None: + """ + Validate entity changes if any data has been modified. + + Raises validation errors if changes are invalid. + """ + # Only validate if there are actual changes if new_values or new_unique_id is not UNDEFINED: _validate_item( self.hass, @@ -1314,71 +1721,150 @@ def _async_update_entity( unique_id=new_unique_id, ) - if disabled_by is UNDEFINED and config_entry_id is not UNDEFINED: - if config_entry_id: - config_entry = self.hass.config_entries.async_get_entry(config_entry_id) - if TYPE_CHECKING: - # We've checked the config_entry exists in _validate_item - assert config_entry is not None - if config_entry.disabled_by: - if old.disabled_by is None: - new_values["disabled_by"] = RegistryEntryDisabler.CONFIG_ENTRY - elif old.disabled_by == RegistryEntryDisabler.CONFIG_ENTRY: - new_values["disabled_by"] = None + def _handle_config_entry_disabled_state( + self, + old: RegistryEntry, + new_values: dict[str, Any], + disabled_by: RegistryEntryDisabler | None | UndefinedType, + config_entry_id: str | None | UndefinedType, +) -> None: + """ + Automatically update entity's disabled_by state based on config_entry state. + + If config_entry_id changes and disabled_by wasn't explicitly set: + - Enable/disable entity to match config entry's disabled state + - Clear CONFIG_ENTRY disabler if config_entry_id is removed + """ + # Skip if disabled_by was explicitly provided or config_entry_id unchanged + if disabled_by is not UNDEFINED or config_entry_id is UNDEFINED: + return + + if config_entry_id: + # Get the config entry to check its disabled state + config_entry = self.hass.config_entries.async_get_entry(config_entry_id) + if TYPE_CHECKING: + # We've checked the config_entry exists in _validate_item + assert config_entry is not None + + # If config entry is disabled, disable the entity too + if config_entry.disabled_by: + if old.disabled_by is None: + new_values["disabled_by"] = RegistryEntryDisabler.CONFIG_ENTRY + # If config entry is enabled but entity was disabled by it, re-enable entity elif old.disabled_by == RegistryEntryDisabler.CONFIG_ENTRY: new_values["disabled_by"] = None + # If config_entry_id is being cleared and entity was disabled by it, re-enable + elif old.disabled_by == RegistryEntryDisabler.CONFIG_ENTRY: + new_values["disabled_by"] = None + + def _handle_entity_id_change( + self, + entity_id: str, + old: RegistryEntry, + new_entity_id: str | UndefinedType, + new_values: dict[str, Any], + old_values: dict[str, Any], +) -> str: + """ + Handle entity ID change with validation. + + Validates new entity ID and updates registry mapping. + Returns the final entity_id (either original or new). + """ + # Skip if no entity ID change requested or if it's the same + if new_entity_id is UNDEFINED or new_entity_id == old.entity_id: + return entity_id + + # Validate new entity ID is available + if not self._entity_id_available(new_entity_id): + raise ValueError("Entity with this ID is already registered") + + # Validate entity ID format + if not valid_entity_id(new_entity_id): + raise ValueError("Invalid entity ID") + + # Validate domain hasn't changed (e.g., light.x -> switch.x not allowed) + if split_entity_id(new_entity_id)[0] != split_entity_id(entity_id)[0]: + raise ValueError("New entity ID should be same domain") + + # Remove old mapping and add new one + self.entities.pop(entity_id) + new_values["entity_id"] = new_entity_id + old_values["entity_id"] = old.entity_id + + return new_entity_id + + def _handle_unique_id_change( + self, + old: RegistryEntry, + new_unique_id: str | UndefinedType, + new_values: dict[str, Any], + old_values: dict[str, Any], + ) -> None: + """ + Handle unique ID change with conflict detection. + + Validates that new unique_id isn't already in use and tracks the previous value. + """ + # Skip if no unique ID change requested + if new_unique_id is UNDEFINED: + return - if new_entity_id is not UNDEFINED and new_entity_id != old.entity_id: - if not self._entity_id_available(new_entity_id): - raise ValueError("Entity with this ID is already registered") - - if not valid_entity_id(new_entity_id): - raise ValueError("Invalid entity ID") - - if split_entity_id(new_entity_id)[0] != split_entity_id(entity_id)[0]: - raise ValueError("New entity ID should be same domain") - - self.entities.pop(entity_id) - entity_id = new_values["entity_id"] = new_entity_id - old_values["entity_id"] = old.entity_id - - if new_unique_id is not UNDEFINED: - conflict_entity_id = self.async_get_entity_id( - old.domain, old.platform, new_unique_id + # Check for conflicts with existing entities + conflict_entity_id = self.async_get_entity_id( + old.domain, old.platform, new_unique_id + ) + if conflict_entity_id: + raise ValueError( + f"Unique id '{new_unique_id}' is already in use by " + f"'{conflict_entity_id}'" ) - if conflict_entity_id: - raise ValueError( - f"Unique id '{new_unique_id}' is already in use by " - f"'{conflict_entity_id}'" - ) - new_values["unique_id"] = new_unique_id - old_values["unique_id"] = old.unique_id - new_values["previous_unique_id"] = old.unique_id - - if not new_values: - return old + + # Record the change and preserve previous unique_id for reference + new_values["unique_id"] = new_unique_id + old_values["unique_id"] = old.unique_id + new_values["previous_unique_id"] = old.unique_id + def _apply_entity_changes( + self, + entity_id: str, + old: RegistryEntry, + new_values: dict[str, Any], + old_values: dict[str, Any], + ) -> RegistryEntry: + """ + Apply changes to entity and fire update event. + + Updates entity in registry, schedules persistence, and notifies listeners. + """ + # Add modification timestamp new_values["modified_at"] = utcnow() + # Ensure we're on the correct event loop thread self.hass.verify_event_loop_thread("entity_registry.async_update_entity") + # Create updated entity using attrs evolve new = self.entities[entity_id] = attr.evolve(old, **new_values) + # Schedule registry save to persistent storage self.async_schedule_save() + # Prepare event data with changes data: _EventEntityRegistryUpdatedData_Update = { "action": "update", "entity_id": entity_id, "changes": old_values, } + # Include old entity_id if it changed if old.entity_id != entity_id: data["old_entity_id"] = old.entity_id + # Fire internal event to notify listeners self.hass.bus.async_fire_internal(EVENT_ENTITY_REGISTRY_UPDATED, data) return new - + @callback def async_update_entity( self,