2222#include " utils.hpp"
2323
2424#include < algorithm>
25+ #include < numeric>
2526
2627using namespace datastax ;
2728using namespace datastax ::internal::core;
@@ -77,45 +78,65 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
7778 set_pointer_keys (reconnection_schedules_);
7879 set_pointer_keys (to_flush_);
7980
81+ if (host->sharding_info ()) {
82+ const auto hosts_shard_cnt = host->sharding_info ()->get_shards_count ();
83+ connections_by_shard_.resize (hosts_shard_cnt);
84+ num_connections_per_shard_ = settings_.num_connections_per_host / hosts_shard_cnt
85+ + (settings_.num_connections_per_host % hosts_shard_cnt ? 1u : 0u );
86+ } else {
87+ connections_by_shard_.resize (1 );
88+ num_connections_per_shard_ = settings_.num_connections_per_host ;
89+ }
90+
8091 for (Connection::Vec::const_iterator it = connections.begin (), end = connections.end (); it != end;
8192 ++it) {
8293 const Connection::Ptr& connection (*it);
83- if (!connection->is_closing ()) {
94+ if (!connection->is_closing ()
95+ && connections_by_shard_[connection->shard_id ().value_or (0 )].size () < num_connections_per_shard_) {
8496 add_connection (PooledConnection::Ptr (new PooledConnection (this , connection)));
8597 }
8698 }
8799
88100 notify_up_or_down ();
89101
90102 // We had non-critical errors or some connections closed
91- assert (connections.size () <= settings_.num_connections_per_host );
92- size_t needed = settings_.num_connections_per_host - connections_.size ();
103+ size_t needed = num_connections_per_shard_ * connections_by_shard_.size ()
104+ - std::accumulate (connections_by_shard_.begin (), connections_by_shard_.end (), 0u ,
105+ [] (size_t acc, const PooledConnection::Vec& v) {
106+ return acc + v.size ();
107+ });
93108 for (size_t i = 0 ; i < needed; ++i) {
94109 schedule_reconnect ();
95110 }
96111}
97112
98113PooledConnection::Ptr ConnectionPool::find_least_busy (int64_t token) const {
99- if (token == CASS_INT64_MIN) {
100- PooledConnection::Vec::const_iterator it =
101- std::min_element (connections_.begin (), connections_.end (), least_busy_comp);
102- if (it == connections_.end () || (*it)->is_closing ()) {
103- return PooledConnection::Ptr ();
114+ if (token == CASS_INT64_MIN || !host_->sharding_info ()) {
115+ // We got a placeholder token, or a sensible token that is useless without the sharding info.
116+ // In both cases we return the least busy connection of the *entire pool* (or NULL).
117+ PooledConnection::Ptr least_busy; // NULL by default
118+ for (const auto & shard_pool : connections_by_shard_) {
119+ for (const auto & conn : shard_pool) {
120+ least_busy = least_busy ? std::min (least_busy, conn, least_busy_comp) : conn;
121+ }
104122 }
105- return *it ;
123+ return least_busy ;
106124 }
107125
108- const auto desired_shard_num = host_->sharding_info ()->shard_id (token);
109- for (const auto & conn : connections_) {
110- if (conn->connection ()->shard_id () == desired_shard_num) {
111- return conn;
112- }
126+ // Otherwise, find the least busy connection pointing to the right shard
127+ const auto & shard_pool = connections_by_shard_[host_->sharding_info ()->shard_id (token)];
128+ PooledConnection::Vec::const_iterator it =
129+ std::min_element (shard_pool.begin (), shard_pool.end (), least_busy_comp);
130+ if (it == shard_pool.end () || (*it)->is_closing ()) {
131+ return PooledConnection::Ptr ();
113132 }
114-
115- return find_least_busy (CASS_INT64_MIN);
133+ return *it;
116134}
117135
118- bool ConnectionPool::has_connections () const { return !connections_.empty (); }
136+ bool ConnectionPool::has_connections () const {
137+ return !std::all_of (connections_by_shard_.begin (), connections_by_shard_.end (),
138+ [] (const PooledConnection::Vec& v) { return v.empty (); });
139+ }
119140
120141void ConnectionPool::flush () {
121142 for (DenseHashSet<PooledConnection*>::const_iterator it = to_flush_.begin (),
@@ -153,8 +174,9 @@ void ConnectionPool::close_connection(PooledConnection* connection, Protected) {
153174 if (metrics_) {
154175 metrics_->total_connections .dec ();
155176 }
156- connections_.erase (std::remove (connections_.begin (), connections_.end (), connection),
157- connections_.end ());
177+ auto & shard_pool = connections_by_shard_[connection->connection ()->shard_id ().value_or (0 )];
178+ shard_pool.erase (std::remove (shard_pool.begin (), shard_pool.end (), connection),
179+ shard_pool.end ());
158180 to_flush_.erase (connection);
159181
160182 if (close_state_ != CLOSE_STATE_OPEN) {
@@ -172,16 +194,16 @@ void ConnectionPool::add_connection(const PooledConnection::Ptr& connection) {
172194 if (metrics_) {
173195 metrics_->total_connections .inc ();
174196 }
175- connections_.push_back (connection);
197+ const size_t new_connections_shard = connection->connection ()->shard_id ().value_or (0u );
198+ LOG_INFO (" add_connection: to host %s to shard %ld" , host_->address_string ().c_str (), new_connections_shard);
199+ connections_by_shard_[new_connections_shard].push_back (connection);
176200}
177201
178202void ConnectionPool::notify_up_or_down () {
179- if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) &&
180- connections_.empty ()) {
203+ if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) && !has_connections ()) {
181204 notify_state_ = NOTIFY_STATE_DOWN;
182205 listener_->on_pool_down (host_->address ());
183- } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) &&
184- !connections_.empty ()) {
206+ } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) && has_connections ()) {
185207 notify_state_ = NOTIFY_STATE_UP;
186208 listener_->on_pool_up (host_->address ());
187209 }
@@ -222,11 +244,13 @@ void ConnectionPool::internal_close() {
222244 // Make copies of connection/connector data structures to prevent iterator
223245 // invalidation.
224246
225- PooledConnection::Vec connections (connections_);
226- for (PooledConnection::Vec::iterator it = connections.begin (), end = connections.end ();
227- it != end; ++it) {
228- (*it)->close ();
229- }
247+ auto connections_per_shards = connections_by_shard_;
248+ std::for_each (connections_per_shards.begin (), connections_per_shards.end (), [] (PooledConnection::Vec& v) {
249+ for (PooledConnection::Vec::iterator it = v.begin (), end = v.end ();
250+ it != end; ++it) {
251+ (*it)->close ();
252+ }
253+ });
230254
231255 DelayedConnector::Vec pending_connections (pending_connections_);
232256 for (DelayedConnector::Vec::iterator it = pending_connections.begin (),
@@ -243,8 +267,7 @@ void ConnectionPool::internal_close() {
243267void ConnectionPool::maybe_closed () {
244268 // Remove the pool once all current connections and pending connections
245269 // are terminated.
246- if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && connections_.empty () &&
247- pending_connections_.empty ()) {
270+ if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && !has_connections () && pending_connections_.empty ()) {
248271 close_state_ = CLOSE_STATE_CLOSED;
249272 // Only mark DOWN if it's UP otherwise we might get multiple DOWN events
250273 // when connecting the pool.
@@ -274,9 +297,17 @@ void ConnectionPool::on_reconnect(DelayedConnector* connector) {
274297 }
275298
276299 if (connector->is_ok ()) {
277- add_connection (
278- PooledConnection::Ptr (new PooledConnection (this , connector->release_connection ())));
279- notify_up_or_down ();
300+ PooledConnection::Ptr pooled_conn {new PooledConnection (this , connector->release_connection ())};
301+ const size_t new_connections_shard = pooled_conn->connection ()->shard_id ().value_or (0u );
302+ if (connections_by_shard_.size () > new_connections_shard
303+ && connections_by_shard_[new_connections_shard].size () < num_connections_per_shard_) {
304+ add_connection (pooled_conn);
305+ notify_up_or_down ();
306+ } else {
307+ LOG_INFO (" Reconnection to host %s connected us to shard %ld, reconnecting again" ,
308+ address ().to_string ().c_str (), new_connections_shard);
309+ schedule_reconnect (schedule.release ());
310+ }
280311 } else if (!connector->is_canceled ()) {
281312 if (connector->is_critical_error ()) {
282313 LOG_ERROR (" Closing established connection pool to host %s because of the following error: %s" ,
0 commit comments