Skip to content

Commit

Permalink
NMS-17727: Some more refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
cgorantla committed Mar 7, 2025
1 parent f78099e commit 25a8e69
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public synchronized void startGrpcConnection() throws SSLException {
.build();
LOG.info("TLS disabled, using plain text");
}
LOG.info("Grpc client started connection to {} with channel {} ", this.host, this.channel);
LOG.info("Grpc client started connection to {}", this.host);
}

public synchronized void stopGrpcConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ public void setInventoryCallback(Callback inventoryCallback) {
private synchronized void initializeStreams() {
if (super.getChannelState().equals(ConnectivityState.READY)) {
try {

LOG.info("monitoredServiceSyncStub {}.", this.monitoredServiceSyncStub);
this.inventoryUpdateStream =
this.monitoredServiceSyncStub.inventoryUpdate(new LoggingAckReceiver("monitored_service_inventory_update", this));
this.stateUpdateStream =
Expand All @@ -111,7 +109,7 @@ private synchronized void initializeStreams() {
this.monitoredServiceSyncStub.heartBeatUpdate(new LoggingAckReceiver("heartbeat_update", this));
this.scheduler.shutdown();
this.scheduler = null;
LOG.info("Streams initialized successfully.");
LOG.info("BSM Streams initialized successfully.");
reconnecting.set(false);
// While connecting, reconnecting, send callback to inventory service.
if (inventoryCallback != null) {
Expand Down Expand Up @@ -180,18 +178,18 @@ private LoggingAckReceiver(final String type, BsmGrpcClient client) {

@Override
public void onNext(final Empty value) {
LOG.debug("Received ACK {}", this.type);
LOG.debug("BSM gRPC Client : Received ACK {}", this.type);
}

@Override
public void onError(final Throwable t) {
LOG.error("Received error {}", this.type, t);
LOG.error("BSM gRPC Client : Received error {}", this.type, t);
client.reconnectStreams();
}

@Override
public void onCompleted() {
LOG.info("Completed {}", this.type);
LOG.info("BSM gRPC Client : Completed {}", this.type);
client.reconnectStreams();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@
import java.util.List;
import java.util.Objects;

public class InventoryEventHandler implements EventListener {
private static final Logger LOG = LoggerFactory.getLogger(InventoryEventHandler.class);
public class BsmInventoryExporter implements EventListener {
private static final Logger LOG = LoggerFactory.getLogger(BsmInventoryExporter.class);

private final EventSubscriptionService eventSubscriptionService;

private final NodeDao nodeDao;

private final InventoryService inventoryService;
private final BsmInventoryService inventoryService;

public InventoryEventHandler(final EventSubscriptionService eventSubscriptionService,
final NodeDao nodeDao,
final InventoryService inventoryService) {
public BsmInventoryExporter(final EventSubscriptionService eventSubscriptionService,
final NodeDao nodeDao,
final BsmInventoryService inventoryService) {
this.eventSubscriptionService = Objects.requireNonNull(eventSubscriptionService);
this.nodeDao = Objects.requireNonNull(nodeDao);
this.inventoryService = Objects.requireNonNull(inventoryService);
Expand All @@ -72,7 +72,7 @@ public void stop() {

@Override
public String getName() {
return InventoryEventHandler.class.getName();
return BsmInventoryExporter.class.getName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class InventoryService {
private static final Logger LOG = LoggerFactory.getLogger(InventoryService.class);
public class BsmInventoryService {
private static final Logger LOG = LoggerFactory.getLogger(BsmInventoryService.class);
private final NodeDao nodeDao;
private final RuntimeInfo runtimeInfo;
private final BsmGrpcClient client;
Expand All @@ -50,10 +50,10 @@ public class InventoryService {
private final ScheduledExecutorService heartBeatScheduler =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("heartbeat-update"));

public InventoryService(final NodeDao nodeDao,
final RuntimeInfo runtimeInfo,
final BsmGrpcClient client,
final Duration snapshotInterval) {
public BsmInventoryService(final NodeDao nodeDao,
final RuntimeInfo runtimeInfo,
final BsmGrpcClient client,
final Duration snapshotInterval) {
this.nodeDao = Objects.requireNonNull(nodeDao);
this.runtimeInfo = Objects.requireNonNull(runtimeInfo);
this.client = Objects.requireNonNull(client);
Expand All @@ -62,10 +62,10 @@ public InventoryService(final NodeDao nodeDao,
new NamedThreadFactory("inventory-service-snapshot-sender"));
}

public InventoryService(final NodeDao nodeDao,
final RuntimeInfo runtimeInfo,
final BsmGrpcClient client,
final long snapshotInterval) {
public BsmInventoryService(final NodeDao nodeDao,
final RuntimeInfo runtimeInfo,
final BsmGrpcClient client,
final long snapshotInterval) {
this(nodeDao, runtimeInfo, client, Duration.ofSeconds(snapshotInterval));
}

Expand Down Expand Up @@ -103,7 +103,7 @@ public void sendSnapshot() {
.flatMap(iface -> iface.getMonitoredServices().stream()
.map(service -> new MonitoredServiceWithMetadata(node, iface, service))))
.collect(Collectors.toList());
LOG.debug("Send snapshot: services={}", services.size());
LOG.debug("Send BSM snapshot: services={}", services.size());
final var inventory = MonitoredServiceMapper.INSTANCE.toInventoryUpdates(services, this.runtimeInfo, true);
this.client.sendMonitoredServicesInventoryUpdate(inventory);
}
Expand All @@ -123,4 +123,27 @@ public void sendHeartBeatUpdate() {
.setMessage("HeartBeat Update from OpenNMS")
.build());
}

public void sendState(final List<MonitoredServiceWithMetadata> services) {
if (!client.isEnabled()) {
LOG.debug("BSM service disabled, not sending state updates");
return;
}
final var updates = MonitoredServiceMapper.INSTANCE.toStateUpdates(services, this.runtimeInfo);
this.client.sendMonitoredServicesStatusUpdate(updates);
}

public void sendAllState() {
if (!client.isEnabled()) {
LOG.debug("BSM service disabled, not sending all state");
return;
}
final var services = this.nodeDao.getNodes().stream()
.flatMap(node -> node.getIpInterfaces().stream()
.flatMap(iface -> iface.getMonitoredServices().stream()
.map(service -> new MonitoredServiceWithMetadata(node, iface, service))))
.collect(Collectors.toList());

this.sendState(services);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@
import java.util.Objects;
import java.util.stream.Collectors;

public class StateEventHandler implements EventListener {
private static final Logger LOG = LoggerFactory.getLogger(StateEventHandler.class);
public class ServiceStateExporter implements EventListener {
private static final Logger LOG = LoggerFactory.getLogger(ServiceStateExporter.class);

private final EventSubscriptionService eventSubscriptionService;
private final NodeDao nodeDao;
private final StateService stateService;
private final BsmInventoryService bsmInventoryService;

public StateEventHandler(final EventSubscriptionService eventSubscriptionService,
final NodeDao nodeDao,
final StateService stateService) {
public ServiceStateExporter(final EventSubscriptionService eventSubscriptionService,
final NodeDao nodeDao,
final BsmInventoryService bsmInventoryService) {
this.eventSubscriptionService = Objects.requireNonNull(eventSubscriptionService);
this.nodeDao = Objects.requireNonNull(nodeDao);
this.stateService = Objects.requireNonNull(stateService);
this.bsmInventoryService = Objects.requireNonNull(bsmInventoryService);
}

public void start() {
Expand All @@ -72,7 +72,7 @@ public void stop() {

@Override
public String getName() {
return StateEventHandler.class.getName();
return ServiceStateExporter.class.getName();
}

@Override
Expand All @@ -99,6 +99,6 @@ public void onEvent(final IEvent event) {
).map(service -> new MonitoredServiceWithMetadata(node, iface, service)))
.collect(Collectors.toList());

this.stateService.sendState(services);
this.bsmInventoryService.sendState(services);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.lifecycle.Reference;
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.opennms.features.grpc.exporter.bsm.InventoryService;
import org.opennms.features.grpc.exporter.bsm.BsmInventoryService;

@Command(scope = "opennms", name = "grpc-exporter-send-inventory-snapshot", description = "Send an inventory snapshot")
@Service
public class InventorySnapshotCommand implements Action {

@Reference
private InventoryService inventoryService;
private BsmInventoryService inventoryService;

@Override
public Object execute() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@
import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.lifecycle.Reference;
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.opennms.features.grpc.exporter.bsm.StateService;
import org.opennms.features.grpc.exporter.bsm.BsmInventoryService;

@Command(scope = "opennms", name = "grpc-exporter-send-state", description = "Send all current monitor service states")
@Service
public class SendServiceStateCommand implements Action {

@Reference
private StateService stateService;
private BsmInventoryService bsmInventoryService;

@Override
public Object execute() {
this.stateService.sendAllState();
this.bsmInventoryService.sendAllState();
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
* License.
*/

package org.opennms.features.grpc.exporter.inventory;
package org.opennms.features.grpc.exporter.spog;

import org.opennms.core.utils.SystemInfoUtils;
import org.opennms.features.grpc.exporter.mapper.AlarmMapper;
Expand All @@ -39,9 +39,14 @@ public class AlarmExporter implements AlarmLifecycleListener {

private final NmsInventoryGrpcClient client;

public AlarmExporter(RuntimeInfo runtimeInfo, NmsInventoryGrpcClient client) {
private final boolean alarmExportEnabled;

public AlarmExporter(RuntimeInfo runtimeInfo,
NmsInventoryGrpcClient client,
boolean alarmExportEnabled) {
this.runtimeInfo = runtimeInfo;
this.client = client;
this.alarmExportEnabled = alarmExportEnabled;
}

@Override
Expand Down Expand Up @@ -71,18 +76,24 @@ public void sendAlarmsSnapshot(final List<org.opennms.netmgt.model.OnmsAlarm> al
LOG.debug("NMS Inventory service disabled, not sending alarm snapshot");
return;
}
if (!alarmExportEnabled) {
LOG.debug("Alarm export disabled, not sending alarm snapshot");
return;
}

final var alarmUpdates = AlarmMapper.INSTANCE.toAlarmUpdatesList(alarms, this.runtimeInfo, SystemInfoUtils.getInstanceId(), true);
this.client.sendAlarmUpdate(alarmUpdates);
LOG.info("Sent snapshot for {} alarms.", alarms.stream().count());
}

public void sendAddUpdateAlarms(final List<OnmsAlarm> onmsAlarms) {
if (!client.isEnabled()) {
LOG.debug("NMS Inventory service disabled, not sending alarm updates");
return;
}

if (!alarmExportEnabled) {
LOG.debug("Alarm export disabled, not sending alarm updates");
return;
}
final var alarms = AlarmMapper.INSTANCE.toAlarmUpdatesList(onmsAlarms, this.runtimeInfo, SystemInfoUtils.getInstanceId(), false);
this.client.sendAlarmUpdate(alarms);
}
Expand Down
Loading

0 comments on commit 25a8e69

Please sign in to comment.