Skip to content

Commit 7dcb2d1

Browse files
committed
ConnectionPool: allow picking connections by round-robin strategy
Default strategy for intranode load balancing is the "least busy" strategy, that is, we choose the connection that has as few pending requests (`inflight_request_count()`) as possible. This patch adds an alternative strategy: the usual round-robin.
1 parent d506367 commit 7dcb2d1

File tree

10 files changed

+91
-26
lines changed

10 files changed

+91
-26
lines changed

include/cassandra.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1974,6 +1974,23 @@ CASS_EXPORT CASS_DEPRECATED(CassError
19741974
cass_cluster_set_pending_requests_low_water_mark(CassCluster* cluster,
19751975
unsigned num_requests));
19761976

1977+
/**
1978+
* Alters the strategy of picking connections from connection pool for a given host.
1979+
* By default the connection chosen is the one with least pending requests.
1980+
* When this function is called, then usual round-robin will used.
1981+
*
1982+
* This should not be confused with load balancing while doing node selection. What is
1983+
* being load-balanced here are the <b>connections</b> to a single node.
1984+
*
1985+
* <b>Default:</b> cass_false (no round-robin)
1986+
*
1987+
* @public @memberof CassCluster
1988+
*
1989+
* @param[in] cluster
1990+
*/
1991+
CASS_EXPORT void
1992+
cass_cluster_set_round_robin_on_node_connections(CassCluster* cluster);
1993+
19771994
/**
19781995
* Sets the timeout for connecting to a node.
19791996
*

src/cluster_config.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,10 @@ CassError cass_cluster_set_pending_requests_low_water_mark(CassCluster* cluster,
233233
return CASS_OK;
234234
}
235235

236+
void cass_cluster_set_round_robin_on_node_connections(CassCluster* cluster) {
237+
cluster->config().set_connection_pool_round_robin(true);
238+
}
239+
236240
void cass_cluster_set_connect_timeout(CassCluster* cluster, unsigned timeout_ms) {
237241
cluster->config().set_connect_timeout(timeout_ms);
238242
}

src/config.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ class Config {
7474
, is_client_id_set_(false)
7575
, host_listener_(new DefaultHostListener())
7676
, monitor_reporting_interval_secs_(CASS_DEFAULT_CLIENT_MONITOR_EVENTS_INTERVAL_SECS)
77-
, cluster_metadata_resolver_factory_(new DefaultClusterMetadataResolverFactory()) {
77+
, cluster_metadata_resolver_factory_(new DefaultClusterMetadataResolverFactory())
78+
, connection_pool_round_robin_(CASS_DEFAULT_CONNECTION_POOL_ROUND_ROBIN) {
7879
profiles_.set_empty_key(String());
7980

8081
// Assign the defaults to the cluster profile
@@ -392,6 +393,11 @@ class Config {
392393
}
393394
}
394395

396+
bool connection_pool_round_robin() const { return connection_pool_round_robin_; }
397+
void set_connection_pool_round_robin(bool new_val) {
398+
connection_pool_round_robin_ = new_val;
399+
}
400+
395401
private:
396402
void init_profiles();
397403

@@ -441,6 +447,7 @@ class Config {
441447
unsigned monitor_reporting_interval_secs_;
442448
CloudSecureConnectionConfig cloud_secure_connection_config_;
443449
ClusterMetadataResolverFactory::Ptr cluster_metadata_resolver_factory_;
450+
bool connection_pool_round_robin_;
444451
};
445452

446453
}}} // namespace datastax::internal::core

src/connection_pool.cpp

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ static inline bool least_busy_comp(const PooledConnection::Ptr& a, const PooledC
3939

4040
ConnectionPoolSettings::ConnectionPoolSettings()
4141
: num_connections_per_host(CASS_DEFAULT_NUM_CONNECTIONS_PER_HOST)
42-
, reconnection_policy(new ExponentialReconnectionPolicy()) {}
42+
, reconnection_policy(new ExponentialReconnectionPolicy())
43+
, round_robin(CASS_DEFAULT_CONNECTION_POOL_ROUND_ROBIN) {}
4344

4445
ConnectionPoolSettings::ConnectionPoolSettings(const Config& config)
4546
: connection_settings(config)
4647
, num_connections_per_host(config.core_connections_per_host())
47-
, reconnection_policy(config.reconnection_policy()) {}
48+
, reconnection_policy(config.reconnection_policy())
49+
, round_robin(config.connection_pool_round_robin()) {}
4850

4951
class NopConnectionPoolListener : public ConnectionPoolListener {
5052
public:
@@ -72,7 +74,8 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
7274
, settings_(settings)
7375
, metrics_(metrics)
7476
, close_state_(CLOSE_STATE_OPEN)
75-
, notify_state_(NOTIFY_STATE_NEW) {
77+
, notify_state_(NOTIFY_STATE_NEW)
78+
, next_connection_idx_(0u) {
7679
inc_ref(); // Reference for the lifetime of the pooled connections
7780
set_pointer_keys(reconnection_schedules_);
7881
set_pointer_keys(to_flush_);
@@ -95,13 +98,28 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
9598
}
9699
}
97100

98-
PooledConnection::Ptr ConnectionPool::find_least_busy() const {
99-
PooledConnection::Vec::const_iterator it =
100-
std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
101-
if (it == connections_.end() || (*it)->is_closing()) {
101+
PooledConnection::Ptr ConnectionPool::find_connection() const {
102+
if (!has_connections()) {
102103
return PooledConnection::Ptr();
103104
}
104-
return *it;
105+
106+
PooledConnection::Ptr conn_ptr;
107+
if (settings_.round_robin) {
108+
size_t idx = next_connection_idx_.load(std::memory_order_acquire);
109+
size_t new_idx;
110+
do {
111+
new_idx = (idx + 1u) % connections_.size();
112+
} while (!next_connection_idx_.compare_exchange_weak(idx, new_idx, std::memory_order_acq_rel));
113+
conn_ptr = connections_[idx];
114+
} else {
115+
conn_ptr = *std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
116+
}
117+
118+
if (conn_ptr && conn_ptr->is_closing()) {
119+
conn_ptr = PooledConnection::Ptr();
120+
}
121+
LOG_TRACE("Chosen connection: %p", (void*)conn_ptr.get());
122+
return conn_ptr;
105123
}
106124

107125
bool ConnectionPool::has_connections() const { return !connections_.empty(); }

src/connection_pool.hpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
#include <uv.h>
2727

28+
#include <atomic>
29+
2830
namespace datastax { namespace internal { namespace core {
2931

3032
class ConnectionPool;
@@ -94,6 +96,7 @@ struct ConnectionPoolSettings {
9496
ConnectionSettings connection_settings;
9597
size_t num_connections_per_host;
9698
ReconnectionPolicy::Ptr reconnection_policy;
99+
bool round_robin;
97100
};
98101

99102
/**
@@ -130,12 +133,17 @@ class ConnectionPool : public RefCounted<ConnectionPool> {
130133
Metrics* metrics);
131134

132135
/**
133-
* Find the least busy connection for the pool. The least busy connection has
134-
* the lowest number of outstanding requests and is not closed.
136+
* Find a connection in the pool. The behavior depends on the value of
137+
* `ConnectionPoolSettings::round_robin`:
138+
* - if it's true, then the returned connection is the next connection
139+
* according to round-robin policy;
140+
* - if it's false, the "least busy" connection is chosen.
141+
* (The "least busy" connection has the lowest number of outstanding requests).
142+
* If the connection found is closed, empty pointer is returned.
135143
*
136-
* @return The least busy connection or null if no connection is available.
144+
* @return The chosen connection or null if no connection is available.
137145
*/
138-
PooledConnection::Ptr find_least_busy() const;
146+
PooledConnection::Ptr find_connection() const;
139147

