class OPCUAConnectionManager:
    """
    Process-wide pool of connected OPC UA clients with a per-server circuit breaker.

    One client is kept per ``(url, user_name)`` pair so repeated writes reuse an
    open session instead of reconnecting each time. When a connection attempt
    fails, a short circuit-breaker window is opened for that key and further
    attempts inside the window fail immediately instead of blocking the caller.

    The class is a singleton: every ``OPCUAConnectionManager()`` call returns the
    same object. Pool bookkeeping for a key is only mutated while holding that
    key's lock (see ``_get_connection_lock``).
    """

    _instance: Optional["OPCUAConnectionManager"] = None
    _lock = threading.Lock()

    # Circuit breaker: how long to wait before trying a failed server again
    CIRCUIT_BREAKER_TIMEOUT_SECONDS = 2.0

    def __new__(cls) -> "OPCUAConnectionManager":
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Set the flag BEFORE publishing the instance so another
                    # thread can never observe an object without `_initialized`.
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self) -> None:
        """Initialise the pool state exactly once, however often the class is called."""
        # __init__ runs on every OPCUAConnectionManager() call; the class lock keeps
        # two first callers from both resetting the dictionaries.
        with type(self)._lock:
            if self._initialized:
                return
            self._connections: Dict[str, "Client"] = {}
            self._connection_locks: Dict[str, threading.Lock] = {}
            self._connection_metadata: Dict[str, dict] = {}
            # key -> time.monotonic() reading taken at the last failed connect
            self._connection_failures: Dict[str, float] = {}
            self._global_lock = threading.Lock()
            self._initialized = True
        logger.debug("OPC UA Connection Manager initialized")

    def _get_connection_key(self, url: str, user_name: Optional[str]) -> str:
        """Build the pool key; a missing user name maps to the empty string."""
        return f"{url}|{user_name or ''}"

    def _get_connection_lock(self, key: str) -> threading.Lock:
        """Return the lock dedicated to ``key``, creating it on first request."""
        with self._global_lock:
            return self._connection_locks.setdefault(key, threading.Lock())

    def _create_client(
        self,
        url: str,
        user_name: Optional[str],
        password: Optional[str],
        timeout: int,
    ) -> "Client":
        """Create an unconnected client; credentials are set only when both are given."""
        logger.debug(f"OPC UA Connection Manager creating client for {url}")
        client = Client(url=url, sync_wrapper_timeout=timeout)
        if user_name and password:
            client.set_user(user_name)
            client.set_password(password)
        return client

    def _connect_with_retry(
        self,
        client: "Client",
        url: str,
        max_retries: int = 3,
        base_backoff: float = 1.0,
    ) -> None:
        """
        Connect ``client``, retrying with exponential backoff.

        Args:
            client: The OPC UA client to connect
            url: Server URL (used for logging only)
            max_retries: Maximum number of connection attempts
            base_backoff: Delay before the second attempt in seconds; doubles
                after every further failure

        Raises:
            Exception: ``AUTH ERROR`` immediately on rejected credentials,
                ``NETWORK ERROR`` when the last failure was an ``OSError``,
                ``CONNECTION ERROR`` otherwise. The triggering exception is
                attached as ``__cause__``.
        """
        last_exception: Optional[BaseException] = None

        for attempt in range(max_retries):
            try:
                logger.debug(
                    f"OPC UA Connection Manager connecting to {url} "
                    f"(attempt {attempt + 1}/{max_retries})"
                )
                client.connect()
                logger.info(
                    f"OPC UA Connection Manager successfully connected to {url}"
                )
                return
            except BadUserAccessDenied as exc:
                # Auth errors should not be retried - they will keep failing
                logger.error(f"OPC UA Connection Manager authentication failed: {exc}")
                raise Exception(f"AUTH ERROR: {exc}") from exc
            except OSError as exc:
                last_exception = exc
                logger.warning(
                    f"OPC UA Connection Manager network error on attempt {attempt + 1}: {exc}"
                )
            except Exception as exc:
                last_exception = exc
                logger.warning(
                    f"OPC UA Connection Manager connection error on attempt {attempt + 1}: "
                    f"{type(exc).__name__}: {exc}"
                )

            # Don't sleep after the last attempt
            if attempt < max_retries - 1:
                backoff_time = base_backoff * (2**attempt)
                logger.debug(
                    f"OPC UA Connection Manager waiting {backoff_time}s before retry"
                )
                time.sleep(backoff_time)

        # All retries exhausted
        logger.error(
            f"OPC UA Connection Manager failed to connect to {url} "
            f"after {max_retries} attempts"
        )
        error_kind = (
            "NETWORK ERROR"
            if isinstance(last_exception, OSError)
            else "CONNECTION ERROR"
        )
        raise Exception(
            f"{error_kind}: Failed to connect after {max_retries} attempts. "
            f"Last error: {last_exception}"
        ) from last_exception

    def _is_circuit_open(self, key: str) -> bool:
        """
        Tell whether connection attempts for ``key`` must be skipped right now.

        Returns True while the last recorded failure is younger than
        ``CIRCUIT_BREAKER_TIMEOUT_SECONDS``. An expired record is removed as a
        side effect, so the next call starts from a clean state.
        """
        failed_at = self._connection_failures.get(key)
        if failed_at is None:
            return False

        if time.monotonic() - failed_at < self.CIRCUIT_BREAKER_TIMEOUT_SECONDS:
            return True

        # Timeout expired, clear the failure record
        self._connection_failures.pop(key, None)
        return False

    def _record_failure(self, key: str) -> None:
        """Open the circuit for ``key`` starting now."""
        # Monotonic clock: the window is an elapsed-time measurement and must not
        # stretch or collapse when the wall clock is adjusted.
        self._connection_failures[key] = time.monotonic()

    def _clear_failure(self, key: str) -> None:
        """Forget any recorded failure for ``key`` (no-op when there is none)."""
        self._connection_failures.pop(key, None)

    def get_connection(
        self,
        url: str,
        user_name: Optional[str],
        password: Optional[str],
        timeout: int,
        max_retries: int = 1,
        base_backoff: float = 0.0,
    ) -> "Client":
        """
        Return the pooled client for ``(url, user_name)``, connecting if needed.

        A pooled client is returned as-is (it is not probed); callers that hit an
        error while using it should call ``invalidate_connection``.

        Args:
            url: OPC UA server URL
            user_name: Optional username for authentication
            password: Optional password for authentication
            timeout: Passed to the client as ``sync_wrapper_timeout``
            max_retries: Maximum number of connection attempts (default 1)
            base_backoff: Base delay between retries in seconds (default 0)

        Returns:
            A connected OPC UA client

        Raises:
            Exception: ``CIRCUIT OPEN`` when the server failed recently, or the
                error raised by the connection attempt
        """
        key = self._get_connection_key(url, user_name)

        with self._get_connection_lock(key):
            # Circuit breaker: fail fast if server recently failed
            if self._is_circuit_open(key):
                logger.debug(
                    f"OPC UA Connection Manager circuit breaker open for {url}, "
                    f"failing fast (will retry in {self.CIRCUIT_BREAKER_TIMEOUT_SECONDS}s)"
                )
                raise Exception(
                    f"CIRCUIT OPEN: Server {url} recently failed, skipping connection attempt. "
                    f"Will retry after {self.CIRCUIT_BREAKER_TIMEOUT_SECONDS}s."
                )

            pooled = self._connections.get(key)
            if pooled is not None:
                logger.debug(f"OPC UA Connection Manager reusing connection for {url}")
                return pooled

            client = None
            try:
                client = self._create_client(url, user_name, password, timeout)
                self._connect_with_retry(client, url, max_retries, base_backoff)
            except Exception:
                # Record failure for circuit breaker
                self._record_failure(key)
                # A client that never connected still has to be torn down: the
                # pre-pool implementation disconnected after every failed connect,
                # presumably because the sync client holds background resources
                # from construction onwards - TODO confirm against asyncua.sync.
                if client is not None:
                    self._safe_disconnect(client)
                raise

            # Success - clear any failure record and store in pool
            self._clear_failure(key)
            self._connections[key] = client
            # The password is deliberately not kept: nothing reads it back and
            # there is no reason to hold a secret in long-lived pool state.
            self._connection_metadata[key] = {
                "url": url,
                "user_name": user_name,
                "timeout": timeout,
                "connected_at": datetime.now(),
            }
            return client

    def _safe_disconnect(self, client: "Client") -> None:
        """Disconnect ``client`` on a best-effort basis; errors are logged, not raised."""
        try:
            logger.debug("OPC UA Connection Manager disconnecting client")
            client.disconnect()
        except Exception as exc:
            logger.debug(
                f"OPC UA Connection Manager disconnect error (non-fatal): {exc}"
            )

    def _drop_connection(self, key: str) -> bool:
        """
        Remove and disconnect the pooled client for ``key``.

        The caller must hold the lock for ``key``. Returns True when a client
        was actually pooled under that key.
        """
        client = self._connections.pop(key, None)
        self._connection_metadata.pop(key, None)
        if client is None:
            return False
        self._safe_disconnect(client)
        return True

    def release_connection(
        self, url: str, user_name: Optional[str], force_close: bool = False
    ) -> None:
        """
        Hand a connection back to the pool.

        Connections stay open for reuse, so this is a no-op unless
        ``force_close`` is True, in which case the pooled client is closed and
        removed.

        Args:
            url: OPC UA server URL
            user_name: Optional username used for the connection
            force_close: If True, close the connection instead of keeping it
        """
        if not force_close:
            # Connection stays in pool for reuse
            return

        key = self._get_connection_key(url, user_name)
        with self._get_connection_lock(key):
            if self._drop_connection(key):
                logger.debug(f"OPC UA Connection Manager closed connection for {url}")

    def invalidate_connection(self, url: str, user_name: Optional[str]) -> None:
        """
        Discard the pooled connection so the next use creates a fresh one.

        Intended to be called after an operation on the pooled client failed.
        Does nothing when no client is pooled for the key.

        Args:
            url: OPC UA server URL
            user_name: Optional username used for the connection
        """
        key = self._get_connection_key(url, user_name)
        with self._get_connection_lock(key):
            if self._drop_connection(key):
                logger.debug(
                    f"OPC UA Connection Manager invalidated connection for {url}"
                )

    def close_all(self) -> None:
        """Close every connection that is pooled at the time of the call."""
        with self._global_lock:
            pooled_keys = list(self._connections)
        # Remove each entry under its own key lock, the same lock every other
        # mutator of the pool uses. The global lock is released first because
        # _get_connection_lock() needs it.
        for key in pooled_keys:
            with self._get_connection_lock(key):
                self._drop_connection(key)
        logger.info("OPC UA Connection Manager closed all connections")

    def get_pool_stats(self) -> dict:
        """Return the number of pooled connections and url/user/connect time of each."""
        with self._global_lock:
            # Snapshot first: entries are added and removed under per-key locks,
            # so iterating the live dict could see it change size.
            entries = list(self._connection_metadata.values())
        return {
            "total_connections": len(entries),
            "connections": [
                {
                    "url": meta["url"],
                    "user_name": meta["user_name"],
                    "connected_at": meta["connected_at"].isoformat(),
                }
                for meta in entries
            ],
        }


