Skip to content

Commit 0be4602

Browse files
committed
Added routing to shards on top of TokenAware policy
At this point metrics indicate that cross-shard ops are reduced, but the implementation is still raw. The reasons: When connection-to-the-right-shard is being searched among per-host connections, it is done by linear search. Connecting logic does not attempt to reconnect until all shards are hit. Topology change events are not accounted for.
1 parent a014af8 commit 0be4602

File tree

7 files changed

+35
-13
lines changed

7 files changed

+35
-13
lines changed

src/connection_pool.cpp

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,24 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
9595
}
9696
}
9797

98-
PooledConnection::Ptr ConnectionPool::find_least_busy() const {
99-
PooledConnection::Vec::const_iterator it =
100-
std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
101-
if (it == connections_.end() || (*it)->is_closing()) {
102-
return PooledConnection::Ptr();
98+
PooledConnection::Ptr ConnectionPool::find_least_busy(int64_t token) const {
99+
if (token == CASS_INT64_MIN) {
100+
PooledConnection::Vec::const_iterator it =
101+
std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
102+
if (it == connections_.end() || (*it)->is_closing()) {
103+
return PooledConnection::Ptr();
104+
}
105+
return *it;
103106
}
104-
return *it;
107+
108+
const auto desired_shard_num = host_->sharding_info()->shard_id(token);
109+
for (const auto& conn : connections_) {
110+
if (conn->connection()->shard_id() == desired_shard_num) {
111+
return conn;
112+
}
113+
}
114+
115+
return find_least_busy(CASS_INT64_MIN);
105116
}
106117

107118
bool ConnectionPool::has_connections() const { return !connections_.empty(); }

src/connection_pool.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class ConnectionPool : public RefCounted<ConnectionPool> {
135135
*
136136
* @return The least busy connection or null if no connection is available.
137137
*/
138-
PooledConnection::Ptr find_least_busy() const;
138+
PooledConnection::Ptr find_least_busy(int64_t token) const;
139139

140140
/**
141141
* Determine if the pool has any valid connections.

src/connection_pool_manager.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,12 @@ ConnectionPoolManager::ConnectionPoolManager(const ConnectionPool::Map& pools, u
6262
}
6363
}
6464

65-
PooledConnection::Ptr ConnectionPoolManager::find_least_busy(const Address& address) const {
65+
PooledConnection::Ptr ConnectionPoolManager::find_least_busy(const Address& address, int64_t token) const {
6666
ConnectionPool::Map::const_iterator it = pools_.find(address);
6767
if (it == pools_.end()) {
6868
return PooledConnection::Ptr();
6969
}
70-
return it->second->find_least_busy();
70+
return it->second->find_least_busy(token);
7171
}
7272

7373
bool ConnectionPoolManager::has_connections(const Address& address) const {

src/connection_pool_manager.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class ConnectionPoolManager
8484
* @return The least busy connection for a host or null if no connections are
8585
* available.
8686
*/
87-
PooledConnection::Ptr find_least_busy(const Address& address) const;
87+
PooledConnection::Ptr find_least_busy(const Address& address, int64_t token) const;
8888

8989
/**
9090
* Determine if a pool has any valid connections.

src/pooled_connection.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ class PooledConnection
7979
*/
8080
bool is_closing() const;
8181

82+
const Connection::Ptr connection() const { return connection_; }
83+
8284
public:
8385
const String& keyspace() const { return connection_->keyspace(); } // Test only
8486

src/request_handler.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "result_response.hpp"
3030
#include "row.hpp"
3131
#include "session.hpp"
32+
#include "token_map_impl.hpp"
3233

3334
#include <uv.h>
3435

@@ -357,8 +358,16 @@ void RequestHandler::internal_retry(RequestExecution* request_execution) {
357358

358359
bool is_done = false;
359360
while (!is_done && request_execution->current_host()) {
361+
int64_t token = CASS_INT64_MIN;
362+
const RoutableRequest* routable_req = dynamic_cast<const RoutableRequest*>(request());
363+
if (routable_req) {
364+
String routing_key;
365+
routable_req->get_routing_key(&routing_key);
366+
token = Murmur3Partitioner::hash(routing_key);
367+
}
368+
360369
PooledConnection::Ptr connection =
361-
manager_->find_least_busy(request_execution->current_host()->address());
370+
manager_->find_least_busy(request_execution->current_host()->address(), token);
362371
if (connection) {
363372
int32_t result = connection->write(request_execution);
364373

src/request_processor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ bool RequestProcessor::on_prepare_all(const RequestHandler::Ptr& request_handler
378378
PrepareAllCallback::Ptr prepare_all_callback(
379379
new PrepareAllCallback(address, prepare_all_handler));
380380

381-
PooledConnection::Ptr connection(connection_pool_manager_->find_least_busy(address));
381+
PooledConnection::Ptr connection(connection_pool_manager_->find_least_busy(address, CASS_INT64_MIN));
382382
if (connection) {
383383
connection->write(prepare_all_callback.get());
384384
}
@@ -580,7 +580,7 @@ bool RequestProcessor::write_wait_callback(const RequestHandler::Ptr& request_ha
580580
const Host::Ptr& current_host,
581581
const RequestCallback::Ptr& callback) {
582582
PooledConnection::Ptr connection(
583-
connection_pool_manager_->find_least_busy(current_host->address()));
583+
connection_pool_manager_->find_least_busy(current_host->address(), CASS_INT64_MIN));
584584
if (connection && connection->write(callback.get()) > 0) {
585585
// Stop the original request timer now that we have a response and
586586
// are waiting for the maximum wait time of the handler.

0 commit comments

Comments
 (0)