140148
/**
141149
* Determine if the pool has any valid connections.
@@ -235,6 +243,9 @@ class ConnectionPool : public RefCounted<ConnectionPool> {
235243
PooledConnection::Vec connections_;
236244
DelayedConnector::Vec pending_connections_;
237245
DenseHashSet<PooledConnection*> to_flush_;
246+
247+
/// Used when `ConnectionPoolSettings::round_robin` is set to true
248+
mutable std::atomic<size_t> next_connection_idx_;
238249
};
239250

240251
}}} // namespace datastax::internal::core

src/connection_pool_manager.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,12 @@ ConnectionPoolManager::ConnectionPoolManager(const ConnectionPool::Map& pools, u
6262
}
6363
}
6464

65-
PooledConnection::Ptr ConnectionPoolManager::find_least_busy(const Address& address) const {
65+
PooledConnection::Ptr ConnectionPoolManager::find_connection(const Address& address) const {
6666
ConnectionPool::Map::const_iterator it = pools_.find(address);
6767
if (it == pools_.end()) {
6868
return PooledConnection::Ptr();
6969
}
70-
return it->second->find_least_busy();
70+
return it->second->find_connection();
7171
}
7272

7373
bool ConnectionPoolManager::has_connections(const Address& address) const {

src/connection_pool_manager.hpp

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,21 @@ class ConnectionPoolManager
7878
const ConnectionPoolSettings& settings);
7979

8080
/**
81-
* Find the least busy connection for a given host.
82-
*
83-
* @param address The address of the host to find a least busy connection.
84-
* @return The least busy connection for a host or null if no connections are
85-
* available.
81+
* Finds the the next connection for a given host according to round-robin policy,
82+
* (when `settings_::round_robin == true`). Otherwise finds the least
83+
* busy connection.
84+
*
85+
* @param address The address of the host to find a connection to.
86+
* @return The connection selected by policy in pool's settings or
87+
* null if no connections are available.
88+
*/
89+
PooledConnection::Ptr find_connection(const Address& address) const;
90+
91+
/**
92+
* Kept for backward compatibility (unit tests).
93+
* The tests will pass only when intranode round-robin is DISABLED (default).
8694
*/
87-
PooledConnection::Ptr find_least_busy(const Address& address) const;
95+
PooledConnection::Ptr find_least_busy(const Address& address) const { return find_connection(address); }
8896

