Skip to content

Commit 25d5d3e

Browse files
committed
feat: FSM 1
1 parent 792f3aa commit 25d5d3e

File tree

4 files changed

+283
-8
lines changed

4 files changed

+283
-8
lines changed

hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterInstance.java

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,34 +16,80 @@
1616

1717
package com.hivemq.protocols.fsm;
1818

19+
import com.hivemq.adapter.sdk.api.ProtocolAdapter;
1920
import org.jetbrains.annotations.NotNull;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
2023

2124
public class ProtocolAdapterInstance {
22-
protected @NotNull ProtocolAdapterState state;
23-
protected @NotNull ProtocolAdapterConnectionState connectionState;
25+
private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolAdapterInstance.class);
26+
protected final @NotNull ProtocolAdapter adapter;
27+
protected volatile @NotNull ProtocolAdapterState state;
28+
protected volatile @NotNull ProtocolAdapterConnectionState northboundConnectionState;
29+
protected volatile @NotNull ProtocolAdapterConnectionState southboundConnectionState;
2430

25-
public ProtocolAdapterInstance() {
26-
connectionState = ProtocolAdapterConnectionState.Closed;
31+
public ProtocolAdapterInstance(final @NotNull ProtocolAdapter adapter) {
32+
this.adapter = adapter;
33+
northboundConnectionState = ProtocolAdapterConnectionState.Closed;
34+
southboundConnectionState = ProtocolAdapterConnectionState.Closed;
2735
state = ProtocolAdapterState.Stopped;
2836
}
2937

38+
public @NotNull ProtocolAdapterConnectionState getSouthboundConnectionState() {
39+
return southboundConnectionState;
40+
}
41+
3042
public @NotNull ProtocolAdapterState getState() {
3143
return state;
3244
}
3345

34-
public @NotNull ProtocolAdapterConnectionState getConnectionState() {
35-
return connectionState;
46+
public @NotNull ProtocolAdapterConnectionState getNorthboundConnectionState() {
47+
return northboundConnectionState;
48+
}
49+
50+
public @NotNull String getAdapterId() {
51+
return adapter.getId();
52+
}
53+
54+
public void start() {
55+
final ProtocolAdapterTransitionResponse response = transitionTo(ProtocolAdapterState.Starting);
56+
if (response.status().isSuccess()) {
57+
startNorthbound();
58+
startSouthbound();
59+
}
60+
}
61+
62+
public void stop() {
63+
transitionTo(ProtocolAdapterState.Stopping);
64+
}
65+
66+
protected void startNorthbound() {
67+
68+
}
69+
70+
protected void startSouthbound() {
71+
3672
}
3773

3874
public synchronized @NotNull ProtocolAdapterTransitionResponse transitionTo(final @NotNull ProtocolAdapterState newState) {
39-
final ProtocolAdapterTransitionResponse response = state.transition(newState, this);
40-
this.state = response.toState();
75+
final ProtocolAdapterState fromState = state;
76+
final ProtocolAdapterTransitionResponse response = fromState.transition(newState, this);
77+
state = response.toState();
4178
switch (response.status()) {
4279
case Success -> {
80+
LOGGER.debug("Protocol adapter '{}' transitioned from {} to {} successfully.",
81+
fromState,
82+
state,
83+
getAdapterId());
4384
}
4485
case Failure -> {
86+
LOGGER.error("Protocol adapter '{}' failed to transition from {} to {}.",
87+
fromState,
88+
state,
89+
getAdapterId());
4590
}
4691
case NotChanged -> {
92+
LOGGER.warn("Protocol adapter '{}' state {} is unchanged.", state, getAdapterId());
4793
}
4894
}
4995
return response;

hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterOperator.java

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,206 @@
1616

1717
package com.hivemq.protocols.fsm;
1818

19+
import com.codahale.metrics.MetricRegistry;
20+
import com.google.common.collect.Sets;
21+
import com.hivemq.adapter.sdk.api.events.EventService;
22+
import com.hivemq.adapter.sdk.api.events.model.Event;
23+
import com.hivemq.configuration.entity.adapter.ProtocolAdapterEntity;
24+
import com.hivemq.configuration.reader.ProtocolAdapterExtractor;
25+
import com.hivemq.edge.HiveMQEdgeRemoteService;
26+
import com.hivemq.edge.VersionProvider;
27+
import com.hivemq.edge.modules.adapters.data.TagManager;
28+
import com.hivemq.edge.modules.adapters.impl.ModuleServicesImpl;
29+
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterPollingService;
30+
import com.hivemq.protocols.InternalProtocolAdapterWritingService;
31+
import com.hivemq.protocols.ProtocolAdapterConfig;
32+
import com.hivemq.protocols.ProtocolAdapterConfigConverter;
33+
import com.hivemq.protocols.ProtocolAdapterFactoryManager;
34+
import com.hivemq.protocols.ProtocolAdapterMetrics;
35+
import com.hivemq.protocols.northbound.NorthboundConsumerFactory;
36+
import org.jetbrains.annotations.NotNull;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
import java.util.ArrayList;
41+
import java.util.HashSet;
42+
import java.util.List;
43+
import java.util.Map;
44+
import java.util.Set;
45+
import java.util.concurrent.ConcurrentHashMap;
46+
import java.util.concurrent.ExecutionException;
47+
import java.util.concurrent.ExecutorService;
48+
import java.util.concurrent.Executors;
49+
import java.util.function.Function;
50+
import java.util.stream.Collectors;
51+
52+
1953
public class ProtocolAdapterOperator {
54+
private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolAdapterOperator.class);
55+
56+
private final @NotNull Map<String, ProtocolAdapterInstance> protocolAdapterMap;
57+
private final @NotNull MetricRegistry metricRegistry;
58+
private final @NotNull ModuleServicesImpl moduleServices;
59+
private final @NotNull HiveMQEdgeRemoteService remoteService;
60+
private final @NotNull EventService eventService;
61+
private final @NotNull ProtocolAdapterConfigConverter configConverter;
62+
private final @NotNull VersionProvider versionProvider;
63+
private final @NotNull ProtocolAdapterPollingService protocolAdapterPollingService;
64+
private final @NotNull ProtocolAdapterMetrics protocolAdapterMetrics;
65+
private final @NotNull InternalProtocolAdapterWritingService protocolAdapterWritingService;
66+
private final @NotNull ProtocolAdapterFactoryManager protocolAdapterFactoryManager;
67+
private final @NotNull NorthboundConsumerFactory northboundConsumerFactory;
68+
private final @NotNull TagManager tagManager;
69+
private final @NotNull ProtocolAdapterExtractor protocolAdapterConfig;
70+
private final @NotNull ExecutorService executorService;
71+
private volatile @NotNull ProtocolAdapterOperatorState state;
72+
73+
public ProtocolAdapterOperator(
74+
final @NotNull MetricRegistry metricRegistry,
75+
final @NotNull ModuleServicesImpl moduleServices,
76+
final @NotNull HiveMQEdgeRemoteService remoteService,
77+
final @NotNull EventService eventService,
78+
final @NotNull ProtocolAdapterConfigConverter configConverter,
79+
final @NotNull VersionProvider versionProvider,
80+
final @NotNull ProtocolAdapterPollingService protocolAdapterPollingService,
81+
final @NotNull ProtocolAdapterMetrics protocolAdapterMetrics,
82+
final @NotNull InternalProtocolAdapterWritingService protocolAdapterWritingService,
83+
final @NotNull ProtocolAdapterFactoryManager protocolAdapterFactoryManager,
84+
final @NotNull NorthboundConsumerFactory northboundConsumerFactory,
85+
final @NotNull TagManager tagManager,
86+
final @NotNull ProtocolAdapterExtractor protocolAdapterConfig) {
87+
this.protocolAdapterMap = new ConcurrentHashMap<>();
88+
this.metricRegistry = metricRegistry;
89+
this.moduleServices = moduleServices;
90+
this.remoteService = remoteService;
91+
this.eventService = eventService;
92+
this.configConverter = configConverter;
93+
this.versionProvider = versionProvider;
94+
this.protocolAdapterPollingService = protocolAdapterPollingService;
95+
this.protocolAdapterMetrics = protocolAdapterMetrics;
96+
this.protocolAdapterWritingService = protocolAdapterWritingService;
97+
this.protocolAdapterFactoryManager = protocolAdapterFactoryManager;
98+
this.northboundConsumerFactory = northboundConsumerFactory;
99+
this.tagManager = tagManager;
100+
this.protocolAdapterConfig = protocolAdapterConfig;
101+
this.executorService = Executors.newSingleThreadExecutor();
102+
this.state = ProtocolAdapterOperatorState.Idle;
103+
Runtime.getRuntime().addShutdownHook(new Thread(executorService::shutdown));
104+
protocolAdapterWritingService.addWritingChangedCallback(() -> protocolAdapterFactoryManager.writingEnabledChanged(
105+
protocolAdapterWritingService.writingEnabled()));
106+
}
107+
108+
public @NotNull ProtocolAdapterOperatorState getState() {
109+
return state;
110+
}
111+
112+
public void start() {
113+
if (LOGGER.isDebugEnabled()) {
114+
LOGGER.debug("Starting adapters");
115+
}
116+
protocolAdapterConfig.registerConsumer(this::refresh);
117+
}
118+
119+
public void refresh(final @NotNull List<ProtocolAdapterEntity> configs) {
120+
executorService.submit(() -> {
121+
state = ProtocolAdapterOperatorState.Running;
122+
LOGGER.info("Refreshing adapters");
123+
124+
final Map<String, ProtocolAdapterConfig> protocolAdapterConfigs = configs.stream()
125+
.map(configConverter::fromEntity)
126+
.collect(Collectors.toMap(ProtocolAdapterConfig::getAdapterId, Function.identity()));
127+
128+
final Set<String> oldProtocolAdapterIdSet = new HashSet<>(protocolAdapterMap.keySet());
129+
final Set<String> newProtocolAdapterIdSet = new HashSet<>(protocolAdapterConfigs.keySet());
130+
131+
final Set<String> toBeDeletedProtocolAdapterIdSet =
132+
new HashSet<>(Sets.difference(oldProtocolAdapterIdSet, newProtocolAdapterIdSet));
133+
final Set<String> toBeCreatedProtocolAdapterIdSet =
134+
new HashSet<>(Sets.difference(newProtocolAdapterIdSet, oldProtocolAdapterIdSet));
135+
final Set<String> toBeUpdatedProtocolAdapterIdSet =
136+
new HashSet<>(Sets.intersection(newProtocolAdapterIdSet, oldProtocolAdapterIdSet));
137+
138+
final List<String> failedAdapters = new ArrayList<>();
139+
140+
toBeDeletedProtocolAdapterIdSet.forEach(adapterId -> {
141+
// try {
142+
// if (LOGGER.isDebugEnabled()) {
143+
// LOGGER.debug("Deleting adapter '{}'", adapterId);
144+
// }
145+
// stopAsync(adapterId, true).whenComplete((ignored, t) -> deleteAdapterInternal(adapterId)).get();
146+
// } catch (final InterruptedException e) {
147+
// Thread.currentThread().interrupt();
148+
// failedAdapters.add(adapterId);
149+
// LOGGER.error("Interrupted while deleting adapter {}", adapterId, e);
150+
// } catch (final ExecutionException e) {
151+
// failedAdapters.add(adapterId);
152+
// LOGGER.error("Failed deleting adapter {}", adapterId, e);
153+
// }
154+
});
155+
156+
toBeCreatedProtocolAdapterIdSet.forEach(name -> {
157+
// try {
158+
// if (LOGGER.isDebugEnabled()) {
159+
// LOGGER.debug("Creating adapter '{}'", name);
160+
// }
161+
// startAsync(createAdapterInternal(protocolAdapterConfigs.get(name),
162+
// versionProvider.getVersion())).get();
163+
// } catch (final InterruptedException e) {
164+
// Thread.currentThread().interrupt();
165+
// failedAdapters.add(name);
166+
// LOGGER.error("Interrupted while adding adapter {}", name, e);
167+
// } catch (final ExecutionException e) {
168+
// failedAdapters.add(name);
169+
// LOGGER.error("Failed adding adapter {}", name, e);
170+
// }
171+
});
172+
173+
toBeUpdatedProtocolAdapterIdSet.forEach(name -> {
174+
// try {
175+
// final var wrapper = protocolAdapters.get(name);
176+
// if (wrapper == null) {
177+
// LOGGER.error(
178+
// "Existing adapters were modified while a refresh was ongoing, adapter with name '{}' was deleted and could not be updated",
179+
// name);
180+
// }
181+
// if (wrapper != null && !protocolAdapterConfigs.get(name).equals(wrapper.getConfig())) {
182+
// if (LOGGER.isDebugEnabled()) {
183+
// LOGGER.debug("Updating adapter '{}'", name);
184+
// }
185+
// stopAsync(name, true).thenApply(v -> {
186+
// deleteAdapterInternal(name);
187+
// return null;
188+
// })
189+
// .thenCompose(ignored -> startAsync(createAdapterInternal(protocolAdapterConfigs.get(name),
190+
// versionProvider.getVersion())))
191+
// .get();
192+
// } else {
193+
// if (LOGGER.isDebugEnabled()) {
194+
// LOGGER.debug("Not-updating adapter '{}' since the config is unchanged", name);
195+
// }
196+
// }
197+
// } catch (final InterruptedException e) {
198+
// Thread.currentThread().interrupt();
199+
// failedAdapters.add(name);
200+
// LOGGER.error("Interrupted while updating adapter {}", name, e);
201+
// } catch (final ExecutionException e) {
202+
// failedAdapters.add(name);
203+
// LOGGER.error("Failed updating adapter {}", name, e);
204+
// }
205+
});
206+
207+
if (failedAdapters.isEmpty()) {
208+
eventService.configurationEvent()
209+
.withSeverity(Event.SEVERITY.INFO)
210+
.withMessage("Configuration has been successfully updated")
211+
.fire();
212+
} else {
213+
eventService.configurationEvent()
214+
.withSeverity(Event.SEVERITY.CRITICAL)
215+
.withMessage("Reloading of configuration failed")
216+
.fire();
217+
}
218+
state = ProtocolAdapterOperatorState.Idle;
219+
});
220+
}
20221
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2019-present HiveMQ GmbH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hivemq.protocols.fsm;
18+
19+
public enum ProtocolAdapterOperatorState {
20+
Idle,
21+
Running,
22+
;
23+
}

hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransitionStatus.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,9 @@ public enum ProtocolAdapterTransitionStatus {
2020
Success,
2121
Failure,
2222
NotChanged,
23+
;
24+
25+
public boolean isSuccess() {
26+
return this == Success;
27+
}
2328
}

0 commit comments

Comments
 (0)