# Global connection manager instance
_connection_manager: Optional[OPCUAConnectionManager] = None


def get_connection_manager() -> OPCUAConnectionManager:
    """Return the shared OPC UA connection manager, creating it on first call."""
    global _connection_manager
    if _connection_manager is None:
        # OPCUAConnectionManager is itself a singleton, so a racing second call
        # here still ends up with the same object.
        _connection_manager = OPCUAConnectionManager()
    return _connection_manager
The delay doubles + after each failed attempt (exponential backoff). + +**Note:** Authentication errors (wrong username/password) are not retried as they will continue to fail. + ### Cooldown Limitations !!! warning "Cooldown Limitations" - The cooldown feature is optimized for workflows involving video processing. - - In other contexts, such as Workflows triggered by HTTP services (e.g., Roboflow Hosted API, + The cooldown feature is optimized for workflows involving video processing. + - In other contexts, such as Workflows triggered by HTTP services (e.g., Roboflow Hosted API, Dedicated Deployment, or self-hosted `Inference` server), the cooldown timer will not be applied effectively. """ @@ -238,6 +594,20 @@ class BlockManifest(WorkflowBlockManifest): "'direct' uses NodeId strings (for Ignition-style string-based tags).", examples=["hierarchical", "direct"], ) + max_retries: Union[int, Selector(kind=[INTEGER_KIND])] = Field( + default=3, + description="Maximum number of connection attempts before giving up. " + "Default is 3 with exponential backoff starting at 15ms.", + examples=[1, 3, "$inputs.max_retries"], + ge=1, + ) + retry_backoff_seconds: Union[float, Selector(kind=[FLOAT_KIND])] = Field( + default=0.015, + description="Base delay between retry attempts in seconds (doubles each retry). 
" + "Default is 0.015 (15ms) for fast exponential backoff.", + examples=[0.015, 0.5, 1.0, "$inputs.retry_backoff"], + ge=0.0, + ) @classmethod def describe_outputs(cls) -> List[OutputDefinition]: @@ -300,6 +670,8 @@ def run( disable_sink: bool = False, cooldown_seconds: int = 5, node_lookup_mode: Literal["hierarchical", "direct"] = "hierarchical", + max_retries: int = 3, + retry_backoff_seconds: float = 0.015, ) -> BlockResult: if disable_sink: logger.debug("OPC Writer disabled by disable_sink parameter") @@ -346,6 +718,8 @@ def run( value_type=value_type, timeout=timeout, node_lookup_mode=node_lookup_mode, + max_retries=max_retries, + retry_backoff_seconds=retry_backoff_seconds, ) self._last_notification_fired = datetime.now() if fire_and_forget and self._background_tasks: @@ -426,72 +800,120 @@ def opc_connect_and_write_value( timeout: int, node_lookup_mode: Literal["hierarchical", "direct"] = "hierarchical", value_type: str = "String", + max_retries: int = 1, + retry_backoff_seconds: float = 0.0, ) -> Tuple[bool, str]: + """ + Connect to OPC UA server and write a value using connection pooling. + + Uses the connection manager to reuse existing connections. If no connection + exists, attempts to create one. Fails fast on connection errors to avoid + blocking the pipeline. 
+ + Args: + url: OPC UA server URL + namespace: Namespace URI or index + user_name: Optional username for authentication + password: Optional password for authentication + object_name: Target object path + variable_name: Variable to write + value: Value to write + timeout: Connection timeout in seconds + node_lookup_mode: Path lookup strategy ('hierarchical' or 'direct') + value_type: OPC UA data type for the value + max_retries: Maximum number of connection attempts (default 1 = no retries) + retry_backoff_seconds: Base delay between retries (default 0 = no delay) + + Returns: + Tuple of (error_status, message) + """ logger.debug( - f"OPC Writer attempting to connect and write value={value} to {url}/{object_name}/{variable_name}" + f"OPC Writer attempting to write value={value} to {url}/{object_name}/{variable_name}" ) + + connection_manager = get_connection_manager() + try: - _opc_connect_and_write_value( + # Get connection from pool (will create new if needed) + client = connection_manager.get_connection( url=url, - namespace=namespace, user_name=user_name, password=password, + timeout=timeout, + max_retries=max_retries, + base_backoff=retry_backoff_seconds, + ) + + # Perform the write operation + _opc_write_value( + client=client, + namespace=namespace, object_name=object_name, variable_name=variable_name, value=value, - timeout=timeout, node_lookup_mode=node_lookup_mode, value_type=value_type, ) + logger.debug( f"OPC Writer successfully wrote value to {url}/{object_name}/{variable_name}" ) return False, "Value set successfully" + except Exception as exc: - logger.error(f"OPC Writer failed to write value: {exc}") + is_user_config_error = isinstance(exc, USER_CONFIG_ERROR_TYPES) + + # Check the exception chain for wrapped errors + if not is_user_config_error and hasattr(exc, "__cause__") and exc.__cause__: + is_user_config_error = isinstance(exc.__cause__, USER_CONFIG_ERROR_TYPES) + + if not is_user_config_error: + logger.warning( + f"OPC Writer error 
(invalidating connection): {type(exc).__name__}: {exc}" + ) + connection_manager.invalidate_connection(url, user_name) + else: + # User configuration errors - connection is fine, just log the error + logger.error(f"OPC Writer configuration error: {type(exc).__name__}: {exc}") + return ( True, - f"Failed to write {value} to {object_name}:{variable_name} in {url}. Internal error details: {exc}.", + f"Failed to write {value} to {object_name}:{variable_name} in {url}. Error: {exc}", ) -def _opc_connect_and_write_value( - url: str, +def _opc_write_value( + client: Client, namespace: str, - user_name: Optional[str], - password: Optional[str], object_name: str, variable_name: str, value: Union[bool, float, int, str], - timeout: int, node_lookup_mode: Literal["hierarchical", "direct"] = "hierarchical", value_type: str = "String", -): - logger.debug(f"OPC Writer creating client for {url} with timeout={timeout}") - client = Client(url=url, sync_wrapper_timeout=timeout) - if user_name and password: - client.set_user(user_name) - client.set_password(password) - try: - logger.debug(f"OPC Writer connecting to {url}") - client.connect() - logger.debug("OPC Writer successfully connected to server") - except BadUserAccessDenied as exc: - logger.error(f"OPC Writer authentication failed: {exc}") - safe_disconnect(client) - raise Exception(f"AUTH ERROR: {exc}") - except OSError as exc: - logger.error(f"OPC Writer network error during connection: {exc}") - safe_disconnect(client) - raise Exception(f"NETWORK ERROR: {exc}") - except Exception as exc: - logger.error(f"OPC Writer unhandled connection error: {type(exc)} {exc}") - safe_disconnect(client) - raise Exception(f"UNHANDLED ERROR: {type(exc)} {exc}") +) -> None: + """ + Write a value to an OPC UA variable using an existing connection. + + This is the core write logic, separated from connection management. + Raises exceptions on failure which the caller should handle. 
+ + Args: + client: Connected OPC UA client + namespace: Namespace URI or index + object_name: Target object path + variable_name: Variable to write + value: Value to write + node_lookup_mode: Path lookup strategy + value_type: OPC UA data type for the value + + Raises: + Exception: On any error during the write operation + """ get_namespace_index = sync_async_client_method(AsyncClient.get_namespace_index)( client ) + # Resolve namespace try: if namespace.isdigit(): nsidx = int(namespace) @@ -502,23 +924,27 @@ def _opc_connect_and_write_value( namespaces = get_available_namespaces(client) logger.error(f"OPC Writer invalid namespace: {exc}") logger.error(f"Available namespaces: {namespaces}") - safe_disconnect(client) raise Exception( f"WRONG NAMESPACE ERROR: {exc}. Available namespaces: {namespaces}" - ) + ) from exc except Exception as exc: namespaces = get_available_namespaces(client) logger.error(f"OPC Writer unhandled namespace error: {type(exc)} {exc}") logger.error(f"Available namespaces: {namespaces}") - safe_disconnect(client) raise Exception( f"UNHANDLED ERROR: {type(exc)} {exc}. 
Available namespaces: {namespaces}" - ) + ) from exc + # Locate the node if node_lookup_mode == "direct": - # Direct NodeId access for Ignition-style string identifiers + # Direct NodeId access for string identifiers + # If variable_name is empty, use object_name as the full identifier + # This allows maximum flexibility for different server naming conventions try: - node_id = f"ns={nsidx};s={object_name}/{variable_name}" + if variable_name: + node_id = f"ns={nsidx};s={object_name}/{variable_name}" + else: + node_id = f"ns={nsidx};s={object_name}" logger.debug(f"OPC Writer using direct NodeId access: {node_id}") var = client.get_node(node_id) logger.debug( @@ -526,10 +952,9 @@ def _opc_connect_and_write_value( ) except Exception as exc: logger.error(f"OPC Writer direct NodeId access failed: {exc}") - safe_disconnect(client) raise Exception( f"WRONG OBJECT OR PROPERTY ERROR: Could not find node with direct NodeId '{node_id}'. Error: {exc}" - ) + ) from exc else: # Hierarchical path navigation (standard OPC UA) try: @@ -544,15 +969,14 @@ def _opc_connect_and_write_value( ) except BadNoMatch as exc: logger.error(f"OPC Writer hierarchical path not found: {exc}") - safe_disconnect(client) raise Exception( f"WRONG OBJECT OR PROPERTY ERROR: Could not find node at hierarchical path '{node_path}'. 
Error: {exc}" - ) + ) from exc except Exception as exc: logger.error(f"OPC Writer unhandled node lookup error: {type(exc)} {exc}") - safe_disconnect(client) - raise Exception(f"UNHANDLED ERROR: {type(exc)} {exc}") + raise Exception(f"UNHANDLED ERROR: {type(exc)} {exc}") from exc + # Write the value try: logger.debug( f"OPC Writer writing value '{value}' to variable with type '{value_type}'" @@ -603,10 +1027,9 @@ def _opc_connect_and_write_value( var.set_value(int_val, VariantType.UInt64) else: logger.error(f"OPC Writer unsupported value type: {value_type}") - safe_disconnect(client) raise UnsupportedTypeError(f"Value type '{value_type}' is not supported.") logger.info( - f"OPC Writer successfully wrote '{value}' to variable at {object_name}/{variable_name}" + f"OPC Writer successfully wrote '{value}' to variable at {object_name}/{variable_name}" ) except UnsupportedTypeError: raise @@ -615,13 +1038,9 @@ def _opc_connect_and_write_value( logger.error( f"OPC Writer type mismatch: tried to write value '{value}' (type: {type(value).__name__}) to node with data type {node_type}. Error: {exc}" ) - safe_disconnect(client) raise Exception( f"WRONG TYPE ERROR: Tried to write value '{value}' (type: {type(value).__name__}) but node expects type {node_type}. 
def _pooled_int32_write(server: dict, value: int, **overrides):
    """
    Write ``value`` to the test server's ``Int32Var`` through the pooled writer.

    ``server`` is the mapping produced by the ``test_opc_server`` fixture.
    Keyword ``overrides`` replace individual arguments (credentials, timeout, ...).
    Returns the ``(error_status, message)`` tuple of ``opc_connect_and_write_value``.
    """
    arguments = {
        "url": server["url"],
        "namespace": server["namespace"],
        "user_name": server["user_name"],
        "password": server["password"],
        "object_name": server["object_name"],
        "variable_name": "Int32Var",
        "value": value,
        "timeout": 2,
        "value_type": "Int32",
    }
    arguments.update(overrides)
    return opc_connect_and_write_value(**arguments)


@pytest.fixture
def reset_connection_manager():
    """Yield the shared connection manager with an empty pool and no open circuits."""
    manager = get_connection_manager()
    manager.close_all()
    # close_all() leaves circuit-breaker records in place; drop them so a failure
    # provoked by one test cannot make the next one fail fast.
    manager._connection_failures.clear()

    yield manager

    manager.close_all()
    manager._connection_failures.clear()


@pytest.mark.timeout(10)
def test_connection_pooling_reuses_connections(test_opc_server, reset_connection_manager) -> None:
    """Two writes to the same server and user share one pooled connection."""
    manager = reset_connection_manager

    # First write - should create a new connection
    error_status1, message1 = _pooled_int32_write(test_opc_server, 100)
    assert error_status1 is False
    assert message1 == "Value set successfully"
    assert manager.get_pool_stats()["total_connections"] == 1

    # Second write - should reuse the connection
    error_status2, message2 = _pooled_int32_write(test_opc_server, 200)
    assert error_status2 is False
    assert message2 == "Value set successfully"
    assert manager.get_pool_stats()["total_connections"] == 1

    # Verify the value was actually written
    result_value = _opc_connect_and_read_value(
        url=test_opc_server["url"],
        namespace=test_opc_server["namespace"],
        user_name=test_opc_server["user_name"],
        password=test_opc_server["password"],
        object_name=test_opc_server["object_name"],
        variable_name="Int32Var",
        timeout=1,
    )
    assert result_value == 200


@pytest.mark.timeout(10)
def test_connection_invalidation_creates_new_connection(test_opc_server, reset_connection_manager) -> None:
    """Invalidating a pooled connection empties the slot; the next write refills it."""
    manager = reset_connection_manager

    error_status1, _ = _pooled_int32_write(test_opc_server, 300)
    assert error_status1 is False
    assert manager.get_pool_stats()["total_connections"] == 1

    manager.invalidate_connection(test_opc_server["url"], test_opc_server["user_name"])
    assert manager.get_pool_stats()["total_connections"] == 0

    # Next write should create a new connection
    error_status2, _ = _pooled_int32_write(test_opc_server, 400)
    assert error_status2 is False
    assert manager.get_pool_stats()["total_connections"] == 1


@pytest.mark.timeout(10)
def test_connection_failure_fails_fast(reset_connection_manager) -> None:
    """An unreachable server is reported quickly and opens the circuit breaker."""
    manager = reset_connection_manager
    unreachable = {
        "url": "opc.tcp://localhost:59999/nonexistent/",  # Non-existent server
        "namespace": "http://test.namespace",
        "user_name": None,
        "password": None,
        "object_name": "TestObject",
        "variable_name": "TestVar",
        "value": 123,
        "timeout": 1,
        "value_type": "Int32",
        # Use defaults: max_retries=1, retry_backoff_seconds=0.0
    }

    started = time.monotonic()
    error_status, message = opc_connect_and_write_value(**unreachable)
    elapsed = time.monotonic() - started

    assert error_status is True
    assert "Failed to write" in message
    # Generous bound: one attempt with a 1 s timeout and no backoff.
    assert elapsed < 5.0
    assert manager.get_pool_stats()["total_connections"] == 0

    # An immediate second attempt must be rejected by the circuit breaker
    # instead of trying to connect again.
    error_status2, message2 = opc_connect_and_write_value(**unreachable)
    assert error_status2 is True
    assert "CIRCUIT OPEN" in message2


@pytest.mark.timeout(10)
def test_different_servers_get_different_connections(test_opc_server, reset_connection_manager) -> None:
    """The same URL with a different user name is pooled under a separate key."""
    manager = reset_connection_manager

    error_status1, _ = _pooled_int32_write(test_opc_server, 500)
    assert error_status1 is False
    assert manager.get_pool_stats()["total_connections"] == 1

    # Same server, unknown credentials: the pooled connection must not be reused,
    # so the write has to attempt its own login and be rejected.
    error_status2, message2 = _pooled_int32_write(
        test_opc_server,
        600,
        user_name="different_user",
        password="different_pass",
        timeout=1,
        max_retries=1,
    )
    assert error_status2 is True
    assert "AUTH ERROR" in message2

    # The failed login must neither be pooled nor evict the working connection.
    assert manager.get_pool_stats()["total_connections"] == 1


@pytest.mark.timeout(10)
def test_close_all_connections(test_opc_server, reset_connection_manager) -> None:
    """close_all empties the pool and later writes reconnect transparently."""
    manager = reset_connection_manager

    error_status, _ = _pooled_int32_write(test_opc_server, 700)
    assert error_status is False
    assert manager.get_pool_stats()["total_connections"] == 1

    manager.close_all()
    assert manager.get_pool_stats()["total_connections"] == 0

    # Next write should work fine (creates new connection)
    error_status2, _ = _pooled_int32_write(test_opc_server, 800)
    assert error_status2 is False
    assert manager.get_pool_stats()["total_connections"] == 1