8997
/**
9098
* Determine if a pool has any valid connections.

src/constants.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
#define CASS_DEFAULT_MAX_TRACING_DATA_WAIT_TIME_MS 15
143143
#define CASS_DEFAULT_RETRY_TRACING_DATA_WAIT_TIME_MS 3
144144
#define CASS_DEFAULT_TRACING_CONSISTENCY CASS_CONSISTENCY_ONE
145+
#define CASS_DEFAULT_CONNECTION_POOL_ROUND_ROBIN false
145146

146147
// Request-level defaults
147148
#define CASS_DEFAULT_CONSISTENCY CASS_CONSISTENCY_LOCAL_ONE

src/request_handler.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,8 +357,7 @@ void RequestHandler::internal_retry(RequestExecution* request_execution) {
357357

358358
bool is_done = false;
359359
while (!is_done && request_execution->current_host()) {
360-
PooledConnection::Ptr connection =
361-
manager_->find_least_busy(request_execution->current_host()->address());
360+
PooledConnection::Ptr connection = manager_->find_connection(request_execution->current_host()->address());
362361
if (connection) {
363362
int32_t result = connection->write(request_execution);
364363

src/request_processor.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ bool RequestProcessor::on_prepare_all(const RequestHandler::Ptr& request_handler
378378
PrepareAllCallback::Ptr prepare_all_callback(
379379
new PrepareAllCallback(address, prepare_all_handler));
380380

381-
PooledConnection::Ptr connection(connection_pool_manager_->find_least_busy(address));
381+
PooledConnection::Ptr connection = connection_pool_manager_->find_connection(address);
382382
if (connection) {
383383
connection->write(prepare_all_callback.get());
384384
}
@@ -579,8 +579,8 @@ int RequestProcessor::process_requests(uint64_t processing_time) {
579579
bool RequestProcessor::write_wait_callback(const RequestHandler::Ptr& request_handler,
580580
const Host::Ptr& current_host,
581581
const RequestCallback::Ptr& callback) {
582-
PooledConnection::Ptr connection(
583-
connection_pool_manager_->find_least_busy(current_host->address()));
582+
PooledConnection::Ptr connection = connection_pool_manager_->find_connection(current_host->address());
583+
584584
if (connection && connection->write(callback.get()) > 0) {
585585
// Stop the original request timer now that we have a response and
586586
// are waiting for the maximum wait time of the handler.

0 commit comments

Comments
 (0)