diff --git a/src/Microsoft.Data.SqlClient/add-ons/AzureKeyVaultProvider/Microsoft.Data.SqlClient.AlwaysEncrypted.AzureKeyVaultProvider.csproj b/src/Microsoft.Data.SqlClient/add-ons/AzureKeyVaultProvider/Microsoft.Data.SqlClient.AlwaysEncrypted.AzureKeyVaultProvider.csproj index f1d7c76751..8bdb0e520a 100644 --- a/src/Microsoft.Data.SqlClient/add-ons/AzureKeyVaultProvider/Microsoft.Data.SqlClient.AlwaysEncrypted.AzureKeyVaultProvider.csproj +++ b/src/Microsoft.Data.SqlClient/add-ons/AzureKeyVaultProvider/Microsoft.Data.SqlClient.AlwaysEncrypted.AzureKeyVaultProvider.csproj @@ -36,7 +36,6 @@ - diff --git a/src/Microsoft.Data.SqlClient/netcore/ref/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netcore/ref/Microsoft.Data.SqlClient.csproj index a1b8110fb8..bfc2745e90 100644 --- a/src/Microsoft.Data.SqlClient/netcore/ref/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netcore/ref/Microsoft.Data.SqlClient.csproj @@ -42,9 +42,6 @@ - - - diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj index 950d7f83b1..3977c4e566 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj @@ -1102,9 +1102,6 @@ - - - diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/ChannelDbConnectionPool.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/ChannelDbConnectionPool.cs index 62c11705be..398918c301 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/ChannelDbConnectionPool.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/ChannelDbConnectionPool.cs @@ -95,6 +95,7 @@ internal ChannelDbConnectionPool( Identity = identity; AuthenticationContexts = new(); MaxPoolSize = Convert.ToUInt32(PoolGroupOptions.MaxPoolSize); + TransactedConnectionPool = new(this); _connectionSlots = new(MaxPoolSize); @@ -147,6 +148,9 @@ public ConcurrentDictionary< /// public DbConnectionPoolState State { get; private set; } + /// + public TransactedConnectionPool TransactedConnectionPool { get; } + /// public bool UseLoadBalancing => PoolGroupOptions.UseLoadBalancing; diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/DbConnectionPoolOptions.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/DbConnectionPoolOptions.cs index 7adf8abdc2..5ac6f4d565 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/DbConnectionPoolOptions.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/DbConnectionPoolOptions.cs @@ -39,6 +39,9 @@ bool hasTransactionAffinity _hasTransactionAffinity = hasTransactionAffinity; } + /// + /// The time (in milliseconds) to wait for a connection to be created/returned before terminating the attempt. + /// public int CreationTimeout { get { return _creationTimeout; } diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/IDbConnectionPool.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/IDbConnectionPool.cs index b684bb24bb..bfc5789d3f 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/IDbConnectionPool.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/IDbConnectionPool.cs @@ -85,6 +85,11 @@ internal interface IDbConnectionPool /// DbConnectionPoolState State { get; } + /// + /// Holds connections that are currently enlisted in a transaction. + /// + TransactedConnectionPool TransactedConnectionPool { get; } + /// /// Indicates whether the connection pool is using load balancing. /// @@ -106,7 +111,7 @@ internal interface IDbConnectionPool /// The user options to use if a new connection must be opened. /// The retrieved connection will be passed out via this parameter. /// True if a connection was set in the out parameter, otherwise returns false. - bool TryGetConnection(DbConnection owningObject, TaskCompletionSource taskCompletionSource, DbConnectionOptions userOptions, out DbConnectionInternal? connection); + bool TryGetConnection(DbConnection owningObject, TaskCompletionSource? taskCompletionSource, DbConnectionOptions userOptions, out DbConnectionInternal? connection); /// /// Replaces the internal connection currently associated with owningObject with a new internal connection from the pool. diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/TransactedConnectionPool.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/TransactedConnectionPool.cs index b8e56bd9e3..14ec847379 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/TransactedConnectionPool.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/TransactedConnectionPool.cs @@ -30,7 +30,7 @@ internal class TransactedConnectionPool /// A specialized list that holds database connections associated with a specific transaction. /// Maintains a reference to the transaction for proper cleanup when the transaction completes. /// - private sealed class TransactedConnectionList : List + internal sealed class TransactedConnectionList : List { private readonly Transaction _transaction; @@ -59,9 +59,6 @@ internal void Dispose() } #region Fields - - private readonly Dictionary _transactedCxns; - private static int _objectTypeCount; internal readonly int _objectID = System.Threading.Interlocked.Increment(ref _objectTypeCount); @@ -79,7 +76,7 @@ internal void Dispose() internal TransactedConnectionPool(IDbConnectionPool pool) { Pool = pool; - _transactedCxns = new Dictionary(); + TransactedConnections = new Dictionary(); SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Constructed for connection pool {1}", Id, Pool.Id); } @@ -97,6 +94,8 @@ internal TransactedConnectionPool(IDbConnectionPool pool) /// The IDbConnectionPool instance that owns this transacted pool. internal IDbConnectionPool Pool { get; } + internal Dictionary TransactedConnections { get; private set; } + #endregion #region Methods @@ -122,9 +121,9 @@ internal TransactedConnectionPool(IDbConnectionPool pool) TransactedConnectionList? connections; bool txnFound = false; - lock (_transactedCxns) + lock (TransactedConnections) { - txnFound = _transactedCxns.TryGetValue(transaction, out connections); + txnFound = TransactedConnections.TryGetValue(transaction, out connections); } // NOTE: GetTransactedObject is only used when AutoEnlist = True and the ambient transaction @@ -181,10 +180,10 @@ internal void PutTransactedObject(Transaction transaction, DbConnectionInternal // NOTE: because TransactionEnded is an asynchronous notification, there's no guarantee // around the order in which PutTransactionObject and TransactionEnded are called. - lock (_transactedCxns) + lock (TransactedConnections) { // Check if a transacted pool has been created for this transaction - if ((txnFound = _transactedCxns.TryGetValue(transaction, out connections)) + if ((txnFound = TransactedConnections.TryGetValue(transaction, out connections)) && connections is not null) { // synchronize multi-threaded access with GetTransactedObject @@ -212,14 +211,14 @@ internal void PutTransactedObject(Transaction transaction, DbConnectionInternal transactionClone = transaction.Clone(); newConnections = new TransactedConnectionList(2, transactionClone); // start with only two connections in the list; most times we won't need that many. - lock (_transactedCxns) + lock (TransactedConnections) { // NOTE: in the interim between the locks on the transacted pool (this) during // execution of this method, another thread (threadB) may have attempted to // add a different connection to the transacted pool under the same // transaction. As a result, threadB may have completed creating the // transacted pool while threadA was processing the above instructions. - if (_transactedCxns.TryGetValue(transaction, out connections) + if (TransactedConnections.TryGetValue(transaction, out connections) && connections is not null) { // synchronize multi-threaded access with GetTransactedObject @@ -237,7 +236,7 @@ internal void PutTransactedObject(Transaction transaction, DbConnectionInternal // add the connection/transacted object to the list newConnections.Add(transactedObject); - _transactedCxns.Add(transactionClone, newConnections); + TransactedConnections.Add(transactionClone, newConnections); transactionClone = null; // we've used it -- don't throw it or the TransactedConnectionList that references it away. } } @@ -296,9 +295,9 @@ internal void TransactionEnded(Transaction transaction, DbConnectionInternal tra // TODO: that the pending creation of a transacted pool for this transaction is aborted when // TODO: PutTransactedObject finally gets some CPU time? - lock (_transactedCxns) + lock (TransactedConnections) { - if (_transactedCxns.TryGetValue(transaction, out connections) + if (TransactedConnections.TryGetValue(transaction, out connections) && connections is not null) { bool shouldDisposeConnections = false; @@ -318,7 +317,7 @@ internal void TransactionEnded(Transaction transaction, DbConnectionInternal tra if (0 >= connections.Count) { SqlClientEventSource.Log.TryPoolerTraceEvent(" {0}, Transaction {1}, Removing List from transacted pool.", Id, transaction.GetHashCode()); - _transactedCxns.Remove(transaction); + TransactedConnections.Remove(transaction); // we really need to dispose our connection list; it may have // native resources via the tx and GC may not happen soon enough. @@ -350,4 +349,4 @@ internal void TransactionEnded(Transaction transaction, DbConnectionInternal tra } #endregion -} \ No newline at end of file +} diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/WaitHandleDbConnectionPool.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/WaitHandleDbConnectionPool.cs index ae56af3bcd..1508a0d41f 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/WaitHandleDbConnectionPool.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/ConnectionPool/WaitHandleDbConnectionPool.cs @@ -309,9 +309,9 @@ public bool IsRunning get { return State is Running; } } - private int MaxPoolSize => PoolGroupOptions.MaxPoolSize; + internal int MaxPoolSize => PoolGroupOptions.MaxPoolSize; - private int MinPoolSize => PoolGroupOptions.MinPoolSize; + internal int MinPoolSize => PoolGroupOptions.MinPoolSize; public DbConnectionPoolGroup PoolGroup => _connectionPoolGroup; @@ -328,6 +328,8 @@ public bool IsRunning private bool UsingIntegrateSecurity => _identity != null && DbConnectionPoolIdentity.NoIdentity != _identity; + public TransactedConnectionPool TransactedConnectionPool => _transactedConnectionPool; + private void CleanupCallback(object state) { // Called when the cleanup-timer ticks over. @@ -946,6 +948,8 @@ private bool TryGetConnection(DbConnection owningObject, uint waitForMultipleObj // If automatic transaction enlistment is required, then we try to // get the connection from the transacted connection pool first. + // If automatic enlistment is not enabled, then we cannot vend connections + // from the transacted pool. if (HasTransactionAffinity) { obj = GetFromTransactedPool(out transaction); diff --git a/src/Microsoft.Data.SqlClient/tests/UnitTests/ConnectionPool/TransactedConnectionPoolTest.cs b/src/Microsoft.Data.SqlClient/tests/UnitTests/ConnectionPool/TransactedConnectionPoolTest.cs index 3fc31338cd..18bd9c5ea3 100644 --- a/src/Microsoft.Data.SqlClient/tests/UnitTests/ConnectionPool/TransactedConnectionPoolTest.cs +++ b/src/Microsoft.Data.SqlClient/tests/UnitTests/ConnectionPool/TransactedConnectionPoolTest.cs @@ -668,13 +668,14 @@ internal class MockDbConnectionPool : IDbConnectionPool public DbConnectionPoolGroupOptions PoolGroupOptions => throw new NotImplementedException(); public DbConnectionPoolProviderInfo ProviderInfo => throw new NotImplementedException(); public DbConnectionPoolState State => throw new NotImplementedException(); + public TransactedConnectionPool TransactedConnectionPool => throw new NotImplementedException(); public bool UseLoadBalancing => throw new NotImplementedException(); public ConcurrentBag ReturnedConnections { get; } = new(); public void Clear() => throw new NotImplementedException(); - public bool TryGetConnection(DbConnection owningObject, TaskCompletionSource taskCompletionSource, DbConnectionOptions userOptions, out DbConnectionInternal? connection) + public bool TryGetConnection(DbConnection owningObject, TaskCompletionSource? taskCompletionSource, DbConnectionOptions userOptions, out DbConnectionInternal? connection) { throw new NotImplementedException(); } @@ -739,4 +740,4 @@ internal override void ResetConnection() } #endregion -} \ No newline at end of file +} diff --git a/src/Microsoft.Data.SqlClient/tests/UnitTests/ConnectionPool/WaitHandleDbConnectionPoolTransactionStressTest.cs b/src/Microsoft.Data.SqlClient/tests/UnitTests/ConnectionPool/WaitHandleDbConnectionPoolTransactionStressTest.cs new file mode 100644 index 0000000000..1a9dc9e36f --- /dev/null +++ b/src/Microsoft.Data.SqlClient/tests/UnitTests/ConnectionPool/WaitHandleDbConnectionPoolTransactionStressTest.cs @@ -0,0 +1,1022 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Concurrent; +using System.Data.Common; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Transactions; +using Microsoft.Data.Common.ConnectionString; +using Microsoft.Data.ProviderBase; +using Microsoft.Data.SqlClient.ConnectionPool; +using Xunit; + +namespace Microsoft.Data.SqlClient.UnitTests.ConnectionPool; + +/// +/// Stress tests for WaitHandleDbConnectionPool transaction functionality under high concurrency and load. +/// These tests verify that pool metrics remain consistent when connections are rapidly opened and closed +/// with intermingled transactions in a highly concurrent environment. +/// +public class WaitHandleDbConnectionPoolTransactionStressTest : IDisposable +{ + private const int DefaultMaxPoolSize = 50; + private const int DefaultMinPoolSize = 0; + private readonly int DefaultCreationTimeoutInMilliseconds = 15000; + + private IDbConnectionPool? _pool; + + public void Dispose() + { + _pool?.Shutdown(); + _pool?.Clear(); + } + + #region Helper Methods + + private WaitHandleDbConnectionPool CreatePool( + int maxPoolSize = DefaultMaxPoolSize, + int minPoolSize = DefaultMinPoolSize, + bool hasTransactionAffinity = true) + { + var poolGroupOptions = new DbConnectionPoolGroupOptions( + poolByIdentity: false, + minPoolSize: minPoolSize, + maxPoolSize: maxPoolSize, + creationTimeout: DefaultCreationTimeoutInMilliseconds, + loadBalanceTimeout: 0, + hasTransactionAffinity: hasTransactionAffinity + ); + + var dbConnectionPoolGroup = new DbConnectionPoolGroup( + new DbConnectionOptions("DataSource=localhost;", null), + new DbConnectionPoolKey("TestDataSource"), + poolGroupOptions + ); + + var connectionFactory = new MockSqlConnectionFactory(); + + var pool = new WaitHandleDbConnectionPool( + connectionFactory, + dbConnectionPoolGroup, + DbConnectionPoolIdentity.NoIdentity, + new DbConnectionPoolProviderInfo() + ); + + pool.Startup(); + return pool; + } + + private void AssertPoolMetrics(IDbConnectionPool pool) + { + Assert.True(pool.Count <= pool.PoolGroupOptions.MaxPoolSize, + $"Pool count ({pool.Count}) exceeded max pool size ({pool.PoolGroupOptions.MaxPoolSize})"); + Assert.True(pool.Count >= pool.PoolGroupOptions.MinPoolSize, + $"Pool count ({pool.Count}) is negative"); + Assert.Empty(pool.TransactedConnectionPool.TransactedConnections); + } + + #endregion + + #region Basic Transaction Stress Tests + + [Theory] + [InlineData(10, 100)] + public void StressTest_TransactionPerIteration(int threadCount, int iterationsPerThread) + { + // Arrange + _pool = CreatePool(); + var tasks = new Task[threadCount]; + + // Act + for (int t = 0; t < threadCount; t++) + { + tasks[t] = Task.Run(() => + { + for (int i = 0; i < iterationsPerThread; i++) + { + using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled); + var owner = new SqlConnection(); + + _pool.TryGetConnection( + owner, + taskCompletionSource: null, + new DbConnectionOptions("", null), + out DbConnectionInternal? connection); + + Assert.NotNull(connection); + + _pool.ReturnInternalConnection(connection, owner); + scope.Complete(); + } + }); + } + + Task.WaitAll(tasks); + + // Assert + AssertPoolMetrics(_pool); + } + + [Theory] + [InlineData(10, 100)] + public async Task StressTest_TransactionPerIteration_Async(int threadCount, int iterationsPerThread) + { + // Arrange + _pool = CreatePool(); + var tasks = new Task[threadCount]; + + // Act + for (int t = 0; t < threadCount; t++) + { + tasks[t] = Task.Run(async () => + { + for (int i = 0; i < iterationsPerThread; i++) + { + using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled); + var owner = new SqlConnection(); + + var tcs = new TaskCompletionSource(); + _pool.TryGetConnection( + owner, + taskCompletionSource: tcs, + new DbConnectionOptions("", null), + out DbConnectionInternal? connection); + + // Wait for the task if not obtained immediately + connection ??= await tcs.Task; + + Assert.NotNull(connection); + + _pool.ReturnInternalConnection(connection, owner); + scope.Complete(); + } + }); + } + + await Task.WhenAll(tasks); + + // Assert + AssertPoolMetrics(_pool); + } + + [Theory] + [InlineData(10, 100)] + public void StressTest_TransactionPerThread(int threadCount, int iterationsPerThread) + { + // Arrange + _pool = CreatePool(); + var tasks = new Task[threadCount]; + + // Act - Each transaction should be isolated + for (int t = 0; t < threadCount; t++) + { + tasks[t] = Task.Run(() => + { + using var scope = new TransactionScope(); + var transaction = Transaction.Current; + Assert.NotNull(transaction); + + // Get multiple connections within same transaction + for (int i = 0; i < iterationsPerThread; i++) + { + var owner = new SqlConnection(); + _pool.TryGetConnection(owner, null, new DbConnectionOptions("", null), out var conn); + Assert.NotNull(conn); + _pool.ReturnInternalConnection(conn, owner); + } + + Assert.Single(_pool.TransactedConnectionPool.TransactedConnections[transaction]); + + scope.Complete(); + }); + } + + Task.WaitAll(tasks); + + // Assert + AssertPoolMetrics(_pool); + } + + [Theory] + [InlineData(10, 100)] + public async Task StressTest_TransactionPerThread_Async(int threadCount, int iterationsPerThread) + { + // Arrange + _pool = CreatePool(); + var tasks = new Task[threadCount]; + + // Act - Each transaction should be isolated + for (int t = 0; t < threadCount; t++) + { + tasks[t] = Task.Run(async () => + { + using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled); + var transaction = Transaction.Current; + Assert.NotNull(transaction); + + // Get multiple connections within same transaction + for (int i = 0; i < iterationsPerThread; i++) + { + var owner = new SqlConnection(); + // The transaction *must* be set as the AsyncState of the TaskCompletionSource. + var tcs = new TaskCompletionSource(transaction); + _pool.TryGetConnection( + owner, + tcs, + new DbConnectionOptions("", null), + out var conn); + + conn ??= await tcs.Task; + + Assert.NotNull(conn); + + _pool.ReturnInternalConnection(conn, owner); + + Assert.Single(_pool.TransactedConnectionPool.TransactedConnections[transaction]); + } + + Assert.Single(_pool.TransactedConnectionPool.TransactedConnections[transaction]); + + scope.Complete(); + }); + } + + await Task.WhenAll(tasks); + + // Assert + AssertPoolMetrics(_pool); + } + + [Theory] + [InlineData(10, 100)] + public void StressTest_SingleSharedTransaction(int threadCount, int iterationsPerThread) + { + // Arrange + _pool = CreatePool(); + var tasks = new Task[threadCount]; + + Transaction? transaction = null; + using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled)) + { + transaction = Transaction.Current; + Assert.NotNull(transaction); + // Act - Each transaction should be isolated + for (int t = 0; t < threadCount; t++) + { + tasks[t] = Task.Run(() => + { + using (var innerScope = new TransactionScope(transaction)) + { + Assert.Equal(transaction, Transaction.Current); + // Get multiple connections within same transaction + for (int i = 0; i < iterationsPerThread; i++) + { + using var owner = new SqlConnection(); + _pool.TryGetConnection(owner, null, new DbConnectionOptions("", null), out var conn); + Assert.NotNull(conn); + + // We bypass the SqlConnection.Open flow, so SqlConnection.InnerConnection is never set + // Therefore, SqlConnection.Close doesn't return the connection to the pool, we have to + // do it manually. + _pool.ReturnInternalConnection(conn, owner); + } + + innerScope.Complete(); + } + }); + } + + Task.WaitAll(tasks); + + scope.Complete(); + } + + // Assert + AssertPoolMetrics(_pool); + } + + [Theory] + [InlineData(10, 100)] + public async Task StressTest_SingleSharedTransaction_Async(int threadCount, int iterationsPerThread) + { + // Arrange + _pool = CreatePool(); + var tasks = new Task[threadCount]; + + Transaction? transaction = null; + using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled)) + { + transaction = Transaction.Current; + Assert.NotNull(transaction); + // Act - Each transaction should be isolated + for (int t = 0; t < threadCount; t++) + { + tasks[t] = Task.Run(async () => + { + using (var innerScope = new TransactionScope( + transaction, + TransactionScopeAsyncFlowOption.Enabled)) + { + Assert.Equal(transaction, Transaction.Current); + // Get multiple connections within same transaction + for (int i = 0; i < iterationsPerThread; i++) + { + var owner = new SqlConnection(); + // The transaction *must* be set as the AsyncState of the TaskCompletionSource. + var tcs = new TaskCompletionSource(transaction); + _pool.TryGetConnection( + owner, + tcs, + new DbConnectionOptions("", null), + out var conn); + + conn ??= await tcs.Task; + + Assert.NotNull(conn); + + _pool.ReturnInternalConnection(conn, owner); + } + + innerScope.Complete(); + } + }); + } + + await Task.WhenAll(tasks); + scope.Complete(); + } + + // Assert + AssertPoolMetrics(_pool); + } + #endregion + + #region Pool Saturation and Timeout Tests + + [Fact] + public void StressTest_PoolSaturation_WithOpenTransactions_VerifyTimeout() + { + // Arrange - Test that when pool is saturated with transactions, new requests behave correctly + _pool = CreatePool(maxPoolSize: 3); + const int saturatingThreadCount = 3; + const int waitingThreadCount = 5; + var saturatingTasks = new Task[saturatingThreadCount]; + var waitingTasks = new Task[waitingThreadCount]; + var exceptions = new ConcurrentBag(); + var completedWithoutConnection = 0; + using var barrier = new Barrier(saturatingThreadCount + 1); + + // Act - Saturate the pool with long-held connections in transactions + for (int t = 0; t < saturatingThreadCount; t++) + { + saturatingTasks[t] = Task.Run(async () => + { + var signalled = false; + try + { + using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled); + var owner = new SqlConnection(); + + _pool.TryGetConnection(owner, null, new DbConnectionOptions("", null), out var conn); + Assert.NotNull(conn); + + // Signal that we've acquired a connection + barrier.SignalAndWait(); + signalled = true; + + // Hold the connection briefly + await Task.Delay(200); + + _pool.ReturnInternalConnection(conn, owner); + scope.Complete(); + } + catch (Exception ex) + { + exceptions.Add(ex); + if (!signalled) + { + // Ensure barrier is released even on exception + barrier.SignalAndWait(); + } + } + }); + } + + // Wait for all saturating threads to acquire connections + barrier.SignalAndWait(); + + // Now try to get more connections - pool is saturated + for (int t = 0; t < waitingThreadCount; t++) + { + waitingTasks[t] = Task.Run(() => + { + try + { + using var scope = new TransactionScope(); + var owner = new SqlConnection(); + + var obtained = _pool.TryGetConnection( + owner, + null, + new DbConnectionOptions("", null), + out var conn); + + if (!obtained || conn == null) + { + Interlocked.Increment(ref completedWithoutConnection); + } + else + { + _pool.ReturnInternalConnection(conn, owner); + scope.Complete(); + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(saturatingTasks.Concat(waitingTasks).ToArray()); + + // Assert + Assert.Empty(exceptions); + Assert.True(completedWithoutConnection >= 0, + $"Completed without connection: {completedWithoutConnection}"); + + // Act + // Now that everything is released, we should be able to get a connection again. + using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled); + var owner = new SqlConnection(); + + _pool.TryGetConnection( + owner, + null, + new DbConnectionOptions("", null), + out var conn); + + // Assert + Assert.NotNull(conn); + } + + [Fact] + public async Task StressTest_PoolSaturation_WithOpenTransactions_VerifyTimeout_Async() + { + // Arrange - Test that when pool is saturated with transactions, new requests behave correctly + _pool = CreatePool(maxPoolSize: 3); + const int saturatingThreadCount = 3; + const int waitingThreadCount = 5; + var saturatingTasks = new Task[saturatingThreadCount]; + var waitingTasks = new Task[waitingThreadCount]; + var exceptions = new ConcurrentBag(); + var completedWithoutConnection = 0; + + // Async-friendly barrier replacement + var allSaturatingThreadsReady = new TaskCompletionSource(); + var readyCount = 0; + + // Act - Saturate the pool with long-held connections in transactions + for (int t = 0; t < saturatingThreadCount; t++) + { + saturatingTasks[t] = Task.Run(async () => + { + try + { + using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled); + var owner = new SqlConnection(); + + var tcs = new TaskCompletionSource(Transaction.Current); + _pool.TryGetConnection(owner, tcs, new DbConnectionOptions("", null), out var conn); + conn ??= await tcs.Task; + + Assert.NotNull(conn); + + // Signal that we've acquired a connection + if (Interlocked.Increment(ref readyCount) == saturatingThreadCount) + { + allSaturatingThreadsReady.TrySetResult(true); + } + + // Wait for all saturating threads to be ready + await allSaturatingThreadsReady.Task; + + // Hold the connection briefly + await Task.Delay(200); + + _pool.ReturnInternalConnection(conn, owner); + scope.Complete(); + } + catch (Exception ex) + { + exceptions.Add(ex); + // Ensure barrier is released even on exception + if (Interlocked.Increment(ref readyCount) == saturatingThreadCount) + { + allSaturatingThreadsReady.TrySetResult(true); + } + } + }); + } + + // Wait for all saturating threads to acquire connections + await allSaturatingThreadsReady.Task; + + // Now start waiting threads + for (int t = 0; t < waitingThreadCount; t++) + { + waitingTasks[t] = Task.Run(async () => + { + try + { + using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled); + var owner = new SqlConnection(); + + var tcs = new TaskCompletionSource(Transaction.Current); + var obtained = _pool.TryGetConnection( + owner, + tcs, + new DbConnectionOptions("", null), + out var conn); + + if (!obtained) + { + // Try to wait with timeout + var timeoutTask = Task.Delay(300); + var completedTask = await Task.WhenAny(tcs.Task, timeoutTask); + + if (completedTask == timeoutTask) + { + Interlocked.Increment(ref completedWithoutConnection); + } + else + { + conn = tcs.Task.Result; + _pool.ReturnInternalConnection(conn, owner); + scope.Complete(); + } + } + else if (conn != null) + { + _pool.ReturnInternalConnection(conn, owner); + scope.Complete(); + } + else + { + Interlocked.Increment(ref completedWithoutConnection); + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + await Task.WhenAll(saturatingTasks.Concat(waitingTasks).ToArray()); + + // Assert + Assert.Empty(exceptions); + Assert.True(completedWithoutConnection >= 0, + $"Completed without connection: {completedWithoutConnection}"); + + // Act + // Now that everything is released, we should be able to get a connection again. + using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled); + var owner = new SqlConnection(); + + var tcs = new TaskCompletionSource(Transaction.Current); + _pool.TryGetConnection( + owner, + tcs, + new DbConnectionOptions("", null), + out var conn); + + conn ??= await tcs.Task; + + // Assert + Assert.NotNull(conn); + } + + #endregion + + #region Nested Transaction Tests + + [Theory] + [InlineData(5, 3, 10, TransactionScopeOption.RequiresNew)] + [InlineData(5, 3, 10, TransactionScopeOption.Required)] + public void StressTest_NestedTransactions( + int threadCount, + int nestingLevel, + int iterationsPerThread, + TransactionScopeOption transactionScopeOption) + { + // Arrange - Test nested transactions with multiple nesting levels + _pool = CreatePool(maxPoolSize: 20); + var tasks = new Task[threadCount]; + var exceptions = new ConcurrentBag(); + var successCount = 0; + + // Act + for (int t = 0; t < threadCount; t++) + { + tasks[t] = Task.Run(() => + { + try + { + for (int i = 0; i < iterationsPerThread; i++) + { + ExecuteNestedTransaction(_pool, nestingLevel, transactionScopeOption); + Interlocked.Increment(ref successCount); + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + // Assert + Assert.Empty(exceptions); + Assert.Equal(threadCount * iterationsPerThread, successCount); + AssertPoolMetrics(_pool); + } + + private void ExecuteNestedTransaction( + IDbConnectionPool pool, + int nestingLevel, + TransactionScopeOption transactionScopeOption) + { + if (nestingLevel <= 0) + { + return; + } + + using var scope = new TransactionScope(transactionScopeOption); + var owner = new SqlConnection(); + + pool.TryGetConnection(owner, null, new DbConnectionOptions("", null), out var conn); + Assert.NotNull(conn); + + // Recursively create nested transaction + if (nestingLevel > 1) + { + ExecuteNestedTransaction(pool, nestingLevel - 1, transactionScopeOption); + } + + pool.ReturnInternalConnection(conn, owner); + scope.Complete(); + } + + [Theory] + [InlineData(5, 3, 10, TransactionScopeOption.RequiresNew)] + [InlineData(5, 3, 10, TransactionScopeOption.Required)] + public async Task StressTest_NestedTransactions_Async( + int threadCount, + int nestingLevel, + int iterationsPerThread, + TransactionScopeOption transactionScopeOption) + { + // Arrange - Test nested transactions with multiple nesting levels + _pool = CreatePool(maxPoolSize: 20); + var tasks = new Task[threadCount]; + var exceptions = new ConcurrentBag(); + var successCount = 0; + + // Act + for (int t = 0; t < threadCount; t++) + { + tasks[t] = Task.Run(async () => + { + try + { + for (int i = 0; i < iterationsPerThread; i++) + { + await ExecuteNestedTransactionAsync(_pool, nestingLevel, transactionScopeOption); + Interlocked.Increment(ref successCount); + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + await Task.WhenAll(tasks); + + // Assert + Assert.Empty(exceptions); + Assert.Equal(threadCount * iterationsPerThread, successCount); + AssertPoolMetrics(_pool); + } + + private async Task ExecuteNestedTransactionAsync( + IDbConnectionPool pool, + int nestingLevel, + TransactionScopeOption transactionScopeOption) + { + if (nestingLevel <= 0) + { + return; + } + + using var scope = new TransactionScope(transactionScopeOption, TransactionScopeAsyncFlowOption.Enabled); + var transaction = Transaction.Current; + var owner = new SqlConnection(); + + var tcs = new TaskCompletionSource(transaction); + pool.TryGetConnection(owner, tcs, new DbConnectionOptions("", null), out var conn); + conn ??= await tcs.Task; + + Assert.NotNull(conn); + + // Recursively create nested transaction + if (nestingLevel > 1) + { + await ExecuteNestedTransactionAsync(pool, nestingLevel - 1, transactionScopeOption); + } + + pool.ReturnInternalConnection(conn, owner); + scope.Complete(); + } + + #endregion + + #region Intermingled Transaction Stress Tests + + [Fact] + public void StressTest_MixedTransactedAndNonTransacted_HighConcurrency() + { + // Arrange + _pool = CreatePool(maxPoolSize: 40); + const int threadCount = 20; + const int iterationsPerThread = 50; + var tasks = new Task[threadCount]; + var exceptions = new ConcurrentBag(); + + // Act - Half the threads use transactions, half don't + for (int t = 0; t < threadCount; t++) + { + bool useTransactions = t % 2 == 0; + tasks[t] = Task.Run(() => + { + try + { + for (int i = 0; i < iterationsPerThread; i++) + { + if (useTransactions) + { + using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled); + var owner = new SqlConnection(); + _pool.TryGetConnection(owner, null, new DbConnectionOptions("", null), out var conn); + Assert.NotNull(conn); + _pool.ReturnInternalConnection(conn, owner); + scope.Complete(); + } + else + { + var owner = new SqlConnection(); + _pool.TryGetConnection(owner, null, new DbConnectionOptions("", null), out var conn); + Assert.NotNull(conn); + _pool.ReturnInternalConnection(conn, owner); + } + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + // Assert + Assert.Empty(exceptions); + AssertPoolMetrics(_pool); + } + + #endregion + + #region Edge Case Stress Tests + + [Fact] + public void StressTest_TransactionRollback_ManyOperations() + { + // Arrange + _pool = CreatePool(maxPoolSize: 20); + const int threadCount = 10; + const int iterationsPerThread = 100; + var tasks = new Task[threadCount]; + var exceptions = new ConcurrentBag(); + var rollbackCount = 0; + + // Act - Alternate between commit and rollback + for (int t = 0; t < threadCount; t++) + { + tasks[t] = Task.Run(() => + { + try + { + for (int i = 0; i < iterationsPerThread; i++) + { + using var scope = new TransactionScope(); + var owner = new SqlConnection(); + + _pool.TryGetConnection(owner, null, new DbConnectionOptions("", null), out var conn); + Assert.NotNull(conn); + + // Randomly commit or rollback + if (i % 2 == 0) + { + scope.Complete(); + } + else + { + Interlocked.Increment(ref rollbackCount); + // Don't call Complete - let it rollback + } + + _pool.ReturnInternalConnection(conn!, owner); + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + // Assert + Assert.Empty(exceptions); + Assert.True(rollbackCount > 0, "Expected some rollbacks"); + AssertPoolMetrics(_pool); + } + + [Fact] + public void StressTest_PoolShutdownDuringTransactions() + { + // Arrange + _pool = CreatePool(maxPoolSize: 15); + const int threadCount = 20; + using var barrier = new Barrier(threadCount); + var tasks = new Task[threadCount]; + + // Act + for (int t = 0; t < threadCount; t++) + { + tasks[t] = Task.Run(() => + { + barrier.SignalAndWait(); + + for (int i = 0; i < 50; i++) + { + using var scope = new TransactionScope(); + var owner = new SqlConnection(); + + _pool.TryGetConnection(owner, + null, + new DbConnectionOptions("", null), + out var conn); + + if (conn is not null) + { + _pool.ReturnInternalConnection(conn, owner); + } + + scope.Complete(); + } + }); + } + + // Shutdown pool while operations are in progress + _pool.Shutdown(); + + Task.WaitAll(tasks); + + // Assert - Just verify no crash occurred and pool count is valid + AssertPoolMetrics(_pool); + } + + [Fact] + public void StressTest_TransactionCompleteBeforeReturn() + { + // Arrange - Test completing transaction before returning connection + _pool = CreatePool(maxPoolSize: 20); + const int threadCount = 15; + const int iterationsPerThread = 100; + var tasks = new Task[threadCount]; + var exceptions = new ConcurrentBag(); + var successCount = 0; + + // Act - Complete transaction before returning connection + for (int t = 0; t < threadCount; t++) + { + tasks[t] = Task.Run(() => + { + try + { + for (int i = 0; i < iterationsPerThread; i++) + { + DbConnectionInternal? conn = null; + SqlConnection? owner = null; + + using (var scope = new TransactionScope()) + { + owner = new SqlConnection(); + _pool.TryGetConnection(owner, null, new DbConnectionOptions("", null), out conn); + Assert.NotNull(conn); + + // Complete transaction BEFORE returning + scope.Complete(); + } // Transaction completes here + + // Return connection AFTER transaction scope disposal + _pool.ReturnInternalConnection(conn!, owner!); + Interlocked.Increment(ref successCount); + } + } + catch (Exception ex) + { + exceptions.Add(ex); + } + }); + } + + Task.WaitAll(tasks); + + // Assert + Assert.Empty(exceptions); + Assert.Equal(threadCount * iterationsPerThread, successCount); + AssertPoolMetrics(_pool); + } + + #endregion + + #region Mock Classes + + internal class MockSqlConnectionFactory : SqlConnectionFactory + { + protected override DbConnectionInternal CreateConnection( + DbConnectionOptions options, + DbConnectionPoolKey poolKey, + DbConnectionPoolGroupProviderInfo poolGroupProviderInfo, + IDbConnectionPool pool, + DbConnection owningConnection, + DbConnectionOptions userOptions) + { + return new MockDbConnectionInternal(); + } + } + + internal class MockDbConnectionInternal : DbConnectionInternal + { + private static int s_nextId = 1; + public int MockId { get; } = Interlocked.Increment(ref s_nextId); + + public override string ServerVersion => "Mock"; + + public override DbTransaction BeginTransaction(System.Data.IsolationLevel il) + { + throw new NotImplementedException(); + } + + public override void EnlistTransaction(Transaction? transaction) + { + // Mock implementation - handle transaction enlistment + if (transaction != null) + { + EnlistedTransaction = transaction; + } + } + + protected override void Activate(Transaction? transaction) + { + EnlistedTransaction = transaction; + } + + protected override void Deactivate() + { + // Mock implementation - deactivate connection + } + + public override string ToString() => $"MockConnection_{MockId}"; + + internal override void ResetConnection() + { + // Do nothing + } + } + + #endregion +} diff --git a/src/Microsoft.Data.SqlClient/tests/tools/Microsoft.Data.SqlClient.ExtUtilities/Microsoft.Data.SqlClient.ExtUtilities.csproj b/src/Microsoft.Data.SqlClient/tests/tools/Microsoft.Data.SqlClient.ExtUtilities/Microsoft.Data.SqlClient.ExtUtilities.csproj index cd3ffeb61f..e4ce03ba47 100644 --- a/src/Microsoft.Data.SqlClient/tests/tools/Microsoft.Data.SqlClient.ExtUtilities/Microsoft.Data.SqlClient.ExtUtilities.csproj +++ b/src/Microsoft.Data.SqlClient/tests/tools/Microsoft.Data.SqlClient.ExtUtilities/Microsoft.Data.SqlClient.ExtUtilities.csproj @@ -6,12 +6,6 @@ - - -