From b860837d0c77a390a41f7dc103be2fad7a787f41 Mon Sep 17 00:00:00 2001 From: Sam Cao Date: Fri, 21 Nov 2025 17:08:41 +0100 Subject: [PATCH 1/6] fix: Add OPCUA Auto-reconnect Retry Backoff Strategy --- .../adapters/opcua/OpcUaProtocolAdapter.java | 92 ++++++++++++++----- .../opcua/OpcUaProtocolAdapterTest.java | 64 +++++++++++++ 2 files changed, 132 insertions(+), 24 deletions(-) diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java index 4e40ba5e50..b174cb48a3 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java @@ -70,6 +70,20 @@ public class OpcUaProtocolAdapter implements WritingProtocolAdapter { private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaProtocolAdapter.class); + // Exponential backoff delays: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s (capped at 5 minutes) + private static final long[] BACKOFF_DELAYS_MS = { + 1_000L, // 1 second + 2_000L, // 2 seconds + 4_000L, // 4 seconds + 8_000L, // 8 seconds + 16_000L, // 16 seconds + 32_000L, // 32 seconds + 64_000L, // 64 seconds + 128_000L, // 128 seconds + 256_000L, // 256 seconds + 300_000L // 300 seconds (5 minutes max) + }; + private final @NotNull ProtocolAdapterInformation adapterInformation; private final @NotNull ProtocolAdapterState protocolAdapterState; private final @NotNull String adapterId; @@ -80,21 +94,21 @@ public class OpcUaProtocolAdapter implements WritingProtocolAdapter { private final @NotNull DataPointFactory dataPointFactory; private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService; private final @NotNull OpcUaSpecificAdapterConfig config; - private volatile @Nullable ScheduledExecutorService retryScheduler = null; private final @NotNull AtomicReference> retryFuture = new AtomicReference<>(); - private volatile @Nullable ScheduledExecutorService healthCheckScheduler = null; private final @NotNull AtomicReference> healthCheckFuture = new AtomicReference<>(); private final @NotNull OpcUaServiceFaultListener opcUaServiceFaultListener; + // Retry attempt tracking for exponential backoff private final @NotNull AtomicLong reconnectAttempts = new AtomicLong(0); private final @NotNull AtomicLong lastReconnectTimestamp = new AtomicLong(0); // Lock to prevent concurrent reconnections private final @NotNull ReentrantLock reconnectLock = new ReentrantLock(); - + private volatile @Nullable ScheduledExecutorService retryScheduler = null; + private volatile @Nullable ScheduledExecutorService healthCheckScheduler = null; // Stored for reconnection - set during start() - private volatile ParsedConfig parsedConfig; - private volatile ModuleServices moduleServices; + private volatile @Nullable ParsedConfig parsedConfig; + private volatile @Nullable ModuleServices moduleServices; // Flag to prevent scheduling after stop private volatile boolean stopped = false; @@ -119,6 +133,25 @@ public OpcUaProtocolAdapter( config.getConnectionOptions().autoReconnect()); } + /** + * Calculates exponential backoff delay based on the number of consecutive retry attempts. + * Uses base-2 exponential growth: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s (capped at 5 minutes) + * + * @param attemptCount the number of consecutive retry attempts (1-indexed) + * @return the backoff delay in milliseconds + */ + public static long calculateBackoffDelayMs(final int attemptCount) { + // Array is 0-indexed, attemptCount is 1-indexed, so we need attemptCount - 1 + final int index = attemptCount - 1; + + // If attemptCount exceeds array size, use the last value (max delay) + if (index >= BACKOFF_DELAYS_MS.length) { + return BACKOFF_DELAYS_MS[BACKOFF_DELAYS_MS.length - 1]; + } + + return BACKOFF_DELAYS_MS[index]; + } + @Override public @NotNull String getId() { return adapterId; @@ -143,8 +176,7 @@ public synchronized void start( final var result = ParsedConfig.fromConfig(config); if (result instanceof Failure(final String failure)) { log.error("Failed to parse configuration for OPC UA client: {}", failure); - output.failStart(new IllegalStateException(failure), - "Failed to parse configuration for OPC UA client"); + output.failStart(new IllegalStateException(failure), "Failed to parse configuration for OPC UA client"); return; } else if (result instanceof Success(final ParsedConfig successfullyParsedConfig)) { newlyParsedConfig = successfullyParsedConfig; @@ -178,10 +210,8 @@ public synchronized void start( output.startedSuccessfully(); } else { log.error("Cannot start OPC UA protocol adapter '{}' - adapter is already started", adapterId); - output.failStart( - new IllegalStateException("Adapter already started"), - "Cannot start already started adapter. Please stop the adapter first." - ); + output.failStart(new IllegalStateException("Adapter already started"), + "Cannot start already started adapter. Please stop the adapter first."); } } @@ -258,6 +288,9 @@ private void reconnect() { return; } + // Reset retry counter for fresh reconnection attempt with exponential backoff + consecutiveRetryAttempts.set(0); + // Cancel any pending retries and health checks cancelRetry(); cancelHealthCheck(); @@ -292,7 +325,8 @@ private void reconnect() { }; attemptConnection(newConn, parsedConfig, input); } else { - log.warn("OPC UA adapter '{}' reconnect failed - another connection was created concurrently", adapterId); + log.warn("OPC UA adapter '{}' reconnect failed - another connection was created concurrently", + adapterId); } } finally { reconnectLock.unlock(); @@ -343,7 +377,8 @@ private void scheduleHealthCheck() { } log.debug("Scheduled connection health check every {} milliseconds for adapter '{}'", - healthCheckIntervalMs, adapterId); + healthCheckIntervalMs, + adapterId); } /** @@ -583,27 +618,28 @@ private void attemptConnection( scheduleHealthCheck(); log.info("OPC UA adapter '{}' connected successfully", adapterId); } else { - // Connection failed - clean up and schedule retry + // Connection failed - clean up and schedule retry with exponential backoff this.opcUaClientConnection.set(null); protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR); - final long retryIntervalMs = config.getConnectionOptions().retryIntervalMs(); if (throwable != null) { - log.warn("OPC UA adapter '{}' connection failed, will retry in {} milliseconds", - adapterId, retryIntervalMs, throwable); + log.warn("OPC UA adapter '{}' connection failed, scheduling retry with exponential backoff", + adapterId, + throwable); } else { - log.warn("OPC UA adapter '{}' connection returned false, will retry in {} milliseconds", - adapterId, retryIntervalMs); + log.warn("OPC UA adapter '{}' connection returned false, scheduling retry with exponential backoff", + adapterId); } - // Schedule retry attempt + // Schedule retry attempt with exponential backoff scheduleRetry(input); } }); } /** - * Schedules a retry attempt after configured retry interval. + * Schedules a retry attempt using exponential backoff strategy. + * First retry is after 1 second, subsequent retries use exponential backoff (base 2) up to 5 minutes max. */ private void scheduleRetry(final @NotNull ProtocolAdapterStartInput input) { @@ -613,7 +649,15 @@ private void scheduleRetry(final @NotNull ProtocolAdapterStartInput input) { return; } - final long retryIntervalMs = config.getConnectionOptions().retryIntervalMs(); + // Increment retry attempt counter and calculate backoff delay + final int attemptCount = consecutiveRetryAttempts.updateAndGet(count -> count + 1); + final long backoffDelayMs = calculateBackoffDelayMs(attemptCount); + + log.info("Scheduling retry attempt #{} for OPC UA adapter '{}' with backoff delay of {} ms", + attemptCount, + adapterId, + backoffDelayMs); + final ScheduledFuture future = retryScheduler.schedule(() -> { // Check if adapter was stopped before retry executes if (stopped || this.parsedConfig == null || this.moduleServices == null) { @@ -621,7 +665,7 @@ private void scheduleRetry(final @NotNull ProtocolAdapterStartInput input) { return; } - log.info("Retrying connection for OPC UA adapter '{}'", adapterId); + log.info("Executing retry attempt #{} for OPC UA adapter '{}'", attemptCount, adapterId); // Create new connection object for retry final OpcUaClientConnection newConn = new OpcUaClientConnection(adapterId, @@ -641,7 +685,7 @@ private void scheduleRetry(final @NotNull ProtocolAdapterStartInput input) { } else { log.debug("OPC UA adapter '{}' retry skipped - connection already exists", adapterId); } - }, retryIntervalMs, TimeUnit.MILLISECONDS); + }, backoffDelayMs, TimeUnit.MILLISECONDS); // Store future so it can be cancelled if needed final ScheduledFuture oldFuture = retryFuture.getAndSet(future); diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java index b0b5e55406..7934b826b9 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java @@ -318,4 +318,68 @@ void whenMultipleRetriesOccur_thenReconnectWorks() throws Exception { return input; } + + /** + * Tests the exponential backoff delay calculation using reflection to access the private method. + * Verifies the backoff sequence: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s (capped). + */ + @ParameterizedTest + @CsvSource({ + "1, 1000", // First retry: 1 second + "2, 2000", // Second retry: 2 seconds + "3, 4000", // Third retry: 4 seconds + "4, 8000", // Fourth retry: 8 seconds + "5, 16000", // Fifth retry: 16 seconds + "6, 32000", // Sixth retry: 32 seconds + "7, 64000", // Seventh retry: 64 seconds + "8, 128000", // Eighth retry: 128 seconds + "9, 256000", // Ninth retry: 256 seconds + "10, 300000", // Tenth retry: 300 seconds (max) + "11, 300000", // Eleventh retry: still 300 seconds (capped) + "20, 300000", // Large attempt count: still 300 seconds (capped) + "100, 300000", // Very large attempt count: still 300 seconds (capped) + }) + void testCalculateBackoffDelayMs_exponentialGrowthAndCapping(final int attemptCount, final long expectedDelayMs) { + final long actualDelay = OpcUaProtocolAdapter.calculateBackoffDelayMs(attemptCount); + assertThat(actualDelay).as("Backoff delay for attempt #%d should be %d ms", attemptCount, expectedDelayMs) + .isEqualTo(expectedDelayMs); + } + + /** + * Tests that the backoff strategy correctly handles the maximum delay cap. + * Any attempt count >= 10 should return the maximum delay of 300 seconds. + */ + @Test + void testCalculateBackoffDelayMs_capsAtMaximumDelay() { + for (int attemptCount = 10; attemptCount <= 1000; attemptCount += 10) { + final long actualDelay = OpcUaProtocolAdapter.calculateBackoffDelayMs(attemptCount); + assertThat(actualDelay).as("Backoff delay for attempt #%d should be capped at 300 seconds", attemptCount) + .isEqualTo(300_000L); + } + } + + /** + * Tests that the exponential backoff follows base-2 growth pattern. + * Each delay should be double the previous one (until capped). + */ + @Test + void testCalculateBackoffDelayMs_followsExponentialPattern() { + long previousDelay = 0; + for (int attemptCount = 1; attemptCount <= 9; attemptCount++) { + final long currentDelay = OpcUaProtocolAdapter.calculateBackoffDelayMs(attemptCount); + + if (attemptCount > 1) { + assertThat(currentDelay).as("Delay for attempt #%d should be double the previous delay", attemptCount) + .isEqualTo(previousDelay * 2); + } + + previousDelay = currentDelay; + } + + // Verify that the 10th attempt doesn't follow the exponential pattern (it's capped) + final long tenthAttemptDelay = OpcUaProtocolAdapter.calculateBackoffDelayMs(10); + assertThat(tenthAttemptDelay).as("10th attempt should be capped, not double the 9th") + .isLessThan(previousDelay * 2) + .isEqualTo(300_000L); + } } From 7f61a9dd6892a3046d24b74654b440b78c0338b4 Mon Sep 17 00:00:00 2001 From: Sam Cao Date: Mon, 24 Nov 2025 11:17:16 +0100 Subject: [PATCH 2/6] feat: Add backoff to config --- .../adapters/opcua/OpcUaProtocolAdapter.java | 49 +++++----- .../opcua/config/ConnectionOptions.java | 17 ++-- .../opcua/OpcUaProtocolAdapterTest.java | 89 +++++++++++++++++-- 3 files changed, 120 insertions(+), 35 deletions(-) diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java index b174cb48a3..32602de574 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java @@ -38,6 +38,7 @@ import com.hivemq.edge.adapters.opcua.client.Failure; import com.hivemq.edge.adapters.opcua.client.ParsedConfig; import com.hivemq.edge.adapters.opcua.client.Success; +import com.hivemq.edge.adapters.opcua.config.ConnectionOptions; import com.hivemq.edge.adapters.opcua.config.OpcUaSpecificAdapterConfig; import com.hivemq.edge.adapters.opcua.config.tag.OpcuaTag; import com.hivemq.edge.adapters.opcua.listeners.OpcUaServiceFaultListener; @@ -70,20 +71,6 @@ public class OpcUaProtocolAdapter implements WritingProtocolAdapter { private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaProtocolAdapter.class); - // Exponential backoff delays: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s (capped at 5 minutes) - private static final long[] BACKOFF_DELAYS_MS = { - 1_000L, // 1 second - 2_000L, // 2 seconds - 4_000L, // 4 seconds - 8_000L, // 8 seconds - 16_000L, // 16 seconds - 32_000L, // 32 seconds - 64_000L, // 64 seconds - 128_000L, // 128 seconds - 256_000L, // 256 seconds - 300_000L // 300 seconds (5 minutes max) - }; - private final @NotNull ProtocolAdapterInformation adapterInformation; private final @NotNull ProtocolAdapterState protocolAdapterState; private final @NotNull String adapterId; @@ -134,22 +121,33 @@ public OpcUaProtocolAdapter( } /** - * Calculates exponential backoff delay based on the number of consecutive retry attempts. - * Uses base-2 exponential growth: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s (capped at 5 minutes) + * Calculates backoff delay based on the number of consecutive retry attempts. + * Parses the comma-separated retryIntervalMs string and returns the appropriate delay. + * If attemptCount exceeds the number of configured delays, returns the last configured delay. * + * @param retryIntervalMs comma-separated string of backoff delays in milliseconds * @param attemptCount the number of consecutive retry attempts (1-indexed) * @return the backoff delay in milliseconds + * @throws NumberFormatException when the format is incorrect */ - public static long calculateBackoffDelayMs(final int attemptCount) { + public static long calculateBackoffDelayMs(final @NotNull String retryIntervalMs, final int attemptCount) { + final String[] delayStrings = retryIntervalMs.split(","); + final long[] backoffDelays = new long[delayStrings.length]; + + for (int i = 0; i < delayStrings.length; i++) { + // NumberFormatException is thrown. + backoffDelays[i] = Long.parseLong(delayStrings[i].trim()); + } + // Array is 0-indexed, attemptCount is 1-indexed, so we need attemptCount - 1 final int index = attemptCount - 1; // If attemptCount exceeds array size, use the last value (max delay) - if (index >= BACKOFF_DELAYS_MS.length) { - return BACKOFF_DELAYS_MS[BACKOFF_DELAYS_MS.length - 1]; + if (index >= backoffDelays.length) { + return backoffDelays[backoffDelays.length - 1]; } - return BACKOFF_DELAYS_MS[index]; + return backoffDelays[index]; } @Override @@ -651,7 +649,16 @@ private void scheduleRetry(final @NotNull ProtocolAdapterStartInput input) { // Increment retry attempt counter and calculate backoff delay final int attemptCount = consecutiveRetryAttempts.updateAndGet(count -> count + 1); - final long backoffDelayMs = calculateBackoffDelayMs(attemptCount); + long backoffDelayMs ; + try { + backoffDelayMs = calculateBackoffDelayMs(config.getConnectionOptions().retryIntervalMs(), attemptCount); + } catch (final Exception e) { + log.warn("Failed to calculate backoff delay for adapter '{}' from retryIntervalMs {}", + adapterId, + config.getConnectionOptions().retryIntervalMs(), + e); + backoffDelayMs = calculateBackoffDelayMs(ConnectionOptions.DEFAULT_RETRY_INTERVALS, attemptCount); + } log.info("Scheduling retry attempt #{} for OPC UA adapter '{}' with backoff delay of {} ms", attemptCount, diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/ConnectionOptions.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/ConnectionOptions.java index daacf9246b..e007603baa 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/ConnectionOptions.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/ConnectionOptions.java @@ -70,12 +70,10 @@ public record ConnectionOptions( Long healthCheckIntervalMs, @JsonProperty("retryIntervalMs") - @ModuleConfigField(title = "Connection Retry Interval (milliseconds)", - description = "Interval between connection retry attempts in milliseconds", - numberMin = 5 * 1000, - numberMax = 300 * 1000, - defaultValue = ""+DEFAULT_RETRY_INTERVAL) - Long retryIntervalMs, + @ModuleConfigField(title = "Connection Retry Intervals (milliseconds)", + description = "Comma-separated list of backoff delays in milliseconds for connection retry attempts. The adapter will use these delays sequentially for each retry attempt, repeating the last value if attempts exceed the list length.", + defaultValue = DEFAULT_RETRY_INTERVALS) + String retryIntervalMs, @JsonProperty("autoReconnect") @ModuleConfigField(title = "Automatic Reconnection", @@ -97,7 +95,8 @@ public record ConnectionOptions( public static final long DEFAULT_REQUEST_TIMEOUT = 30 * 1000; public static final long DEFAULT_CONNECTION_TIMEOUT = 30 * 1000; public static final long DEFAULT_HEALTHCHECK_INTERVAL = 30 * 1000; - public static final long DEFAULT_RETRY_INTERVAL = 30 * 1000; + // Exponential backoff delays: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s (capped at 5 minutes) + public static final String DEFAULT_RETRY_INTERVALS = "1000,2000,4000,8000,16000,32000,64000,128000,256000,300000"; public ConnectionOptions { // Timeout configurations with sensible defaults @@ -107,7 +106,7 @@ public record ConnectionOptions( keepAliveFailuresAllowed = requireNonNullElse(keepAliveFailuresAllowed, DEFAULT_KEEP_ALIVE_FAILURES_ALLOWED); connectionTimeoutMs = requireNonNullElse(connectionTimeoutMs, DEFAULT_CONNECTION_TIMEOUT); healthCheckIntervalMs = requireNonNullElse(healthCheckIntervalMs, DEFAULT_HEALTHCHECK_INTERVAL); - retryIntervalMs = requireNonNullElse(retryIntervalMs, DEFAULT_RETRY_INTERVAL); + retryIntervalMs = requireNonNullElse(retryIntervalMs, DEFAULT_RETRY_INTERVALS); autoReconnect = requireNonNullElse(autoReconnect, DEFAULT_AUTO_RECONNECT); reconnectOnServiceFault = requireNonNullElse(reconnectOnServiceFault, DEFAULT_RECONNECT_ON_SERVICE_FAULT); } @@ -115,6 +114,6 @@ public record ConnectionOptions( public static ConnectionOptions defaultConnectionOptions() { return new ConnectionOptions(DEFAULT_SESSION_TIMEOUT, DEFAULT_REQUEST_TIMEOUT, DEFAULT_KEEP_ALIVE_INTERVAL, DEFAULT_KEEP_ALIVE_FAILURES_ALLOWED, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_HEALTHCHECK_INTERVAL, - DEFAULT_RETRY_INTERVAL, DEFAULT_AUTO_RECONNECT, DEFAULT_RECONNECT_ON_SERVICE_FAULT); + DEFAULT_RETRY_INTERVALS, DEFAULT_AUTO_RECONNECT, DEFAULT_RECONNECT_ON_SERVICE_FAULT); } } diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java index 7934b826b9..ef03a635e6 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java @@ -52,6 +52,15 @@ import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.hivemq.edge.adapters.opcua.config.ConnectionOptions; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Integration test for OpcUaProtocolAdapter with embedded OPC UA server. @@ -320,7 +329,7 @@ void whenMultipleRetriesOccur_thenReconnectWorks() throws Exception { } /** - * Tests the exponential backoff delay calculation using reflection to access the private method. + * Tests the exponential backoff delay calculation using the comma-separated retry intervals. * Verifies the backoff sequence: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s (capped). */ @ParameterizedTest @@ -340,7 +349,8 @@ void whenMultipleRetriesOccur_thenReconnectWorks() throws Exception { "100, 300000", // Very large attempt count: still 300 seconds (capped) }) void testCalculateBackoffDelayMs_exponentialGrowthAndCapping(final int attemptCount, final long expectedDelayMs) { - final long actualDelay = OpcUaProtocolAdapter.calculateBackoffDelayMs(attemptCount); + final long actualDelay = + OpcUaProtocolAdapter.calculateBackoffDelayMs(ConnectionOptions.DEFAULT_RETRY_INTERVALS, attemptCount); assertThat(actualDelay).as("Backoff delay for attempt #%d should be %d ms", attemptCount, expectedDelayMs) .isEqualTo(expectedDelayMs); } @@ -352,7 +362,9 @@ void testCalculateBackoffDelayMs_exponentialGrowthAndCapping(final int attemptCo @Test void testCalculateBackoffDelayMs_capsAtMaximumDelay() { for (int attemptCount = 10; attemptCount <= 1000; attemptCount += 10) { - final long actualDelay = OpcUaProtocolAdapter.calculateBackoffDelayMs(attemptCount); + final long actualDelay = + OpcUaProtocolAdapter.calculateBackoffDelayMs(ConnectionOptions.DEFAULT_RETRY_INTERVALS, + attemptCount); assertThat(actualDelay).as("Backoff delay for attempt #%d should be capped at 300 seconds", attemptCount) .isEqualTo(300_000L); } @@ -366,7 +378,9 @@ void testCalculateBackoffDelayMs_capsAtMaximumDelay() { void testCalculateBackoffDelayMs_followsExponentialPattern() { long previousDelay = 0; for (int attemptCount = 1; attemptCount <= 9; attemptCount++) { - final long currentDelay = OpcUaProtocolAdapter.calculateBackoffDelayMs(attemptCount); + final long currentDelay = + OpcUaProtocolAdapter.calculateBackoffDelayMs(ConnectionOptions.DEFAULT_RETRY_INTERVALS, + attemptCount); if (attemptCount > 1) { assertThat(currentDelay).as("Delay for attempt #%d should be double the previous delay", attemptCount) @@ -377,9 +391,74 @@ void testCalculateBackoffDelayMs_followsExponentialPattern() { } // Verify that the 10th attempt doesn't follow the exponential pattern (it's capped) - final long tenthAttemptDelay = OpcUaProtocolAdapter.calculateBackoffDelayMs(10); + final long tenthAttemptDelay = + OpcUaProtocolAdapter.calculateBackoffDelayMs(ConnectionOptions.DEFAULT_RETRY_INTERVALS, 10); assertThat(tenthAttemptDelay).as("10th attempt should be capped, not double the 9th") .isLessThan(previousDelay * 2) .isEqualTo(300_000L); } + + /** + * Tests that malformed retry intervals throw NumberFormatException. + * Various invalid formats should be rejected with appropriate exceptions. + */ + @ParameterizedTest + @ValueSource(strings = { + "abc,def,ghi", // Non-numeric values + "1000,abc,3000", // Mix of valid and invalid + "1000,2000,", // Trailing comma with empty value + ",1000,2000", // Leading comma with empty value + "1000,,2000", // Double comma with empty value + "1000.5,2000.5", // Floating point values + "not-a-number", // Single invalid value + "" // Empty + }) + void testCalculateBackoffDelayMs_malformedIntervals(final @NotNull String malformedIntervals) { + assertThatThrownBy(() -> OpcUaProtocolAdapter.calculateBackoffDelayMs(malformedIntervals, 1)) + .isInstanceOf(NumberFormatException.class) + .hasMessageContaining("For input string:"); + } + + /** + * Tests that valid custom retry intervals work correctly. + * Verifies that custom configurations are parsed and applied properly. + */ + @Test + void testCalculateBackoffDelayMs_customValidIntervals() { + final String customIntervals = "5000,10000,15000"; + + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 1)).isEqualTo(5_000L); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 2)).isEqualTo(10_000L); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 3)).isEqualTo(15_000L); + // Should repeat last value when exceeding array length + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 4)).isEqualTo(15_000L); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 10)).isEqualTo(15_000L); + } + + /** + * Tests that single interval value works correctly. + * A configuration with only one value should use that value for all attempts. + */ + @Test + void testCalculateBackoffDelayMs_singleInterval() { + final String singleInterval = "30000"; + + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(singleInterval, 1)).isEqualTo(30_000L); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(singleInterval, 2)).isEqualTo(30_000L); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(singleInterval, 10)).isEqualTo(30_000L); + } + + /** + * Tests that intervals with whitespace are handled correctly. + * Leading and trailing whitespace should be trimmed from each value. + */ + @Test + void testCalculateBackoffDelayMs_intervalsWithWhitespace() { + final String intervalsWithWhitespace = " 1000 , 2000 , 4000 "; + + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(intervalsWithWhitespace, 1)).isEqualTo(1_000L); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(intervalsWithWhitespace, 2)).isEqualTo(2_000L); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(intervalsWithWhitespace, 3)).isEqualTo(4_000L); + } + } From 799735752bc2b0a98178b864ce3167317acb235e Mon Sep 17 00:00:00 2001 From: Sam Cao Date: Mon, 24 Nov 2025 13:32:44 +0100 Subject: [PATCH 3/6] feat: Add jitter --- .../adapters/opcua/OpcUaProtocolAdapter.java | 5 +- .../opcua/config/ConnectionOptions.java | 2 + .../opcua/OpcUaProtocolAdapterTest.java | 57 ++++++++++++------- 3 files changed, 43 insertions(+), 21 deletions(-) diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java index 32602de574..b1c398eed0 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java @@ -57,6 +57,7 @@ import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -147,7 +148,9 @@ public static long calculateBackoffDelayMs(final @NotNull String retryIntervalMs return backoffDelays[backoffDelays.length - 1]; } - return backoffDelays[index]; + final double backoffDelay = + backoffDelays[index] * (1 + new Random().nextDouble(ConnectionOptions.DEFAULT_RETRY_JITTER)); + return Double.valueOf(backoffDelay).longValue(); } @Override diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/ConnectionOptions.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/ConnectionOptions.java index e007603baa..f5c9763ad1 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/ConnectionOptions.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/ConnectionOptions.java @@ -97,6 +97,8 @@ public record ConnectionOptions( public static final long DEFAULT_HEALTHCHECK_INTERVAL = 30 * 1000; // Exponential backoff delays: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s (capped at 5 minutes) public static final String DEFAULT_RETRY_INTERVALS = "1000,2000,4000,8000,16000,32000,64000,128000,256000,300000"; + // It adds a little jitter to avoid traffic jam. + public static final double DEFAULT_RETRY_JITTER = 0.1; public ConnectionOptions { // Timeout configurations with sensible defaults diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java index ef03a635e6..ee8db97112 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java @@ -328,6 +328,10 @@ void whenMultipleRetriesOccur_thenReconnectWorks() throws Exception { return input; } + static long getUpperBound(final long value) { + return (long) (value * (1 + ConnectionOptions.DEFAULT_RETRY_JITTER)); + } + /** * Tests the exponential backoff delay calculation using the comma-separated retry intervals. * Verifies the backoff sequence: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s (capped). @@ -352,7 +356,8 @@ void testCalculateBackoffDelayMs_exponentialGrowthAndCapping(final int attemptCo final long actualDelay = OpcUaProtocolAdapter.calculateBackoffDelayMs(ConnectionOptions.DEFAULT_RETRY_INTERVALS, attemptCount); assertThat(actualDelay).as("Backoff delay for attempt #%d should be %d ms", attemptCount, expectedDelayMs) - .isEqualTo(expectedDelayMs); + .isGreaterThanOrEqualTo(expectedDelayMs) + .isLessThan(getUpperBound(expectedDelayMs)); } /** @@ -366,7 +371,8 @@ void testCalculateBackoffDelayMs_capsAtMaximumDelay() { OpcUaProtocolAdapter.calculateBackoffDelayMs(ConnectionOptions.DEFAULT_RETRY_INTERVALS, attemptCount); assertThat(actualDelay).as("Backoff delay for attempt #%d should be capped at 300 seconds", attemptCount) - .isEqualTo(300_000L); + .isGreaterThanOrEqualTo(300_000L) + .isLessThan(getUpperBound(300_000L)); } } @@ -384,7 +390,8 @@ void testCalculateBackoffDelayMs_followsExponentialPattern() { if (attemptCount > 1) { assertThat(currentDelay).as("Delay for attempt #%d should be double the previous delay", attemptCount) - .isEqualTo(previousDelay * 2); + .isGreaterThanOrEqualTo(previousDelay * 2) + .isLessThan(getUpperBound(previousDelay * 2)); } previousDelay = currentDelay; @@ -395,7 +402,8 @@ void testCalculateBackoffDelayMs_followsExponentialPattern() { OpcUaProtocolAdapter.calculateBackoffDelayMs(ConnectionOptions.DEFAULT_RETRY_INTERVALS, 10); assertThat(tenthAttemptDelay).as("10th attempt should be capped, not double the 9th") .isLessThan(previousDelay * 2) - .isEqualTo(300_000L); + .isGreaterThanOrEqualTo(300_000L) + .isLessThan(getUpperBound(300_000L)); } /** @@ -406,7 +414,7 @@ void testCalculateBackoffDelayMs_followsExponentialPattern() { @ValueSource(strings = { "abc,def,ghi", // Non-numeric values "1000,abc,3000", // Mix of valid and invalid - "1000,2000,", // Trailing comma with empty value + "1000,2000, ", // Trailing comma with empty value ",1000,2000", // Leading comma with empty value "1000,,2000", // Double comma with empty value "1000.5,2000.5", // Floating point values @@ -414,9 +422,8 @@ void testCalculateBackoffDelayMs_followsExponentialPattern() { "" // Empty }) void testCalculateBackoffDelayMs_malformedIntervals(final @NotNull String malformedIntervals) { - assertThatThrownBy(() -> OpcUaProtocolAdapter.calculateBackoffDelayMs(malformedIntervals, 1)) - .isInstanceOf(NumberFormatException.class) - .hasMessageContaining("For input string:"); + assertThatThrownBy(() -> OpcUaProtocolAdapter.calculateBackoffDelayMs(malformedIntervals, 1)).isInstanceOf( + NumberFormatException.class).hasMessageContaining("For input string:"); } /** @@ -427,12 +434,17 @@ void testCalculateBackoffDelayMs_malformedIntervals(final @NotNull String malfor void testCalculateBackoffDelayMs_customValidIntervals() { final String customIntervals = "5000,10000,15000"; - assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 1)).isEqualTo(5_000L); - assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 2)).isEqualTo(10_000L); - assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 3)).isEqualTo(15_000L); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 1)).isGreaterThanOrEqualTo(5_000L) + .isLessThan(getUpperBound(5_000L)); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 2)).isGreaterThanOrEqualTo(10_000L) + .isLessThan(getUpperBound(10_000L)); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 3)).isGreaterThanOrEqualTo(15_000L) + .isLessThan(getUpperBound(15_000L)); // Should repeat last value when exceeding array length - assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 4)).isEqualTo(15_000L); - assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 10)).isEqualTo(15_000L); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 4)).isGreaterThanOrEqualTo(15_000L) + .isLessThan(getUpperBound(15_000L)); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(customIntervals, 10)).isGreaterThanOrEqualTo(15_000L) + .isLessThan(getUpperBound(15_000L)); } /** @@ -443,9 +455,12 @@ void testCalculateBackoffDelayMs_customValidIntervals() { void testCalculateBackoffDelayMs_singleInterval() { final String singleInterval = "30000"; - assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(singleInterval, 1)).isEqualTo(30_000L); - assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(singleInterval, 2)).isEqualTo(30_000L); - assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(singleInterval, 10)).isEqualTo(30_000L); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(singleInterval, 1)).isGreaterThanOrEqualTo(30_000L) + .isLessThan(getUpperBound(30_000L)); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(singleInterval, 2)).isGreaterThanOrEqualTo(30_000L) + .isLessThan(getUpperBound(30_000L)); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(singleInterval, 10)).isGreaterThanOrEqualTo(30_000L) + .isLessThan(getUpperBound(30_000L)); } /** @@ -456,9 +471,11 @@ void testCalculateBackoffDelayMs_singleInterval() { void testCalculateBackoffDelayMs_intervalsWithWhitespace() { final String intervalsWithWhitespace = " 1000 , 2000 , 4000 "; - assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(intervalsWithWhitespace, 1)).isEqualTo(1_000L); - assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(intervalsWithWhitespace, 2)).isEqualTo(2_000L); - assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(intervalsWithWhitespace, 3)).isEqualTo(4_000L); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(intervalsWithWhitespace, 1)).isGreaterThanOrEqualTo( + 1_000L).isLessThan(getUpperBound(1_000L)); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(intervalsWithWhitespace, 2)).isGreaterThanOrEqualTo( + 2_000L).isLessThan(getUpperBound(2_000L)); + assertThat(OpcUaProtocolAdapter.calculateBackoffDelayMs(intervalsWithWhitespace, 3)).isGreaterThanOrEqualTo( + 4_000L).isLessThan(getUpperBound(4_000L)); } - } From b33caf43244f5b21ec1e50dc7e07b278575051f8 Mon Sep 17 00:00:00 2001 From: Sam Cao Date: Mon, 24 Nov 2025 14:38:23 +0100 Subject: [PATCH 4/6] fix: Fix wrong assertion --- .../hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java index ee8db97112..044add5c21 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java @@ -387,13 +387,10 @@ void testCalculateBackoffDelayMs_followsExponentialPattern() { final long currentDelay = OpcUaProtocolAdapter.calculateBackoffDelayMs(ConnectionOptions.DEFAULT_RETRY_INTERVALS, attemptCount); - if (attemptCount > 1) { assertThat(currentDelay).as("Delay for attempt #%d should be double the previous delay", attemptCount) - .isGreaterThanOrEqualTo(previousDelay * 2) - .isLessThan(getUpperBound(previousDelay * 2)); + .isGreaterThan(previousDelay); } - previousDelay = currentDelay; } From a57f284f8e80ab7885114182085754067d87b232 Mon Sep 17 00:00:00 2001 From: Sam Cao Date: Mon, 1 Dec 2025 09:40:17 +0100 Subject: [PATCH 5/6] fix: Fix rebase issue --- .../adapters/opcua/OpcUaProtocolAdapter.java | 34 +++++++++++-------- .../opcua/OpcUaProtocolAdapterTest.java | 23 +++++-------- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java index b1c398eed0..32ecb8801d 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java @@ -63,6 +63,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -89,6 +90,7 @@ public class OpcUaProtocolAdapter implements WritingProtocolAdapter { // Retry attempt tracking for exponential backoff private final @NotNull AtomicLong reconnectAttempts = new AtomicLong(0); private final @NotNull AtomicLong lastReconnectTimestamp = new AtomicLong(0); + private final @NotNull AtomicInteger consecutiveRetryAttempts = new AtomicInteger(0); // Lock to prevent concurrent reconnections private final @NotNull ReentrantLock reconnectLock = new ReentrantLock(); @@ -113,8 +115,7 @@ public OpcUaProtocolAdapter( this.protocolAdapterMetricsService = input.getProtocolAdapterMetricsHelper(); this.config = input.getConfig(); this.opcUaClientConnection = new AtomicReference<>(); - this.opcUaServiceFaultListener = new OpcUaServiceFaultListener( - protocolAdapterMetricsService, + this.opcUaServiceFaultListener = new OpcUaServiceFaultListener(protocolAdapterMetricsService, input.moduleServices().eventService(), adapterId, this::reconnect, @@ -127,7 +128,7 @@ public OpcUaProtocolAdapter( * If attemptCount exceeds the number of configured delays, returns the last configured delay. * * @param retryIntervalMs comma-separated string of backoff delays in milliseconds - * @param attemptCount the number of consecutive retry attempts (1-indexed) + * @param attemptCount the number of consecutive retry attempts (1-indexed) * @return the backoff delay in milliseconds * @throws NumberFormatException when the format is incorrect */ @@ -191,15 +192,16 @@ public synchronized void start( } final OpcUaClientConnection conn; - if (opcUaClientConnection.compareAndSet(null, conn = new OpcUaClientConnection(adapterId, - tagList, - protocolAdapterState, - input.moduleServices().protocolAdapterTagStreamingService(), - dataPointFactory, - input.moduleServices().eventService(), - protocolAdapterMetricsService, - config, - opcUaServiceFaultListener))) { + if (opcUaClientConnection.compareAndSet(null, + conn = new OpcUaClientConnection(adapterId, + tagList, + protocolAdapterState, + input.moduleServices().protocolAdapterTagStreamingService(), + dataPointFactory, + input.moduleServices().eventService(), + protocolAdapterMetricsService, + config, + opcUaServiceFaultListener))) { protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED); // Attempt initial connection asynchronously @@ -274,8 +276,10 @@ private void reconnect() { final long currentTime = System.currentTimeMillis(); final long lastReconnectTime = lastReconnectTimestamp.get(); if (reconnectAttempts.get() > 0 && - currentTime - lastReconnectTime < config.getConnectionOptions().retryIntervalMs()) { - log.debug("Reconnection for adapter '{}' attempted too soon after last reconnect - skipping", adapterId); + currentTime - lastReconnectTime < + calculateBackoffDelayMs(config.getConnectionOptions().retryIntervalMs(), 0)) { + log.debug("Reconnection for adapter '{}' attempted too soon after last reconnect - skipping", + adapterId); return; } reconnectAttempts.incrementAndGet(); @@ -652,7 +656,7 @@ private void scheduleRetry(final @NotNull ProtocolAdapterStartInput input) { // Increment retry attempt counter and calculate backoff delay final int attemptCount = consecutiveRetryAttempts.updateAndGet(count -> count + 1); - long backoffDelayMs ; + long backoffDelayMs; try { backoffDelayMs = calculateBackoffDelayMs(config.getConnectionOptions().retryIntervalMs(), attemptCount); } catch (final Exception e) { diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java index 044add5c21..08f4edd3f4 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapterTest.java @@ -42,6 +42,9 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import util.EmbeddedOpcUaServerExtension; import java.time.Duration; @@ -49,18 +52,10 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.hivemq.edge.adapters.opcua.config.ConnectionOptions; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; -import org.junit.jupiter.params.provider.ValueSource; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Integration test for OpcUaProtocolAdapter with embedded OPC UA server. @@ -75,6 +70,10 @@ public class OpcUaProtocolAdapterTest { private @NotNull ProtocolAdapterState protocolAdapterState; private @NotNull FakeEventService eventService; + static long getUpperBound(final long value) { + return (long) (value * (1 + ConnectionOptions.DEFAULT_RETRY_JITTER)); + } + @BeforeEach void setUp() { protocolAdapterState = new ProtocolAdapterStateImpl(mock(), "test-adapter-id", "opcua"); @@ -229,7 +228,7 @@ void whenMultipleRetriesOccur_thenReconnectWorks() throws Exception { // health check interval 10000L, // retry interval - 2000L, + "2000", true, true)); @@ -328,10 +327,6 @@ void whenMultipleRetriesOccur_thenReconnectWorks() throws Exception { return input; } - static long getUpperBound(final long value) { - return (long) (value * (1 + ConnectionOptions.DEFAULT_RETRY_JITTER)); - } - /** * Tests the exponential backoff delay calculation using the comma-separated retry intervals. * Verifies the backoff sequence: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s (capped). From 8204b9ac785618140dddff671d66f56a7dc4e267 Mon Sep 17 00:00:00 2001 From: Sam Cao Date: Mon, 1 Dec 2025 15:26:19 +0100 Subject: [PATCH 6/6] fix: Add cancelHealthCheck() when connection is not set up --- .../adapters/opcua/OpcUaProtocolAdapter.java | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java index 32ecb8801d..c2fb41eab4 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java @@ -275,12 +275,20 @@ private void reconnect() { final long currentTime = System.currentTimeMillis(); final long lastReconnectTime = lastReconnectTimestamp.get(); - if (reconnectAttempts.get() > 0 && - currentTime - lastReconnectTime < - calculateBackoffDelayMs(config.getConnectionOptions().retryIntervalMs(), 0)) { - log.debug("Reconnection for adapter '{}' attempted too soon after last reconnect - skipping", - adapterId); - return; + if (reconnectAttempts.get() > 0) { + long backoffDelayMs; + try { + backoffDelayMs = calculateBackoffDelayMs(config.getConnectionOptions().retryIntervalMs(), + (int) reconnectAttempts.get()); + } catch (final Exception e) { + backoffDelayMs = calculateBackoffDelayMs(ConnectionOptions.DEFAULT_RETRY_INTERVALS, + (int) reconnectAttempts.get()); + } + if (currentTime - lastReconnectTime < backoffDelayMs) { + log.debug("Reconnection for adapter '{}' attempted too soon after last reconnect - skipping", + adapterId); + return; + } } reconnectAttempts.incrementAndGet(); lastReconnectTimestamp.set(currentTime); @@ -636,6 +644,7 @@ private void attemptConnection( adapterId); } + cancelHealthCheck(); // Schedule retry attempt with exponential backoff scheduleRetry(input); } @@ -655,7 +664,7 @@ private void scheduleRetry(final @NotNull ProtocolAdapterStartInput input) { } // Increment retry attempt counter and calculate backoff delay - final int attemptCount = consecutiveRetryAttempts.updateAndGet(count -> count + 1); + final int attemptCount = consecutiveRetryAttempts.incrementAndGet(); long backoffDelayMs; try { backoffDelayMs = calculateBackoffDelayMs(config.getConnectionOptions().retryIntervalMs(), attemptCount);