Skip to content

Commit 7d7c09a

Browse files
committed
connection_pool: picking the right connection and reconnecting
Reconnection is now rescheduled until every shard has the desired number of connections. In addition, instead of picking the 'least busy' connection from the host's whole pool, we pick the least busy one from the pool of connections to the shard that owns the request's token.
1 parent 61160a7 commit 7d7c09a

File tree

2 files changed

+67
-35
lines changed

2 files changed

+67
-35
lines changed

src/connection_pool.cpp

Lines changed: 65 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "utils.hpp"
2323

2424
#include <algorithm>
25+
#include <numeric>
2526

2627
using namespace datastax;
2728
using namespace datastax::internal::core;
@@ -77,45 +78,65 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
7778
set_pointer_keys(reconnection_schedules_);
7879
set_pointer_keys(to_flush_);
7980

81+
if (host->sharding_info()) {
82+
const auto hosts_shard_cnt = host->sharding_info()->get_shards_count();
83+
connections_by_shard_.resize(hosts_shard_cnt);
84+
num_connections_per_shard_ = settings_.num_connections_per_host / hosts_shard_cnt
85+
+ (settings_.num_connections_per_host % hosts_shard_cnt ? 1u : 0u);
86+
} else {
87+
connections_by_shard_.resize(1);
88+
num_connections_per_shard_ = settings_.num_connections_per_host;
89+
}
90+
8091
for (Connection::Vec::const_iterator it = connections.begin(), end = connections.end(); it != end;
8192
++it) {
8293
const Connection::Ptr& connection(*it);
83-
if (!connection->is_closing()) {
94+
if (!connection->is_closing()
95+
&& connections_by_shard_[connection->shard_id().value_or(0)].size() < num_connections_per_shard_) {
8496
add_connection(PooledConnection::Ptr(new PooledConnection(this, connection)));
8597
}
8698
}
8799

88100
notify_up_or_down();
89101

90102
// We had non-critical errors or some connections closed
91-
assert(connections.size() <= settings_.num_connections_per_host);
92-
size_t needed = settings_.num_connections_per_host - connections_.size();
103+
size_t needed = num_connections_per_shard_ * connections_by_shard_.size()
104+
- std::accumulate(connections_by_shard_.begin(), connections_by_shard_.end(), 0u,
105+
[] (size_t acc, const PooledConnection::Vec& v) {
106+
return acc + v.size();
107+
});
93108
for (size_t i = 0; i < needed; ++i) {
94109
schedule_reconnect();
95110
}
96111
}
97112

98113
/**
 * Pick the connection a request should be written to.
 *
 * When the host exposes sharding info and `token` is a real token, the
 * connections of the shard that owns the token are preferred. If that shard
 * has no usable connection, or no shard can be derived, the choice is made
 * among all connections of this pool.
 *
 * @param token Routing token of the request, or CASS_INT64_MIN as a
 *              placeholder meaning "no token".
 * @return The minimum connection according to `least_busy_comp`, or a NULL
 *         pointer when there is no connection or the chosen one is closing.
 */
PooledConnection::Ptr ConnectionPool::find_least_busy(int64_t token) const {
  if (token != CASS_INT64_MIN && host_->sharding_info()) {
    const size_t shard = static_cast<size_t>(host_->sharding_info()->shard_id(token));
    // Same bounds guard as on_reconnect(): never index past the per-shard table.
    if (shard < connections_by_shard_.size()) {
      const PooledConnection::Vec& shard_pool = connections_by_shard_[shard];
      const PooledConnection::Vec::const_iterator it =
          std::min_element(shard_pool.begin(), shard_pool.end(), least_busy_comp);
      if (it != shard_pool.end() && !(*it)->is_closing()) {
        return *it;
      }
    }
    // The desired shard has no usable connection (e.g. it is still being
    // reconnected). Fall through to the whole pool instead of failing the
    // request, as the implementation did before connections were bucketed
    // by shard.
  }

  // Placeholder token, no sharding info, or no usable connection on the
  // desired shard: take the least busy connection of the *entire pool*.
  PooledConnection::Ptr least_busy; // NULL by default
  for (const auto& shard_pool : connections_by_shard_) {
    for (const auto& conn : shard_pool) {
      least_busy = least_busy ? std::min(least_busy, conn, least_busy_comp) : conn;
    }
  }

  // Never hand out a connection that is already closing; this mirrors the
  // check applied to the per-shard result above.
  if (least_busy && least_busy->is_closing()) {
    return PooledConnection::Ptr();
  }
  return least_busy;
}
117135

118-
bool ConnectionPool::has_connections() const { return !connections_.empty(); }
136+
bool ConnectionPool::has_connections() const {
137+
return !std::all_of(connections_by_shard_.begin(), connections_by_shard_.end(),
138+
[] (const PooledConnection::Vec& v) { return v.empty(); });
139+
}
119140

