1
1
package io .github .hapjava .server .impl .connections ;
2
2
3
+ import io .github .hapjava .characteristics .Characteristic ;
3
4
import io .github .hapjava .characteristics .EventableCharacteristic ;
5
+ import io .github .hapjava .server .impl .HomekitRegistry ;
4
6
import io .github .hapjava .server .impl .http .HomekitClientConnection ;
5
7
import io .github .hapjava .server .impl .http .HttpResponse ;
6
8
import io .github .hapjava .server .impl .json .EventController ;
7
9
import java .util .ArrayList ;
8
- import java .util .Collections ;
10
+ import java .util .HashMap ;
11
+ import java .util .HashSet ;
9
12
import java .util .Iterator ;
13
+ import java .util .List ;
14
+ import java .util .Map ;
10
15
import java .util .Set ;
11
- import java .util .concurrent .ConcurrentHashMap ;
12
- import java .util .concurrent .ConcurrentMap ;
13
16
import org .slf4j .Logger ;
14
17
import org .slf4j .LoggerFactory ;
15
18
16
19
public class SubscriptionManager {
17
20
18
21
private static final Logger LOGGER = LoggerFactory .getLogger (SubscriptionManager .class );
19
22
20
- private final ConcurrentMap <EventableCharacteristic , Set <HomekitClientConnection >> subscriptions =
21
- new ConcurrentHashMap <>();
22
- private final ConcurrentMap <HomekitClientConnection , Set <EventableCharacteristic >> reverse =
23
- new ConcurrentHashMap <>();
24
- private final ConcurrentMap <HomekitClientConnection , ArrayList <PendingNotification >>
25
- pendingNotifications = new ConcurrentHashMap <>();
23
+ private static class ConnectionsWithIds {
24
+ Set <HomekitClientConnection > connections ;
25
+ int aid , iid ;
26
+
27
+ ConnectionsWithIds (int aid , int iid ) {
28
+ this .aid = aid ;
29
+ this .iid = iid ;
30
+ this .connections = new HashSet <>();
31
+ }
32
+ }
33
+
34
+ private final Map <EventableCharacteristic , ConnectionsWithIds > subscriptions = new HashMap <>();
35
+ private final Map <HomekitClientConnection , Set <EventableCharacteristic >> reverse =
36
+ new HashMap <>();
37
+ private final Map <HomekitClientConnection , List <PendingNotification >> pendingNotifications =
38
+ new HashMap <>();
26
39
private int nestedBatches = 0 ;
27
40
28
41
public synchronized void addSubscription (
@@ -31,96 +44,111 @@ public synchronized void addSubscription(
31
44
EventableCharacteristic characteristic ,
32
45
HomekitClientConnection connection ) {
33
46
synchronized (this ) {
34
- if (!subscriptions .containsKey (characteristic )) {
35
- subscriptions .putIfAbsent (characteristic , newSet ());
36
- }
37
- subscriptions .get (characteristic ).add (connection );
38
- if (subscriptions .get (characteristic ).size () == 1 ) {
39
- characteristic .subscribe (
40
- () -> {
41
- publish (aid , iid , characteristic );
42
- });
47
+ ConnectionsWithIds subscribers ;
48
+ if (subscriptions .containsKey (characteristic )) {
49
+ subscribers = subscriptions .get (characteristic );
50
+ } else {
51
+ subscribers = new ConnectionsWithIds (aid , iid );
52
+ subscriptions .put (characteristic , subscribers );
53
+ subscribe (aid , iid , characteristic );
43
54
}
55
+ subscribers .connections .add (connection );
44
56
45
57
if (!reverse .containsKey (connection )) {
46
- reverse .putIfAbsent (connection , newSet ());
58
+ reverse .put (connection , new HashSet <> ());
47
59
}
48
60
reverse .get (connection ).add (characteristic );
49
61
LOGGER .trace (
50
- "Added subscription to " + characteristic .getClass () + " for " + connection .hashCode ());
62
+ "Added subscription to {}:{} ({}) for {}" ,
63
+ aid ,
64
+ iid ,
65
+ characteristic .getClass ().getSimpleName (),
66
+ connection .hashCode ());
51
67
}
52
68
}
53
69
54
70
public synchronized void removeSubscription (
55
71
EventableCharacteristic characteristic , HomekitClientConnection connection ) {
56
- Set <HomekitClientConnection > subscriptions = this .subscriptions .get (characteristic );
57
- if (subscriptions != null ) {
58
- subscriptions .remove (connection );
59
- if (subscriptions .size () == 0 ) {
72
+ ConnectionsWithIds subscribers = subscriptions .get (characteristic );
73
+ if (subscribers != null ) {
74
+ subscribers .connections .remove (connection );
75
+ if (subscribers .connections .isEmpty ()) {
76
+ LOGGER .trace ("Unsubscribing from characteristic as all subscriptions are closed" );
60
77
characteristic .unsubscribe ();
78
+ subscriptions .remove (characteristic );
79
+ }
80
+
81
+ // Remove pending notifications for this no-longer-subscribed characteristic
82
+ List <PendingNotification > connectionNotifications = pendingNotifications .get (connection );
83
+ if (connectionNotifications != null ) {
84
+ connectionNotifications .removeIf (n -> n .aid == subscribers .aid && n .iid == subscribers .iid );
85
+ if (connectionNotifications .isEmpty ()) pendingNotifications .remove (connection );
61
86
}
87
+
88
+ LOGGER .trace (
89
+ "Removed subscription from {}:{} ({}) for {}" ,
90
+ subscribers .aid ,
91
+ subscribers .iid ,
92
+ characteristic .getClass ().getSimpleName (),
93
+ connection .hashCode ());
62
94
}
63
95
64
96
Set <EventableCharacteristic > reverse = this .reverse .get (connection );
65
97
if (reverse != null ) {
66
98
reverse .remove (characteristic );
99
+ if (reverse .isEmpty ()) this .reverse .remove (connection );
67
100
}
68
- LOGGER .trace (
69
- "Removed subscription to " + characteristic .getClass () + " for " + connection .hashCode ());
70
101
}
71
102
72
103
public synchronized void removeConnection (HomekitClientConnection connection ) {
73
- Set <EventableCharacteristic > characteristics = reverse .remove (connection );
104
+ removeConnection (connection , reverse .remove (connection ));
105
+ }
106
+
107
+ private void removeConnection (
108
+ HomekitClientConnection connection , Set <EventableCharacteristic > characteristics ) {
74
109
pendingNotifications .remove (connection );
75
110
if (characteristics != null ) {
76
111
for (EventableCharacteristic characteristic : characteristics ) {
77
- Set <HomekitClientConnection > characteristicSubscriptions =
78
- subscriptions .get (characteristic );
79
- characteristicSubscriptions .remove (connection );
80
- if (characteristicSubscriptions .isEmpty ()) {
81
- LOGGER .trace ("Unsubscribing from characteristic as all subscriptions are closed" );
82
- characteristic .unsubscribe ();
83
- subscriptions .remove (characteristic );
84
- }
112
+ removeSubscription (characteristic , connection );
85
113
}
86
114
}
87
115
LOGGER .trace ("Removed connection {}" , connection .hashCode ());
88
116
}
89
117
90
- private <T > Set <T > newSet () {
91
- return Collections .newSetFromMap (new ConcurrentHashMap <T , Boolean >());
92
- }
93
-
94
118
public synchronized void batchUpdate () {
95
119
++this .nestedBatches ;
96
120
}
97
121
98
122
public synchronized void completeUpdateBatch () {
99
- if (--this .nestedBatches == 0 && !pendingNotifications .isEmpty ()) {
100
- LOGGER .trace ("Publishing batched changes" );
101
- for (ConcurrentMap .Entry <HomekitClientConnection , ArrayList <PendingNotification >> entry :
102
- pendingNotifications .entrySet ()) {
103
- try {
104
- HttpResponse message = new EventController ().getMessage (entry .getValue ());
105
- entry .getKey ().outOfBand (message );
106
- } catch (Exception e ) {
107
- LOGGER .warn ("Failed to create new event message" , e );
108
- }
123
+ if (--this .nestedBatches == 0 ) flushUpdateBatch ();
124
+ }
125
+
126
+ private void flushUpdateBatch () {
127
+ if (pendingNotifications .isEmpty ()) return ;
128
+
129
+ LOGGER .trace ("Publishing batched changes" );
130
+ for (Map .Entry <HomekitClientConnection , List <PendingNotification >> entry :
131
+ pendingNotifications .entrySet ()) {
132
+ try {
133
+ HttpResponse message = new EventController ().getMessage (entry .getValue ());
134
+ entry .getKey ().outOfBand (message );
135
+ } catch (Exception e ) {
136
+ LOGGER .warn ("Failed to create new event message" , e );
109
137
}
110
- pendingNotifications .clear ();
111
138
}
139
+ pendingNotifications .clear ();
112
140
}
113
141
114
142
public synchronized void publish (int accessoryId , int iid , EventableCharacteristic changed ) {
115
- final Set < HomekitClientConnection > subscribers = subscriptions .get (changed );
116
- if (( subscribers == null ) || ( subscribers .isEmpty () )) {
117
- LOGGER .debug ("No subscribers to characteristic {} at accessory {} " , changed , accessoryId );
143
+ final ConnectionsWithIds subscribers = subscriptions .get (changed );
144
+ if (subscribers == null || subscribers .connections . isEmpty ()) {
145
+ LOGGER .trace ("No subscribers to characteristic {} at accessory {} " , changed , accessoryId );
118
146
return ; // no subscribers
119
147
}
120
148
if (nestedBatches != 0 ) {
121
149
LOGGER .trace ("Batching change for accessory {} and characteristic {} " + accessoryId , iid );
122
150
PendingNotification notification = new PendingNotification (accessoryId , iid , changed );
123
- for (HomekitClientConnection connection : subscribers ) {
151
+ for (HomekitClientConnection connection : subscribers . connections ) {
124
152
if (!pendingNotifications .containsKey (connection )) {
125
153
pendingNotifications .put (connection , new ArrayList <PendingNotification >());
126
154
}
@@ -132,22 +160,97 @@ public synchronized void publish(int accessoryId, int iid, EventableCharacterist
132
160
try {
133
161
HttpResponse message = new EventController ().getMessage (accessoryId , iid , changed );
134
162
LOGGER .trace ("Publishing change for " + accessoryId );
135
- for (HomekitClientConnection connection : subscribers ) {
163
+ for (HomekitClientConnection connection : subscribers . connections ) {
136
164
connection .outOfBand (message );
137
165
}
138
166
} catch (Exception e ) {
139
167
LOGGER .warn ("Failed to create new event message" , e );
140
168
}
141
169
}
142
170
171
+ /**
172
+ * The accessory registry has changed; go through all subscriptions and link to any new/changed
173
+ * characteristics
174
+ */
175
+ public synchronized void resync (HomekitRegistry registry ) {
176
+ LOGGER .trace ("Resyncing subscriptions" );
177
+ flushUpdateBatch ();
178
+
179
+ Map <EventableCharacteristic , ConnectionsWithIds > newSubscriptions = new HashMap <>();
180
+ Iterator <Map .Entry <EventableCharacteristic , ConnectionsWithIds >> i =
181
+ subscriptions .entrySet ().iterator ();
182
+ while (i .hasNext ()) {
183
+ Map .Entry <EventableCharacteristic , ConnectionsWithIds > entry = i .next ();
184
+ EventableCharacteristic oldCharacteristic = entry .getKey ();
185
+ ConnectionsWithIds subscribers = entry .getValue ();
186
+ Characteristic newCharacteristic =
187
+ registry .getCharacteristics (subscribers .aid ).get (subscribers .iid );
188
+ if (newCharacteristic == null || newCharacteristic .getType () != oldCharacteristic .getType ()) {
189
+ // characteristic is gone or has completely changed; drop all subscriptions for it
190
+ LOGGER .trace (
191
+ "{}:{} ({}) has gone missing; dropping subscriptions." ,
192
+ subscribers .aid ,
193
+ subscribers .iid ,
194
+ oldCharacteristic .getClass ().getSimpleName ());
195
+ i .remove ();
196
+ for (HomekitClientConnection conn : subscribers .connections ) {
197
+ removeSubscription (oldCharacteristic , conn );
198
+ }
199
+ } else if (newCharacteristic != oldCharacteristic ) {
200
+ EventableCharacteristic newEventableCharacteristic =
201
+ (EventableCharacteristic ) newCharacteristic ;
202
+ LOGGER .trace (
203
+ "{}:{} has been replaced by a compatible characteristic; re-connecting subscriptions" ,
204
+ subscribers .aid ,
205
+ subscribers .iid );
206
+ // characteristic has been replaced by another instance of the same thing;
207
+ // re-connect subscriptions
208
+ i .remove ();
209
+ oldCharacteristic .unsubscribe ();
210
+ subscribe (subscribers .aid , subscribers .iid , newEventableCharacteristic );
211
+ // we can't replace the key, and we can't add to the map while we're iterating,
212
+ // so save it off to a temporary map that we'll add them all at the end
213
+ newSubscriptions .put (newEventableCharacteristic , subscribers );
214
+
215
+ for (HomekitClientConnection conn : subscribers .connections ) {
216
+ Set <EventableCharacteristic > subscribedCharacteristics = reverse .get (conn );
217
+ subscribedCharacteristics .remove (oldCharacteristic );
218
+ subscribedCharacteristics .add (newEventableCharacteristic );
219
+
220
+ // and also update references for any pending notifications, so they'll get the proper
221
+ // value
222
+ List <PendingNotification > connectionPendingNotifications = pendingNotifications .get (conn );
223
+ if (connectionPendingNotifications != null ) {
224
+ for (PendingNotification notification : connectionPendingNotifications ) {
225
+ if (notification .characteristic == oldCharacteristic ) {
226
+ notification .characteristic = newEventableCharacteristic ;
227
+ }
228
+ }
229
+ }
230
+ }
231
+ }
232
+ }
233
+ subscriptions .putAll (newSubscriptions );
234
+ }
235
+
236
+ private void subscribe (int aid , int iid , EventableCharacteristic characteristic ) {
237
+ characteristic .subscribe (
238
+ () -> {
239
+ publish (aid , iid , characteristic );
240
+ });
241
+ }
242
+
143
243
/** Remove all existing subscriptions */
144
- public void removeAll () {
244
+ public synchronized void removeAll () {
145
245
LOGGER .trace ("Removing {} reverse connections from subscription manager" , reverse .size ());
146
- Iterator <HomekitClientConnection > i = reverse .keySet ().iterator ();
246
+ Iterator <Map .Entry <HomekitClientConnection , Set <EventableCharacteristic >>> i =
247
+ reverse .entrySet ().iterator ();
147
248
while (i .hasNext ()) {
148
- HomekitClientConnection connection = i .next ();
249
+ Map .Entry <HomekitClientConnection , Set <EventableCharacteristic >> entry = i .next ();
250
+ HomekitClientConnection connection = entry .getKey ();
149
251
LOGGER .trace ("Removing connection {}" , connection .hashCode ());
150
- removeConnection (connection );
252
+ i .remove ();
253
+ removeConnection (connection , entry .getValue ());
151
254
}
152
255
LOGGER .trace ("Subscription sizes are {} and {}" , reverse .size (), subscriptions .size ());
153
256
}
0 commit comments