Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.hivemq.edge.adapters.opcua.client.Failure;
import com.hivemq.edge.adapters.opcua.client.ParsedConfig;
import com.hivemq.edge.adapters.opcua.client.Success;
import com.hivemq.edge.adapters.opcua.config.ConnectionOptions;
import com.hivemq.edge.adapters.opcua.config.OpcUaSpecificAdapterConfig;
import com.hivemq.edge.adapters.opcua.config.tag.OpcuaTag;
import com.hivemq.edge.adapters.opcua.listeners.OpcUaServiceFaultListener;
Expand All @@ -56,11 +57,13 @@

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -80,21 +83,22 @@ public class OpcUaProtocolAdapter implements WritingProtocolAdapter {
private final @NotNull DataPointFactory dataPointFactory;
private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService;
private final @NotNull OpcUaSpecificAdapterConfig config;
private volatile @Nullable ScheduledExecutorService retryScheduler = null;
private final @NotNull AtomicReference<ScheduledFuture<?>> retryFuture = new AtomicReference<>();
private volatile @Nullable ScheduledExecutorService healthCheckScheduler = null;
private final @NotNull AtomicReference<ScheduledFuture<?>> healthCheckFuture = new AtomicReference<>();

private final @NotNull OpcUaServiceFaultListener opcUaServiceFaultListener;
// Retry attempt tracking for exponential backoff
private final @NotNull AtomicLong reconnectAttempts = new AtomicLong(0);
private final @NotNull AtomicLong lastReconnectTimestamp = new AtomicLong(0);
private final @NotNull AtomicInteger consecutiveRetryAttempts = new AtomicInteger(0);

// Lock to prevent concurrent reconnections
private final @NotNull ReentrantLock reconnectLock = new ReentrantLock();

private volatile @Nullable ScheduledExecutorService retryScheduler = null;
private volatile @Nullable ScheduledExecutorService healthCheckScheduler = null;
// Stored for reconnection - set during start()
private volatile ParsedConfig parsedConfig;
private volatile ModuleServices moduleServices;
private volatile @Nullable ParsedConfig parsedConfig;
private volatile @Nullable ModuleServices moduleServices;

// Flag to prevent scheduling after stop
private volatile boolean stopped = false;
Expand All @@ -111,14 +115,45 @@ public OpcUaProtocolAdapter(
this.protocolAdapterMetricsService = input.getProtocolAdapterMetricsHelper();
this.config = input.getConfig();
this.opcUaClientConnection = new AtomicReference<>();
this.opcUaServiceFaultListener = new OpcUaServiceFaultListener(
protocolAdapterMetricsService,
this.opcUaServiceFaultListener = new OpcUaServiceFaultListener(protocolAdapterMetricsService,
input.moduleServices().eventService(),
adapterId,
this::reconnect,
config.getConnectionOptions().autoReconnect());
}

/**
* Calculates backoff delay based on the number of consecutive retry attempts.
* Parses the comma-separated retryIntervalMs string and returns the appropriate delay.
* If attemptCount exceeds the number of configured delays, returns the last configured delay.
*
* @param retryIntervalMs comma-separated string of backoff delays in milliseconds
* @param attemptCount the number of consecutive retry attempts (1-indexed)
* @return the backoff delay in milliseconds
* @throws NumberFormatException when the format is incorrect
*/
public static long calculateBackoffDelayMs(final @NotNull String retryIntervalMs, final int attemptCount) {
final String[] delayStrings = retryIntervalMs.split(",");
final long[] backoffDelays = new long[delayStrings.length];

for (int i = 0; i < delayStrings.length; i++) {
// NumberFormatException is thrown.
backoffDelays[i] = Long.parseLong(delayStrings[i].trim());
}

// Array is 0-indexed, attemptCount is 1-indexed, so we need attemptCount - 1
final int index = attemptCount - 1;

// If attemptCount exceeds array size, use the last value (max delay)
if (index >= backoffDelays.length) {
return backoffDelays[backoffDelays.length - 1];
}

final double backoffDelay =
backoffDelays[index] * (1 + new Random().nextDouble(ConnectionOptions.DEFAULT_RETRY_JITTER));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a new random object per retry seems excessive, why not use ThreadLocalRandom.current()

return Double.valueOf(backoffDelay).longValue();
}

@Override
public @NotNull String getId() {
return adapterId;
Expand All @@ -143,8 +178,7 @@ public synchronized void start(
final var result = ParsedConfig.fromConfig(config);
if (result instanceof Failure<ParsedConfig, String>(final String failure)) {
log.error("Failed to parse configuration for OPC UA client: {}", failure);
output.failStart(new IllegalStateException(failure),
"Failed to parse configuration for OPC UA client");
output.failStart(new IllegalStateException(failure), "Failed to parse configuration for OPC UA client");
return;
} else if (result instanceof Success<ParsedConfig, String>(final ParsedConfig successfullyParsedConfig)) {
newlyParsedConfig = successfullyParsedConfig;
Expand All @@ -158,15 +192,16 @@ public synchronized void start(
}

final OpcUaClientConnection conn;
if (opcUaClientConnection.compareAndSet(null, conn = new OpcUaClientConnection(adapterId,
tagList,
protocolAdapterState,
input.moduleServices().protocolAdapterTagStreamingService(),
dataPointFactory,
input.moduleServices().eventService(),
protocolAdapterMetricsService,
config,
opcUaServiceFaultListener))) {
if (opcUaClientConnection.compareAndSet(null,
conn = new OpcUaClientConnection(adapterId,
tagList,
protocolAdapterState,
input.moduleServices().protocolAdapterTagStreamingService(),
dataPointFactory,
input.moduleServices().eventService(),
protocolAdapterMetricsService,
config,
opcUaServiceFaultListener))) {

protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED);
// Attempt initial connection asynchronously
Expand All @@ -178,10 +213,8 @@ public synchronized void start(
output.startedSuccessfully();
} else {
log.error("Cannot start OPC UA protocol adapter '{}' - adapter is already started", adapterId);
output.failStart(
new IllegalStateException("Adapter already started"),
"Cannot start already started adapter. Please stop the adapter first."
);
output.failStart(new IllegalStateException("Adapter already started"),
"Cannot start already started adapter. Please stop the adapter first.");
}
}

Expand Down Expand Up @@ -242,10 +275,20 @@ private void reconnect() {

final long currentTime = System.currentTimeMillis();
final long lastReconnectTime = lastReconnectTimestamp.get();
if (reconnectAttempts.get() > 0 &&
currentTime - lastReconnectTime < config.getConnectionOptions().retryIntervalMs()) {
log.debug("Reconnection for adapter '{}' attempted too soon after last reconnect - skipping", adapterId);
return;
if (reconnectAttempts.get() > 0) {
long backoffDelayMs;
try {
backoffDelayMs = calculateBackoffDelayMs(config.getConnectionOptions().retryIntervalMs(),
(int) reconnectAttempts.get());
} catch (final Exception e) {
backoffDelayMs = calculateBackoffDelayMs(ConnectionOptions.DEFAULT_RETRY_INTERVALS,
(int) reconnectAttempts.get());
}
if (currentTime - lastReconnectTime < backoffDelayMs) {
log.debug("Reconnection for adapter '{}' attempted too soon after last reconnect - skipping",
adapterId);
return;
}
}
reconnectAttempts.incrementAndGet();
lastReconnectTimestamp.set(currentTime);
Expand All @@ -258,6 +301,9 @@ private void reconnect() {
return;
}

// Reset retry counter for fresh reconnection attempt with exponential backoff
consecutiveRetryAttempts.set(0);

// Cancel any pending retries and health checks
cancelRetry();
cancelHealthCheck();
Expand Down Expand Up @@ -292,7 +338,8 @@ private void reconnect() {
};
attemptConnection(newConn, parsedConfig, input);
} else {
log.warn("OPC UA adapter '{}' reconnect failed - another connection was created concurrently", adapterId);
log.warn("OPC UA adapter '{}' reconnect failed - another connection was created concurrently",
adapterId);
}
} finally {
reconnectLock.unlock();
Expand Down Expand Up @@ -343,7 +390,8 @@ private void scheduleHealthCheck() {
}

log.debug("Scheduled connection health check every {} milliseconds for adapter '{}'",
healthCheckIntervalMs, adapterId);
healthCheckIntervalMs,
adapterId);
}

/**
Expand Down Expand Up @@ -583,27 +631,29 @@ private void attemptConnection(
scheduleHealthCheck();
log.info("OPC UA adapter '{}' connected successfully", adapterId);
} else {
// Connection failed - clean up and schedule retry
// Connection failed - clean up and schedule retry with exponential backoff
this.opcUaClientConnection.set(null);
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);

final long retryIntervalMs = config.getConnectionOptions().retryIntervalMs();
if (throwable != null) {
log.warn("OPC UA adapter '{}' connection failed, will retry in {} milliseconds",
adapterId, retryIntervalMs, throwable);
log.warn("OPC UA adapter '{}' connection failed, scheduling retry with exponential backoff",
adapterId,
throwable);
} else {
log.warn("OPC UA adapter '{}' connection returned false, will retry in {} milliseconds",
adapterId, retryIntervalMs);
log.warn("OPC UA adapter '{}' connection returned false, scheduling retry with exponential backoff",
adapterId);
}

// Schedule retry attempt
cancelHealthCheck();
// Schedule retry attempt with exponential backoff
scheduleRetry(input);
}
});
}

/**
* Schedules a retry attempt after configured retry interval.
* Schedules a retry attempt using exponential backoff strategy.
* First retry is after 1 second, subsequent retries use exponential backoff (base 2) up to 5 minutes max.
*/
private void scheduleRetry(final @NotNull ProtocolAdapterStartInput input) {

Expand All @@ -613,15 +663,32 @@ private void scheduleRetry(final @NotNull ProtocolAdapterStartInput input) {
return;
}

final long retryIntervalMs = config.getConnectionOptions().retryIntervalMs();
// Increment retry attempt counter and calculate backoff delay
final int attemptCount = consecutiveRetryAttempts.incrementAndGet();
long backoffDelayMs;
try {
backoffDelayMs = calculateBackoffDelayMs(config.getConnectionOptions().retryIntervalMs(), attemptCount);
} catch (final Exception e) {
log.warn("Failed to calculate backoff delay for adapter '{}' from retryIntervalMs {}",
adapterId,
config.getConnectionOptions().retryIntervalMs(),
e);
backoffDelayMs = calculateBackoffDelayMs(ConnectionOptions.DEFAULT_RETRY_INTERVALS, attemptCount);
}

log.info("Scheduling retry attempt #{} for OPC UA adapter '{}' with backoff delay of {} ms",
attemptCount,
adapterId,
backoffDelayMs);

final ScheduledFuture<?> future = retryScheduler.schedule(() -> {
// Check if adapter was stopped before retry executes
if (stopped || this.parsedConfig == null || this.moduleServices == null) {
log.debug("OPC UA adapter '{}' retry cancelled - adapter was stopped", adapterId);
return;
}

log.info("Retrying connection for OPC UA adapter '{}'", adapterId);
log.info("Executing retry attempt #{} for OPC UA adapter '{}'", attemptCount, adapterId);

// Create new connection object for retry
final OpcUaClientConnection newConn = new OpcUaClientConnection(adapterId,
Expand All @@ -641,7 +708,7 @@ private void scheduleRetry(final @NotNull ProtocolAdapterStartInput input) {
} else {
log.debug("OPC UA adapter '{}' retry skipped - connection already exists", adapterId);
}
}, retryIntervalMs, TimeUnit.MILLISECONDS);
}, backoffDelayMs, TimeUnit.MILLISECONDS);

// Store future so it can be cancelled if needed
final ScheduledFuture<?> oldFuture = retryFuture.getAndSet(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,10 @@ public record ConnectionOptions(
Long healthCheckIntervalMs,

@JsonProperty("retryIntervalMs")
@ModuleConfigField(title = "Connection Retry Interval (milliseconds)",
description = "Interval between connection retry attempts in milliseconds",
numberMin = 5 * 1000,
numberMax = 300 * 1000,
defaultValue = ""+DEFAULT_RETRY_INTERVAL)
Long retryIntervalMs,
@ModuleConfigField(title = "Connection Retry Intervals (milliseconds)",
description = "Comma-separated list of backoff delays in milliseconds for connection retry attempts. The adapter will use these delays sequentially for each retry attempt, repeating the last value if attempts exceed the list length.",
defaultValue = DEFAULT_RETRY_INTERVALS)
String retryIntervalMs,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change from Long to String..... radical, I hope it does not break existing config?


@JsonProperty("autoReconnect")
@ModuleConfigField(title = "Automatic Reconnection",
Expand All @@ -97,7 +95,10 @@ public record ConnectionOptions(
public static final long DEFAULT_REQUEST_TIMEOUT = 30 * 1000;
public static final long DEFAULT_CONNECTION_TIMEOUT = 30 * 1000;
public static final long DEFAULT_HEALTHCHECK_INTERVAL = 30 * 1000;
public static final long DEFAULT_RETRY_INTERVAL = 30 * 1000;
// Exponential backoff delays: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s (capped at 5 minutes)
public static final String DEFAULT_RETRY_INTERVALS = "1000,2000,4000,8000,16000,32000,64000,128000,256000,300000";
// It adds a little jitter to avoid traffic jam.
public static final double DEFAULT_RETRY_JITTER = 0.1;

public ConnectionOptions {
// Timeout configurations with sensible defaults
Expand All @@ -107,14 +108,14 @@ public record ConnectionOptions(
keepAliveFailuresAllowed = requireNonNullElse(keepAliveFailuresAllowed, DEFAULT_KEEP_ALIVE_FAILURES_ALLOWED);
connectionTimeoutMs = requireNonNullElse(connectionTimeoutMs, DEFAULT_CONNECTION_TIMEOUT);
healthCheckIntervalMs = requireNonNullElse(healthCheckIntervalMs, DEFAULT_HEALTHCHECK_INTERVAL);
retryIntervalMs = requireNonNullElse(retryIntervalMs, DEFAULT_RETRY_INTERVAL);
retryIntervalMs = requireNonNullElse(retryIntervalMs, DEFAULT_RETRY_INTERVALS);
autoReconnect = requireNonNullElse(autoReconnect, DEFAULT_AUTO_RECONNECT);
reconnectOnServiceFault = requireNonNullElse(reconnectOnServiceFault, DEFAULT_RECONNECT_ON_SERVICE_FAULT);
}

public static ConnectionOptions defaultConnectionOptions() {
return new ConnectionOptions(DEFAULT_SESSION_TIMEOUT, DEFAULT_REQUEST_TIMEOUT, DEFAULT_KEEP_ALIVE_INTERVAL,
DEFAULT_KEEP_ALIVE_FAILURES_ALLOWED, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_HEALTHCHECK_INTERVAL,
DEFAULT_RETRY_INTERVAL, DEFAULT_AUTO_RECONNECT, DEFAULT_RECONNECT_ON_SERVICE_FAULT);
DEFAULT_RETRY_INTERVALS, DEFAULT_AUTO_RECONNECT, DEFAULT_RECONNECT_ON_SERVICE_FAULT);
}
}
Loading
Loading