3434
3535#include < algorithm>
3636#include < assert.h>
37+ #include < iomanip>
38+ #include < ios>
3739#include < uv.h>
3840
3941#define CASS_NETWORK_TOPOLOGY_STRATEGY " NetworkTopologyStrategy"
@@ -143,12 +145,32 @@ class ByteOrderedPartitioner {
143145 static StringRef name () { return " ByteOrderedPartitioner" ; }
144146};
145147
148+ inline std::ostream& operator <<(std::ostream& os, const RandomPartitioner::Token& token) {
149+ os << std::setfill (' 0' ) << std::setw (16 ) << std::hex << token.hi << std::setfill (' 0' )
150+ << std::setw (16 ) << std::hex << token.lo ;
151+ return os;
152+ }
153+
154+ inline std::ostream& operator <<(std::ostream& os, const ByteOrderedPartitioner::Token& token) {
155+ for (ByteOrderedPartitioner::Token::const_iterator it = token.begin (), end = token.end ();
156+ it != end; ++it) {
157+ os << std::hex << *it;
158+ }
159+ return os;
160+ }
161+
146162class HostSet : public DenseHashSet <Host::Ptr> {
147163public:
148164 HostSet () {
149165 set_empty_key (Host::Ptr (new Host (Address::EMPTY_KEY)));
150166 set_deleted_key (Host::Ptr (new Host (Address::DELETED_KEY)));
151167 }
168+
169+ template <class InputIterator >
170+ HostSet (InputIterator first, InputIterator last)
171+ : DenseHashSet<Host::Ptr>(first, last, Host::Ptr(new Host(Address::EMPTY_KEY))) {
172+ set_deleted_key (Host::Ptr (new Host (Address::DELETED_KEY)));
173+ }
152174};
153175
154176class RackSet : public DenseHashSet <uint32_t > {
@@ -355,6 +377,17 @@ void ReplicationStrategy<Partitioner>::build_replicas(const TokenHostVec& tokens
355377 }
356378}
357379
380+ // Adds unique replica. It returns true if the replica was added.
381+ inline bool add_replica (CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
382+ for (HostVec::const_reverse_iterator it = hosts->rbegin (); it != hosts->rend (); ++it) {
383+ if ((*it)->address () == host->address ()) {
384+ return false ; // Already in the replica set
385+ }
386+ }
387+ hosts->push_back (host);
388+ return true ;
389+ }
390+
358391template <class Partitioner >
359392void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
360393 const TokenHostVec& tokens, const DatacenterMap& datacenters, TokenReplicasVec& result) const {
@@ -443,24 +476,27 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
443476 // datacenter only then consider hosts in the same rack
444477
445478 if (rack == 0 || racks_observed_this_dc.size () == rack_count_this_dc) {
446- ++replica_count_this_dc;
447- replicas->push_back (Host::Ptr (host));
479+ if (add_replica (replicas, Host::Ptr (host))) {
480+ ++replica_count_this_dc;
481+ }
448482 } else {
449483 TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints ;
450484 if (racks_observed_this_dc.count (rack) > 0 ) {
451485 skipped_endpoints_this_dc.push_back (curr_token_it);
452486 } else {
453- ++replica_count_this_dc;
454- replicas->push_back (Host::Ptr (host));
455- racks_observed_this_dc.insert (rack);
487+ if (add_replica (replicas, Host::Ptr (host))) {
488+ ++replica_count_this_dc;
489+ racks_observed_this_dc.insert (rack);
490+ }
456491
457492 // Once we visited every rack in the current datacenter then starting considering
458493 // hosts we've already skipped.
459494 if (racks_observed_this_dc.size () == rack_count_this_dc) {
460495 while (!skipped_endpoints_this_dc.empty () &&
461496 replica_count_this_dc < replication_factor) {
462- ++replica_count_this_dc;
463- replicas->push_back (Host::Ptr (skipped_endpoints_this_dc.front ()->second ));
497+ if (add_replica (replicas, Host::Ptr (skipped_endpoints_this_dc.front ()->second ))) {
498+ ++replica_count_this_dc;
499+ }
464500 skipped_endpoints_this_dc.pop_front ();
465501 }
466502 }
@@ -484,9 +520,10 @@ void ReplicationStrategy<Partitioner>::build_replicas_simple(const TokenHostVec&
484520 for (typename TokenHostVec::const_iterator i = tokens.begin (), end = tokens.end (); i != end;
485521 ++i) {
486522 CopyOnWriteHostVec replicas (new HostVec ());
523+ replicas->reserve (num_replicas);
487524 typename TokenHostVec::const_iterator token_it = i;
488525 do {
489- replicas-> push_back (Host::Ptr (token_it->second ));
526+ add_replica ( replicas, Host::Ptr (Host::Ptr (token_it->second ) ));
490527 ++token_it;
491528 if (token_it == tokens.end ()) {
492529 token_it = tokens.begin ();
@@ -578,7 +615,11 @@ class TokenMapImpl : public TokenMap {
578615 virtual const CopyOnWriteHostVec& get_replicas (const String& keyspace_name,
579616 const String& routing_key) const ;
580617
581- // Test only
618+ virtual String dump (const String& keyspace_name) const ;
619+
620+ public:
621+ // Testing only
622+
582623 bool contains (const Token& token) const {
583624 for (typename TokenHostVec::const_iterator i = tokens_.begin (), end = tokens_.end (); i != end;
584625 ++i) {
@@ -587,6 +628,8 @@ class TokenMapImpl : public TokenMap {
587628 return false ;
588629 }
589630
631+ const TokenReplicasVec& token_replicas (const String& keyspace_name) const ;
632+
590633private:
591634 void update_keyspace (const VersionNumber& cassandra_version, const ResultResponse* result,
592635 bool should_build_replicas);
@@ -713,6 +756,35 @@ const CopyOnWriteHostVec& TokenMapImpl<Partitioner>::get_replicas(const String&
713756 return no_replicas_dummy_;
714757}
715758
759+ template <class Partitioner >
760+ String TokenMapImpl<Partitioner>::dump(const String& keyspace_name) const {
761+ String result;
762+ typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find (keyspace_name);
763+ const TokenReplicasVec& replicas = ks_it->second ;
764+
765+ for (typename TokenReplicasVec::const_iterator it = replicas.begin (), end = replicas.end ();
766+ it != end; ++it) {
767+ OStringStream ss;
768+ ss << std::setw (20 ) << it->first << " [ " ;
769+ const CopyOnWriteHostVec& hosts = it->second ;
770+ for (HostVec::const_iterator host_it = hosts->begin (), end = hosts->end (); host_it != end;
771+ ++host_it) {
772+ ss << (*host_it)->address_string () << " " ;
773+ }
774+ ss << " ]\n " ;
775+ result.append (ss.str ());
776+ }
777+ return result;
778+ }
779+
780+ template <class Partitioner >
781+ const typename TokenMapImpl<Partitioner>::TokenReplicasVec&
782+ TokenMapImpl<Partitioner>::token_replicas(const String& keyspace_name) const {
783+ typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find (keyspace_name);
784+ static TokenReplicasVec not_found;
785+ return ks_it != replicas_.end () ? ks_it->second : not_found;
786+ }
787+
716788template <class Partitioner >
717789void TokenMapImpl<Partitioner>::update_keyspace(const VersionNumber& cassandra_version,
718790 const ResultResponse* result,
@@ -773,6 +845,8 @@ void TokenMapImpl<Partitioner>::build_replicas() {
773845 const String& keyspace_name = i->first ;
774846 const ReplicationStrategy<Partitioner>& strategy = i->second ;
775847 strategy.build_replicas (tokens_, datacenters_, replicas_[keyspace_name]);
848+ LOG_TRACE (" Replicas for keyspace '%s':\n %s" , keyspace_name.c_str (),
849+ dump (keyspace_name).c_str ());
776850 }
777851}
778852
0 commit comments