2222#include " utils.hpp"
2323
2424#include < algorithm>
25+ #include < numeric>
2526
2627using namespace datastax ;
2728using namespace datastax ::internal::core;
@@ -77,34 +78,70 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
7778 set_pointer_keys (reconnection_schedules_);
7879 set_pointer_keys (to_flush_);
7980
81+ if (host->sharding_info ()) {
82+ const auto hosts_shard_cnt = host->sharding_info ()->get_shards_count ();
83+ connections_by_shard_.resize (hosts_shard_cnt);
84+ num_connections_per_shard_ = settings_.num_connections_per_host / hosts_shard_cnt
85+ + (settings_.num_connections_per_host % hosts_shard_cnt ? 1u : 0u );
86+ } else {
87+ connections_by_shard_.resize (1 );
88+ num_connections_per_shard_ = settings_.num_connections_per_host ;
89+ }
90+
8091 for (Connection::Vec::const_iterator it = connections.begin (), end = connections.end (); it != end;
8192 ++it) {
8293 const Connection::Ptr& connection (*it);
8394 if (!connection->is_closing ()) {
84- add_connection (PooledConnection::Ptr (new PooledConnection (this , connection)));
95+ if (connections_by_shard_[connection->shard_id ()].size () < num_connections_per_shard_) {
96+ add_connection (PooledConnection::Ptr (new PooledConnection (this , connection)));
97+ } else {
98+ connection->close ();
99+ }
85100 }
86101 }
87102
88103 notify_up_or_down ();
89104
90105 // We had non-critical errors or some connections closed
91- assert (connections.size () <= settings_.num_connections_per_host );
92- size_t needed = settings_.num_connections_per_host - connections_.size ();
106+ size_t needed = num_connections_per_shard_ * connections_by_shard_.size ()
107+ - std::accumulate (connections_by_shard_.begin (), connections_by_shard_.end (), 0u ,
108+ [] (size_t acc, const PooledConnection::Vec& v) {
109+ return acc + v.size ();
110+ });
93111 for (size_t i = 0 ; i < needed; ++i) {
94112 schedule_reconnect ();
95113 }
96114}
97115
98- PooledConnection::Ptr ConnectionPool::find_least_busy () const {
116+ PooledConnection::Ptr ConnectionPool::find_least_busy (int64_t token) const {
117+ if (token == CASS_INT64_MIN || !host_->sharding_info ()) {
118+ // We got a placeholder token, or a sensible token that is useless without the sharding info.
119+ // In both cases we return the least busy connection of the *entire pool* (or NULL).
120+ PooledConnection::Ptr least_busy; // NULL by default
121+ for (const auto & shard_pool : connections_by_shard_) {
122+ for (const auto & conn : shard_pool) {
123+ if (!conn->is_closing ()) {
124+ least_busy = least_busy ? std::min (least_busy, conn, least_busy_comp) : conn;
125+ }
126+ }
127+ }
128+ return least_busy;
129+ }
130+
131+ // Otherwise, find the least busy connection pointing to the right shard (if possible)
132+ const auto & shard_pool = connections_by_shard_[host_->sharding_info ()->shard_id (token)];
99133 PooledConnection::Vec::const_iterator it =
100- std::min_element (connections_ .begin (), connections_ .end (), least_busy_comp);
101- if (it == connections_ .end () || (*it)->is_closing ()) {
102- return PooledConnection::Ptr ( );
134+ std::min_element (shard_pool .begin (), shard_pool .end (), least_busy_comp);
135+ if (it == shard_pool .end () || (*it)->is_closing ()) {
136+ return find_least_busy (CASS_INT64_MIN );
103137 }
104138 return *it;
105139}
106140
107- bool ConnectionPool::has_connections () const { return !connections_.empty (); }
141+ bool ConnectionPool::has_connections () const {
142+ return std::any_of (connections_by_shard_.begin (), connections_by_shard_.end (),
143+ [] (const PooledConnection::Vec& v) { return !v.empty (); });
144+ }
108145
109146void ConnectionPool::flush () {
110147 for (DenseHashSet<PooledConnection*>::const_iterator it = to_flush_.begin (),
@@ -142,8 +179,9 @@ void ConnectionPool::close_connection(PooledConnection* connection, Protected) {
142179 if (metrics_) {
143180 metrics_->total_connections .dec ();
144181 }
145- connections_.erase (std::remove (connections_.begin (), connections_.end (), connection),
146- connections_.end ());
182+ auto & shard_pool = connections_by_shard_[connection->shard_id ()];
183+ shard_pool.erase (std::remove (shard_pool.begin (), shard_pool.end (), connection),
184+ shard_pool.end ());
147185 to_flush_.erase (connection);
148186
149187 if (close_state_ != CLOSE_STATE_OPEN) {
@@ -161,16 +199,16 @@ void ConnectionPool::add_connection(const PooledConnection::Ptr& connection) {
161199 if (metrics_) {
162200 metrics_->total_connections .inc ();
163201 }
164- connections_.push_back (connection);
202+ const size_t new_connections_shard = connection->shard_id ();
203+ LOG_INFO (" add_connection: to host %s to shard %ld" , host_->address_string ().c_str (), new_connections_shard);
204+ connections_by_shard_[new_connections_shard].push_back (connection);
165205}
166206
167207void ConnectionPool::notify_up_or_down () {
168- if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) &&
169- connections_.empty ()) {
208+ if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_UP) && !has_connections ()) {
170209 notify_state_ = NOTIFY_STATE_DOWN;
171210 listener_->on_pool_down (host_->address ());
172- } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) &&
173- !connections_.empty ()) {
211+ } else if ((notify_state_ == NOTIFY_STATE_NEW || notify_state_ == NOTIFY_STATE_DOWN) && has_connections ()) {
174212 notify_state_ = NOTIFY_STATE_UP;
175213 listener_->on_pool_up (host_->address ());
176214 }
@@ -211,11 +249,12 @@ void ConnectionPool::internal_close() {
211249 // Make copies of connection/connector data structures to prevent iterator
212250 // invalidation.
213251
214- PooledConnection::Vec connections (connections_);
215- for (PooledConnection::Vec::iterator it = connections.begin (), end = connections.end ();
216- it != end; ++it) {
217- (*it)->close ();
218- }
252+ auto connections_per_shards = connections_by_shard_;
253+ std::for_each (connections_per_shards.begin (), connections_per_shards.end (), [] (PooledConnection::Vec& v) {
254+ for (auto & c : v) {
255+ c->close ();
256+ }
257+ });
219258
220259 DelayedConnector::Vec pending_connections (pending_connections_);
221260 for (DelayedConnector::Vec::iterator it = pending_connections.begin (),
@@ -232,8 +271,7 @@ void ConnectionPool::internal_close() {
232271void ConnectionPool::maybe_closed () {
233272 // Remove the pool once all current connections and pending connections
234273 // are terminated.
235- if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && connections_.empty () &&
236- pending_connections_.empty ()) {
274+ if (close_state_ == CLOSE_STATE_WAITING_FOR_CONNECTIONS && !has_connections () && pending_connections_.empty ()) {
237275 close_state_ = CLOSE_STATE_CLOSED;
238276 // Only mark DOWN if it's UP otherwise we might get multiple DOWN events
239277 // when connecting the pool.
@@ -263,9 +301,18 @@ void ConnectionPool::on_reconnect(DelayedConnector* connector) {
263301 }
264302
265303 if (connector->is_ok ()) {
266- add_connection (
267- PooledConnection::Ptr (new PooledConnection (this , connector->release_connection ())));
268- notify_up_or_down ();
304+ PooledConnection::Ptr pooled_conn {new PooledConnection (this , connector->release_connection ())};
305+ const size_t new_connections_shard = pooled_conn->shard_id ();
306+ if (connections_by_shard_.size () > new_connections_shard
307+ && connections_by_shard_[new_connections_shard].size () < num_connections_per_shard_) {
308+ add_connection (pooled_conn);
309+ notify_up_or_down ();
310+ } else {
311+ LOG_INFO (" Reconnection to host %s connected us to shard %ld, reconnecting again" ,
312+ address ().to_string ().c_str (), new_connections_shard);
313+ pooled_conn->close ();
314+ schedule_reconnect (schedule.release ());
315+ }
269316 } else if (!connector->is_canceled ()) {
270317 if (connector->is_critical_error ()) {
271318 LOG_ERROR (" Closing established connection pool to host %s because of the following error: %s" ,
0 commit comments