diff --git a/src/main/java/io/github/hapjava/characteristics/Characteristic.java b/src/main/java/io/github/hapjava/characteristics/Characteristic.java index f73bf345b..d2e87030a 100644 --- a/src/main/java/io/github/hapjava/characteristics/Characteristic.java +++ b/src/main/java/io/github/hapjava/characteristics/Characteristic.java @@ -18,6 +18,8 @@ * @author Andy Lintner */ public interface Characteristic { + /** @return The UUID type for this characteristic. */ + String getType(); /** * Adds an attribute to the passed JsonObjectBuilder named "value" with the current value of the diff --git a/src/main/java/io/github/hapjava/characteristics/impl/base/BaseCharacteristic.java b/src/main/java/io/github/hapjava/characteristics/impl/base/BaseCharacteristic.java index 734a83cdc..1d2d55c5b 100644 --- a/src/main/java/io/github/hapjava/characteristics/impl/base/BaseCharacteristic.java +++ b/src/main/java/io/github/hapjava/characteristics/impl/base/BaseCharacteristic.java @@ -70,6 +70,12 @@ public BaseCharacteristic( this.unsubscriber = unsubscriber; } + @Override + /** {@inheritDoc} */ + public String getType() { + return type; + } + @Override /** {@inheritDoc} */ public final CompletableFuture toJson(int iid) { diff --git a/src/main/java/io/github/hapjava/server/impl/HomekitRegistry.java b/src/main/java/io/github/hapjava/server/impl/HomekitRegistry.java index 39ec31ff9..cc6cd352e 100644 --- a/src/main/java/io/github/hapjava/server/impl/HomekitRegistry.java +++ b/src/main/java/io/github/hapjava/server/impl/HomekitRegistry.java @@ -2,6 +2,7 @@ import io.github.hapjava.accessories.HomekitAccessory; import io.github.hapjava.characteristics.Characteristic; +import io.github.hapjava.server.impl.connections.SubscriptionManager; import io.github.hapjava.services.Service; import io.github.hapjava.services.impl.AccessoryInformationService; import java.util.ArrayList; @@ -19,14 +20,16 @@ public class HomekitRegistry { private static final Logger logger = LoggerFactory.getLogger(HomekitRegistry.class); private final String label; + private final SubscriptionManager subscriptions; private final Map accessories; private final Map> services = new HashMap<>(); private final Map> characteristics = new HashMap<>(); private boolean isAllowUnauthenticatedRequests = false; - public HomekitRegistry(String label) { + public HomekitRegistry(String label, SubscriptionManager subscriptions) { this.label = label; + this.subscriptions = subscriptions; this.accessories = new ConcurrentHashMap<>(); reset(); } @@ -61,6 +64,7 @@ public synchronized void reset() { services.put(accessory, newServicesByInterfaceId); characteristics.put(accessory, newCharacteristicsByInterfaceId); } + subscriptions.resync(this); } public String getLabel() { @@ -87,8 +91,8 @@ public void add(HomekitAccessory accessory) { accessories.put(accessory.getId(), accessory); } - public void remove(HomekitAccessory accessory) { - accessories.remove(accessory.getId()); + public boolean remove(HomekitAccessory accessory) { + return accessories.remove(accessory.getId()) != null; } public boolean isAllowUnauthenticatedRequests() { diff --git a/src/main/java/io/github/hapjava/server/impl/HomekitRoot.java b/src/main/java/io/github/hapjava/server/impl/HomekitRoot.java index df77afd07..1750a8d3e 100644 --- a/src/main/java/io/github/hapjava/server/impl/HomekitRoot.java +++ b/src/main/java/io/github/hapjava/server/impl/HomekitRoot.java @@ -36,6 +36,8 @@ public class HomekitRoot { private final SubscriptionManager subscriptions = new SubscriptionManager(); private boolean started = false; private int configurationIndex = 1; + private int nestedBatches = 0; + private boolean madeChanges = false; HomekitRoot( String label, HomekitWebHandler webHandler, InetAddress host, HomekitAuthInfo authInfo) @@ -65,7 +67,7 @@ public class HomekitRoot { this.authInfo = authInfo; this.label = label; this.category = category; - this.registry = new HomekitRegistry(label); + this.registry = new HomekitRegistry(label, subscriptions); } HomekitRoot( @@ -83,11 +85,27 @@ public class HomekitRoot { this( label, DEFAULT_ACCESSORY_CATEGORY, webHandler, authInfo, new JmdnsHomekitAdvertiser(jmdns)); } + /** - * Add an accessory to be handled and advertised by this root. Any existing HomeKit connections - * will be terminated to allow the clients to reconnect and see the updated accessory list. When - * using this for a bridge, the ID of the accessory must be greater than 1, as that ID is reserved - * for the Bridge itself. + * Begin a batch update of accessories. + * + *

After calling this, you can call addAccessory() and removeAccessory() multiple times without + * causing HAP-Java to re-publishing the metadata to HomeKit. You'll need to call + * completeUpdateBatch in order to publish all accumulated changes. + */ + public synchronized void batchUpdate() { + if (this.nestedBatches == 0) madeChanges = false; + ++this.nestedBatches; + } + + /** Publish accumulated accessory changes since batchUpdate() was called. */ + public synchronized void completeUpdateBatch() { + if (--this.nestedBatches == 0 && madeChanges) registry.reset(); + } + + /** + * Add an accessory to be handled and advertised by this root. When using this for a bridge, the + * ID of the accessory must be greater than 1, as that ID is reserved for the Bridge itself. * * @param accessory to advertise and handle. */ @@ -110,25 +128,28 @@ void addAccessorySkipRangeCheck(HomekitAccessory accessory) { if (logger.isTraceEnabled()) { accessory.getName().thenAccept(name -> logger.trace("Added accessory {}", name)); } - if (started) { + madeChanges = true; + if (started && nestedBatches == 0) { registry.reset(); } } /** - * Removes an accessory from being handled or advertised by this root. Any existing HomeKit - * connections will be terminated to allow the clients to reconnect and see the updated accessory - * list. + * Removes an accessory from being handled or advertised by this root. * * @param accessory accessory to cease advertising and handling */ public void removeAccessory(HomekitAccessory accessory) { - this.registry.remove(accessory); - if (logger.isTraceEnabled()) { - accessory.getName().thenAccept(name -> logger.trace("Removed accessory {}", name)); - } - if (started) { - registry.reset(); + if (this.registry.remove(accessory)) { + if (logger.isTraceEnabled()) { + accessory.getName().thenAccept(name -> logger.trace("Removed accessory {}", name)); + } + madeChanges = true; + if (started && nestedBatches == 0) { + registry.reset(); + } + } else { + accessory.getName().thenAccept(name -> logger.warn("Could not remove accessory {}", name)); } } @@ -140,6 +161,7 @@ public void removeAccessory(HomekitAccessory accessory) { */ public void start() { started = true; + madeChanges = false; registry.reset(); webHandler .start( diff --git a/src/main/java/io/github/hapjava/server/impl/connections/SubscriptionManager.java b/src/main/java/io/github/hapjava/server/impl/connections/SubscriptionManager.java index 25f68526c..9399295fb 100644 --- a/src/main/java/io/github/hapjava/server/impl/connections/SubscriptionManager.java +++ b/src/main/java/io/github/hapjava/server/impl/connections/SubscriptionManager.java @@ -1,15 +1,18 @@ package io.github.hapjava.server.impl.connections; +import io.github.hapjava.characteristics.Characteristic; import io.github.hapjava.characteristics.EventableCharacteristic; +import io.github.hapjava.server.impl.HomekitRegistry; import io.github.hapjava.server.impl.http.HomekitClientConnection; import io.github.hapjava.server.impl.http.HttpResponse; import io.github.hapjava.server.impl.json.EventController; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,12 +20,22 @@ public class SubscriptionManager { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionManager.class); - private final ConcurrentMap> subscriptions = - new ConcurrentHashMap<>(); - private final ConcurrentMap> reverse = - new ConcurrentHashMap<>(); - private final ConcurrentMap> - pendingNotifications = new ConcurrentHashMap<>(); + private static class ConnectionsWithIds { + Set connections; + int aid, iid; + + ConnectionsWithIds(int aid, int iid) { + this.aid = aid; + this.iid = iid; + this.connections = new HashSet<>(); + } + } + + private final Map subscriptions = new HashMap<>(); + private final Map> reverse = + new HashMap<>(); + private final Map> pendingNotifications = + new HashMap<>(); private int nestedBatches = 0; public synchronized void addSubscription( @@ -31,96 +44,111 @@ public synchronized void addSubscription( EventableCharacteristic characteristic, HomekitClientConnection connection) { synchronized (this) { - if (!subscriptions.containsKey(characteristic)) { - subscriptions.putIfAbsent(characteristic, newSet()); - } - subscriptions.get(characteristic).add(connection); - if (subscriptions.get(characteristic).size() == 1) { - characteristic.subscribe( - () -> { - publish(aid, iid, characteristic); - }); + ConnectionsWithIds subscribers; + if (subscriptions.containsKey(characteristic)) { + subscribers = subscriptions.get(characteristic); + } else { + subscribers = new ConnectionsWithIds(aid, iid); + subscriptions.put(characteristic, subscribers); + subscribe(aid, iid, characteristic); } + subscribers.connections.add(connection); if (!reverse.containsKey(connection)) { - reverse.putIfAbsent(connection, newSet()); + reverse.put(connection, new HashSet<>()); } reverse.get(connection).add(characteristic); LOGGER.trace( - "Added subscription to " + characteristic.getClass() + " for " + connection.hashCode()); + "Added subscription to {}:{} ({}) for {}", + aid, + iid, + characteristic.getClass().getSimpleName(), + connection.hashCode()); } } public synchronized void removeSubscription( EventableCharacteristic characteristic, HomekitClientConnection connection) { - Set subscriptions = this.subscriptions.get(characteristic); - if (subscriptions != null) { - subscriptions.remove(connection); - if (subscriptions.size() == 0) { + ConnectionsWithIds subscribers = subscriptions.get(characteristic); + if (subscribers != null) { + subscribers.connections.remove(connection); + if (subscribers.connections.isEmpty()) { + LOGGER.trace("Unsubscribing from characteristic as all subscriptions are closed"); characteristic.unsubscribe(); + subscriptions.remove(characteristic); + } + + // Remove pending notifications for this no-longer-subscribed characteristic + List connectionNotifications = pendingNotifications.get(connection); + if (connectionNotifications != null) { + connectionNotifications.removeIf(n -> n.aid == subscribers.aid && n.iid == subscribers.iid); + if (connectionNotifications.isEmpty()) pendingNotifications.remove(connection); } + + LOGGER.trace( + "Removed subscription from {}:{} ({}) for {}", + subscribers.aid, + subscribers.iid, + characteristic.getClass().getSimpleName(), + connection.hashCode()); } Set reverse = this.reverse.get(connection); if (reverse != null) { reverse.remove(characteristic); + if (reverse.isEmpty()) this.reverse.remove(connection); } - LOGGER.trace( - "Removed subscription to " + characteristic.getClass() + " for " + connection.hashCode()); } public synchronized void removeConnection(HomekitClientConnection connection) { - Set characteristics = reverse.remove(connection); + removeConnection(connection, reverse.remove(connection)); + } + + private void removeConnection( + HomekitClientConnection connection, Set characteristics) { pendingNotifications.remove(connection); if (characteristics != null) { for (EventableCharacteristic characteristic : characteristics) { - Set characteristicSubscriptions = - subscriptions.get(characteristic); - characteristicSubscriptions.remove(connection); - if (characteristicSubscriptions.isEmpty()) { - LOGGER.trace("Unsubscribing from characteristic as all subscriptions are closed"); - characteristic.unsubscribe(); - subscriptions.remove(characteristic); - } + removeSubscription(characteristic, connection); } } LOGGER.trace("Removed connection {}", connection.hashCode()); } - private Set newSet() { - return Collections.newSetFromMap(new ConcurrentHashMap()); - } - public synchronized void batchUpdate() { ++this.nestedBatches; } public synchronized void completeUpdateBatch() { - if (--this.nestedBatches == 0 && !pendingNotifications.isEmpty()) { - LOGGER.trace("Publishing batched changes"); - for (ConcurrentMap.Entry> entry : - pendingNotifications.entrySet()) { - try { - HttpResponse message = new EventController().getMessage(entry.getValue()); - entry.getKey().outOfBand(message); - } catch (Exception e) { - LOGGER.warn("Failed to create new event message", e); - } + if (--this.nestedBatches == 0) flushUpdateBatch(); + } + + private void flushUpdateBatch() { + if (pendingNotifications.isEmpty()) return; + + LOGGER.trace("Publishing batched changes"); + for (Map.Entry> entry : + pendingNotifications.entrySet()) { + try { + HttpResponse message = new EventController().getMessage(entry.getValue()); + entry.getKey().outOfBand(message); + } catch (Exception e) { + LOGGER.warn("Failed to create new event message", e); } - pendingNotifications.clear(); } + pendingNotifications.clear(); } public synchronized void publish(int accessoryId, int iid, EventableCharacteristic changed) { - final Set subscribers = subscriptions.get(changed); - if ((subscribers == null) || (subscribers.isEmpty())) { - LOGGER.debug("No subscribers to characteristic {} at accessory {} ", changed, accessoryId); + final ConnectionsWithIds subscribers = subscriptions.get(changed); + if (subscribers == null || subscribers.connections.isEmpty()) { + LOGGER.trace("No subscribers to characteristic {} at accessory {} ", changed, accessoryId); return; // no subscribers } if (nestedBatches != 0) { LOGGER.trace("Batching change for accessory {} and characteristic {} " + accessoryId, iid); PendingNotification notification = new PendingNotification(accessoryId, iid, changed); - for (HomekitClientConnection connection : subscribers) { + for (HomekitClientConnection connection : subscribers.connections) { if (!pendingNotifications.containsKey(connection)) { pendingNotifications.put(connection, new ArrayList()); } @@ -132,7 +160,7 @@ public synchronized void publish(int accessoryId, int iid, EventableCharacterist try { HttpResponse message = new EventController().getMessage(accessoryId, iid, changed); LOGGER.trace("Publishing change for " + accessoryId); - for (HomekitClientConnection connection : subscribers) { + for (HomekitClientConnection connection : subscribers.connections) { connection.outOfBand(message); } } catch (Exception e) { @@ -140,14 +168,89 @@ public synchronized void publish(int accessoryId, int iid, EventableCharacterist } } + /** + * The accessory registry has changed; go through all subscriptions and link to any new/changed + * characteristics + */ + public synchronized void resync(HomekitRegistry registry) { + LOGGER.trace("Resyncing subscriptions"); + flushUpdateBatch(); + + Map newSubscriptions = new HashMap<>(); + Iterator> i = + subscriptions.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry entry = i.next(); + EventableCharacteristic oldCharacteristic = entry.getKey(); + ConnectionsWithIds subscribers = entry.getValue(); + Characteristic newCharacteristic = + registry.getCharacteristics(subscribers.aid).get(subscribers.iid); + if (newCharacteristic == null || newCharacteristic.getType() != oldCharacteristic.getType()) { + // characteristic is gone or has completely changed; drop all subscriptions for it + LOGGER.trace( + "{}:{} ({}) has gone missing; dropping subscriptions.", + subscribers.aid, + subscribers.iid, + oldCharacteristic.getClass().getSimpleName()); + i.remove(); + for (HomekitClientConnection conn : subscribers.connections) { + removeSubscription(oldCharacteristic, conn); + } + } else if (newCharacteristic != oldCharacteristic) { + EventableCharacteristic newEventableCharacteristic = + (EventableCharacteristic) newCharacteristic; + LOGGER.trace( + "{}:{} has been replaced by a compatible characteristic; re-connecting subscriptions", + subscribers.aid, + subscribers.iid); + // characteristic has been replaced by another instance of the same thing; + // re-connect subscriptions + i.remove(); + oldCharacteristic.unsubscribe(); + subscribe(subscribers.aid, subscribers.iid, newEventableCharacteristic); + // we can't replace the key, and we can't add to the map while we're iterating, + // so save it off to a temporary map that we'll add them all at the end + newSubscriptions.put(newEventableCharacteristic, subscribers); + + for (HomekitClientConnection conn : subscribers.connections) { + Set subscribedCharacteristics = reverse.get(conn); + subscribedCharacteristics.remove(oldCharacteristic); + subscribedCharacteristics.add(newEventableCharacteristic); + + // and also update references for any pending notifications, so they'll get the proper + // value + List connectionPendingNotifications = pendingNotifications.get(conn); + if (connectionPendingNotifications != null) { + for (PendingNotification notification : connectionPendingNotifications) { + if (notification.characteristic == oldCharacteristic) { + notification.characteristic = newEventableCharacteristic; + } + } + } + } + } + } + subscriptions.putAll(newSubscriptions); + } + + private void subscribe(int aid, int iid, EventableCharacteristic characteristic) { + characteristic.subscribe( + () -> { + publish(aid, iid, characteristic); + }); + } + /** Remove all existing subscriptions */ - public void removeAll() { + public synchronized void removeAll() { LOGGER.trace("Removing {} reverse connections from subscription manager", reverse.size()); - Iterator i = reverse.keySet().iterator(); + Iterator>> i = + reverse.entrySet().iterator(); while (i.hasNext()) { - HomekitClientConnection connection = i.next(); + Map.Entry> entry = i.next(); + HomekitClientConnection connection = entry.getKey(); LOGGER.trace("Removing connection {}", connection.hashCode()); - removeConnection(connection); + i.remove(); + removeConnection(connection, entry.getValue()); } LOGGER.trace("Subscription sizes are {} and {}", reverse.size(), subscriptions.size()); } diff --git a/src/main/java/io/github/hapjava/server/impl/json/EventController.java b/src/main/java/io/github/hapjava/server/impl/json/EventController.java index 41473f716..0c7c2bc7c 100644 --- a/src/main/java/io/github/hapjava/server/impl/json/EventController.java +++ b/src/main/java/io/github/hapjava/server/impl/json/EventController.java @@ -4,7 +4,7 @@ import io.github.hapjava.server.impl.connections.PendingNotification; import io.github.hapjava.server.impl.http.HttpResponse; import java.io.ByteArrayOutputStream; -import java.util.ArrayList; +import java.util.List; import javax.json.Json; import javax.json.JsonArrayBuilder; import javax.json.JsonObject; @@ -34,7 +34,7 @@ public HttpResponse getMessage(int accessoryId, int iid, EventableCharacteristic } } - public HttpResponse getMessage(ArrayList notifications) throws Exception { + public HttpResponse getMessage(List notifications) throws Exception { JsonArrayBuilder characteristics = Json.createArrayBuilder(); for (PendingNotification notification : notifications) {