@@ -39,12 +39,14 @@ static inline bool least_busy_comp(const PooledConnection::Ptr& a, const PooledC
3939
4040ConnectionPoolSettings::ConnectionPoolSettings ()
4141 : num_connections_per_host(CASS_DEFAULT_NUM_CONNECTIONS_PER_HOST)
42- , reconnection_policy(new ExponentialReconnectionPolicy()) {}
42+ , reconnection_policy(new ExponentialReconnectionPolicy())
43+ , round_robin(CASS_DEFAULT_CONNECTION_POOL_ROUND_ROBIN) {}
4344
4445ConnectionPoolSettings::ConnectionPoolSettings (const Config& config)
4546 : connection_settings(config)
4647 , num_connections_per_host(config.core_connections_per_host())
47- , reconnection_policy(config.reconnection_policy()) {}
48+ , reconnection_policy(config.reconnection_policy())
49+ , round_robin(config.connection_pool_round_robin()) {}
4850
4951class NopConnectionPoolListener : public ConnectionPoolListener {
5052public:
@@ -72,7 +74,8 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
7274 , settings_(settings)
7375 , metrics_(metrics)
7476 , close_state_(CLOSE_STATE_OPEN)
75- , notify_state_(NOTIFY_STATE_NEW) {
77+ , notify_state_(NOTIFY_STATE_NEW)
78+ , next_connection_idx_(0u ) {
7679 inc_ref (); // Reference for the lifetime of the pooled connections
7780 set_pointer_keys (reconnection_schedules_);
7881 set_pointer_keys (to_flush_);
@@ -95,13 +98,28 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
9598 }
9699}
97100
98- PooledConnection::Ptr ConnectionPool::find_least_busy () const {
99- PooledConnection::Vec::const_iterator it =
100- std::min_element (connections_.begin (), connections_.end (), least_busy_comp);
101- if (it == connections_.end () || (*it)->is_closing ()) {
101+ PooledConnection::Ptr ConnectionPool::find_connection () const {
102+ if (!has_connections ()) {
102103 return PooledConnection::Ptr ();
103104 }
104- return *it;
105+
106+ PooledConnection::Ptr conn_ptr;
107+ if (settings_.round_robin ) {
108+ size_t idx = next_connection_idx_.load (std::memory_order_acquire);
109+ size_t new_idx = (idx + 1u ) % connections_.size ();
110+ while (!next_connection_idx_.compare_exchange_weak (idx, new_idx, std::memory_order_acq_rel)) {
111+ new_idx = (idx + 1u ) % connections_.size ();
112+ }
113+ conn_ptr = connections_[idx];
114+ } else {
115+ conn_ptr = *std::min_element (connections_.begin (), connections_.end (), least_busy_comp);
116+ }
117+
118+ if (conn_ptr && conn_ptr->is_closing ()) {
119+ conn_ptr = PooledConnection::Ptr ();
120+ }
121+ LOG_TRACE (" Chosen connection: %p" , (void *)conn_ptr.get ());
122+ return conn_ptr;
105123}
106124
107125bool ConnectionPool::has_connections () const { return !connections_.empty (); }
0 commit comments