120141
void ConnectionPool::flush() {
121142
for (DenseHashSet<PooledConnection*>::const_iterator it = to_flush_.begin(),
@@ -153,8 +174,9 @@ void ConnectionPool::close_connection(PooledConnection* connection, Protected) {
153174
if (metrics_) {
154175
metrics_->total_connections.dec();
155176
}
156-
connections_.erase(std::remove(connections_.begin(), connections_.end(), connection),
157-
connections_.end());
177+
auto& shard_pool = connections_by_shard_[connection->connection()->shard_id().value_or(0)];
178+
shard_pool.erase(std::remove(shard_pool.begin(), shard_pool.end(), connection),
179+
shard_pool.end());
158180
to_flush_.erase(connection);
159181

160182
if (close_state_ != CLOSE_STATE_OPEN) {
@@ -172,16 +194,16 @@ void ConnectionPool::add_connection(const PooledConnection::Ptr& connection) {
172194
if (metrics_) {
173195
metrics_->total_connections.inc();
174196
}
175-
connections_.push_back(connection);
197+
const size_t new_connections_shard = connection->connection()->shard_id().value_or(0u);
198+
LOG_INFO("add_connection: to host %s to shard %ld", host_->address_string().c_str(), new_connections_shard);
199+
connections_by_shard_[new_connections_shard].push_back(connection);
176200
}
177201

178202
void ConnectionPool::notify_up_or_down() {
179-
if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) &&
180-
connections_.empty()) {
203+
if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) && !has_connections()) {
181204
notify_state_ = NOTIFY_STATE_DOWN;
182205
listener_->on_pool_down(host_->address());
183-
} else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) &&
184-
!connections_.empty()) {
206+
} else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) && has_connections()) {
185207
notify_state_ = NOTIFY_STATE_UP;
186208
listener_->on_pool_up(host_->address());
187209
}
@@ -222,11 +244,13 @@ void ConnectionPool::internal_close() {
222244
// Make copies of connection/connector data structures to prevent iterator
223245
// invalidation.
224246

225-
PooledConnection::Vec connections(connections_);
226-
for (PooledConnection::Vec::iterator it = connections.begin(), end = connections.end();
227-
it != end; ++it) {
228-
(*it)->close();
229-
}
247+
auto connections_per_shards = connections_by_shard_;
248+
std::for_each(connections_per_shards.begin(), connections_per_shards.end(), [] (PooledConnection::Vec& v) {
249+
for (PooledConnection::Vec::iterator it = v.begin(), end = v.end();
250+
it != end; ++it) {
251+
(*it)->close();
252+
}
253+
});
230254

231255
DelayedConnector::Vec pending_connections(pending_connections_);
232256
for (DelayedConnector::Vec::iterator it = pending_connections.begin(),
@@ -243,8 +267,7 @@ void ConnectionPool::internal_close() {
243267
void ConnectionPool::maybe_closed() {
244268
// Remove the pool once all current connections and pending connections
245269
// are terminated.
246-
if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && connections_.empty() &&
247-
pending_connections_.empty()) {
270+
if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && !has_connections() && pending_connections_.empty()) {
248271
close_state_ = CLOSE_STATE_CLOSED;
249272
// Only mark DOWN if it's UP otherwise we might get multiple DOWN events
250273
// when connecting the pool.
@@ -274,9 +297,17 @@ void ConnectionPool::on_reconnect(DelayedConnector* connector) {
274297
}
275298

276299
if (connector->is_ok()) {
277-
add_connection(
278-
PooledConnection::Ptr(new PooledConnection(this, connector->release_connection())));
279-
notify_up_or_down();
300+
PooledConnection::Ptr pooled_conn {new PooledConnection(this, connector->release_connection())};
301+
const size_t new_connections_shard = pooled_conn->connection()->shard_id().value_or(0u);
302+
if (connections_by_shard_.size() > new_connections_shard
303+
&& connections_by_shard_[new_connections_shard].size() < num_connections_per_shard_) {
304+
add_connection(pooled_conn);
305+
notify_up_or_down();
306+
} else {
307+
LOG_INFO("Reconnection to host %s connected us to shard %ld, reconnecting again",
308+
address().to_string().c_str(), new_connections_shard);
309+
schedule_reconnect(schedule.release());
310+
}
280311
} else if (!connector->is_canceled()) {
281312
if (connector->is_critical_error()) {
282313
LOG_ERROR("Closing established connection pool to host %s because of the following error: %s",

src/connection_pool.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,8 @@ class ConnectionPool : public RefCounted<ConnectionPool> {
232232

233233
CloseState close_state_;
234234
NotifyState notify_state_;
235-
PooledConnection::Vec connections_;
235+
std::vector<PooledConnection::Vec> connections_by_shard_; /// Index is the shard ID
236+
size_t num_connections_per_shard_;
236237
DelayedConnector::Vec pending_connections_;
237238
DenseHashSet<PooledConnection*> to_flush_;
238239
};

0 commit comments

Comments
 (0)