Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2689 (POC) Simplify the "readpref" API #1733

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
GODRIVER-2689 Simplify the readpref API
prestonvasquez committed Jul 19, 2024

Verified

This commit was signed with the committer’s verified signature.
prestonvasquez Preston Vasquez
commit 0ae490d045eff100a0c3d080c3f805fde53dec20
4 changes: 2 additions & 2 deletions event/description.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ import (

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/address"
"go.mongodb.org/mongo-driver/tag"
"go.mongodb.org/mongo-driver/mongo/readpref"
)

// ServerDescription contains information about a node in a cluster. This is
@@ -43,7 +43,7 @@ type ServerDescription struct {
SessionTimeoutMinutes *int64
SetName string
SetVersion uint32
Tags tag.Set
Tags readpref.TagSet
TopologyVersionProcessID bson.ObjectID
TopologyVersionCounter int64
}
4 changes: 2 additions & 2 deletions internal/driverutil/description.go
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ import (
"go.mongodb.org/mongo-driver/internal/handshake"
"go.mongodb.org/mongo-driver/internal/ptrutil"
"go.mongodb.org/mongo-driver/mongo/address"
"go.mongodb.org/mongo-driver/tag"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
)

@@ -421,7 +421,7 @@ func NewServerDescription(addr address.Address, response bson.Raw) description.S
desc.LastError = err
return desc
}
desc.Tags = tag.NewTagSetFromMap(m)
desc.Tags = readpref.NewTagSetFromMap(m)
case "topologyVersion":
doc, ok := element.Value().DocumentOK()
if !ok {
20 changes: 11 additions & 9 deletions internal/integration/unified/common_options.go
Original file line number Diff line number Diff line change
@@ -14,7 +14,6 @@ import (
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/tag"
)

// This file defines helper types to convert BSON documents to ReadConcern, WriteConcern, and ReadPref objects.
@@ -70,32 +69,35 @@ func (rp *ReadPreference) ToReadPrefOption() (*readpref.ReadPref, error) {
return nil, fmt.Errorf("invalid read preference mode %q", rp.Mode)
}

var rpOptions []readpref.Option
newReadPref := &readpref.ReadPref{Mode: mode}

if rp.TagSets != nil {
// Each item in the TagSets slice is a document that represents one set.
sets := make([]tag.Set, 0, len(rp.TagSets))
sets := make([]readpref.TagSet, 0, len(rp.TagSets))
for _, rawSet := range rp.TagSets {
parsed := make(tag.Set, 0, len(rawSet))
parsed := make(readpref.TagSet, 0, len(rawSet))
for k, v := range rawSet {
parsed = append(parsed, tag.Tag{Name: k, Value: v})
parsed = append(parsed, readpref.Tag{Name: k, Value: v})
}
sets = append(sets, parsed)
}

rpOptions = append(rpOptions, readpref.WithTagSets(sets...))
newReadPref.TagSets = sets
}
if rp.MaxStalenessSeconds != nil {
maxStaleness := time.Duration(*rp.MaxStalenessSeconds) * time.Second
rpOptions = append(rpOptions, readpref.WithMaxStaleness(maxStaleness))
newReadPref.MaxStaleness = &maxStaleness

}
if rp.Hedge != nil {
if len(rp.Hedge) > 1 {
return nil, fmt.Errorf("invalid read preference hedge document: length cannot be greater than 1")
}
if enabled, ok := rp.Hedge["enabled"]; ok {
rpOptions = append(rpOptions, readpref.WithHedgeEnabled(enabled.(bool)))
hedgeEnabled := enabled.(bool)
newReadPref.HedgeEnabled = &hedgeEnabled
}
}

return readpref.New(mode, rpOptions...)
return newReadPref, nil
}
29 changes: 14 additions & 15 deletions internal/serverselector/server_selector.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@ import (
"time"

"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/tag"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
)

@@ -190,12 +189,12 @@ func (ssf Func) SelectServer(
}

func verifyMaxStaleness(rp *readpref.ReadPref, topo description.Topology) error {
maxStaleness, set := rp.MaxStaleness()
if !set {
maxStaleness := rp.MaxStaleness
if maxStaleness == nil {
return nil
}

if maxStaleness < 90*time.Second {
if *maxStaleness < 90*time.Second {
return fmt.Errorf("max staleness (%s) must be greater than or equal to 90s", maxStaleness)
}

@@ -208,7 +207,7 @@ func verifyMaxStaleness(rp *readpref.ReadPref, topo description.Topology) error
s := topo.Servers[0]
idleWritePeriod := 10 * time.Second

if maxStaleness < s.HeartbeatInterval+idleWritePeriod {
if *maxStaleness < s.HeartbeatInterval+idleWritePeriod {
return fmt.Errorf(
"max staleness (%s) must be greater than or equal to the heartbeat interval (%s) plus idle write period (%s)",
maxStaleness, s.HeartbeatInterval, idleWritePeriod,
@@ -242,7 +241,7 @@ func selectSecondaries(rp *readpref.ReadPref, candidates []description.Server) [
if len(secondaries) == 0 {
return secondaries
}
if maxStaleness, set := rp.MaxStaleness(); set {
if maxStaleness := rp.MaxStaleness; maxStaleness != nil {
primaries := selectByKind(candidates, description.ServerKindRSPrimary)
if len(primaries) == 0 {
baseTime := secondaries[0].LastWriteTime
@@ -255,7 +254,7 @@ func selectSecondaries(rp *readpref.ReadPref, candidates []description.Server) [
var selected []description.Server
for _, secondary := range secondaries {
estimatedStaleness := baseTime.Sub(secondary.LastWriteTime) + secondary.HeartbeatInterval
if estimatedStaleness <= maxStaleness {
if estimatedStaleness <= *maxStaleness {
selected = append(selected, secondary)
}
}
@@ -269,7 +268,7 @@ func selectSecondaries(rp *readpref.ReadPref, candidates []description.Server) [
for _, secondary := range secondaries {
estimatedStaleness := secondary.LastUpdateTime.Sub(secondary.LastWriteTime) -
primary.LastUpdateTime.Sub(primary.LastWriteTime) + secondary.HeartbeatInterval
if estimatedStaleness <= maxStaleness {
if estimatedStaleness <= *maxStaleness {
selected = append(selected, secondary)
}
}
@@ -279,7 +278,7 @@ func selectSecondaries(rp *readpref.ReadPref, candidates []description.Server) [
return secondaries
}

func selectByTagSet(candidates []description.Server, tagSets []tag.Set) []description.Server {
func selectByTagSet(candidates []description.Server, tagSets []readpref.TagSet) []description.Server {
if len(tagSets) == 0 {
return candidates
}
@@ -327,33 +326,33 @@ func selectForReplicaSet(
}
}

switch rp.Mode() {
switch rp.Mode {
case readpref.PrimaryMode:
return selectByKind(candidates, description.ServerKindRSPrimary), nil
case readpref.PrimaryPreferredMode:
selected := selectByKind(candidates, description.ServerKindRSPrimary)

if len(selected) == 0 {
selected = selectSecondaries(rp, candidates)
return selectByTagSet(selected, rp.TagSets()), nil
return selectByTagSet(selected, rp.TagSets), nil
}

return selected, nil
case readpref.SecondaryPreferredMode:
selected := selectSecondaries(rp, candidates)
selected = selectByTagSet(selected, rp.TagSets())
selected = selectByTagSet(selected, rp.TagSets)
if len(selected) > 0 {
return selected, nil
}
return selectByKind(candidates, description.ServerKindRSPrimary), nil
case readpref.SecondaryMode:
selected := selectSecondaries(rp, candidates)
return selectByTagSet(selected, rp.TagSets()), nil
return selectByTagSet(selected, rp.TagSets), nil
case readpref.NearestMode:
selected := selectByKind(candidates, description.ServerKindRSPrimary)
selected = append(selected, selectSecondaries(rp, candidates)...)
return selectByTagSet(selected, rp.TagSets()), nil
return selectByTagSet(selected, rp.TagSets), nil
}

return nil, fmt.Errorf("unsupported mode: %d", rp.Mode())
return nil, fmt.Errorf("unsupported mode: %d", rp.Mode)
}
185 changes: 107 additions & 78 deletions internal/serverselector/server_selector_test.go
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@ import (
"go.mongodb.org/mongo-driver/internal/spectest"
"go.mongodb.org/mongo-driver/mongo/address"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/tag"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
)

@@ -115,7 +114,7 @@ func topologyKindFromString(t *testing.T, s string) description.TopologyKind {
return description.Unknown
}

func anyTagsInSets(sets []tag.Set) bool {
func anyTagsInSets(sets []readpref.TagSet) bool {
for _, set := range sets {
if len(set) > 0 {
return true
@@ -219,7 +218,7 @@ func selectServers(t *testing.T, test *testCase) error {
}

if serverDescription.Tags != nil {
server.Tags = tag.NewTagSetFromMap(serverDescription.Tags)
server.Tags = readpref.NewTagSetFromMap(serverDescription.Tags)
}

if test.ReadPreference.MaxStaleness != nil && server.WireVersion == nil {
@@ -243,21 +242,16 @@ func selectServers(t *testing.T, test *testCase) error {
return err
}

options := make([]readpref.Option, 0, 1)
rp := &readpref.ReadPref{Mode: readprefMode}

tagSets := tag.NewTagSetsFromMaps(test.ReadPreference.TagSets)
tagSets := readpref.NewTagSetsFromMaps(test.ReadPreference.TagSets)
if anyTagsInSets(tagSets) {
options = append(options, readpref.WithTagSets(tagSets...))
rp.TagSets = tagSets
}

if test.ReadPreference.MaxStaleness != nil {
s := time.Duration(*test.ReadPreference.MaxStaleness) * time.Second
options = append(options, readpref.WithMaxStaleness(s))
}

rp, err := readpref.New(readprefMode, options...)
if err != nil {
return err
rp.MaxStaleness = &s
}

var selector description.ServerSelector
@@ -496,7 +490,7 @@ var readPrefTestPrimary = description.Server{
LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
Kind: description.ServerKindRSPrimary,
Tags: tag.Set{tag.Tag{Name: "a", Value: "1"}},
Tags: readpref.TagSet{readpref.Tag{Name: "a", Value: "1"}},
WireVersion: &description.VersionRange{Min: 6, Max: 21},
}
var readPrefTestSecondary1 = description.Server{
@@ -505,7 +499,7 @@ var readPrefTestSecondary1 = description.Server{
LastWriteTime: time.Date(2017, 2, 11, 13, 58, 0, 0, time.UTC),
LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
Kind: description.ServerKindRSSecondary,
Tags: tag.Set{tag.Tag{Name: "a", Value: "1"}},
Tags: readpref.TagSet{readpref.Tag{Name: "a", Value: "1"}},
WireVersion: &description.VersionRange{Min: 6, Max: 21},
}
var readPrefTestSecondary2 = description.Server{
@@ -514,7 +508,7 @@ var readPrefTestSecondary2 = description.Server{
LastWriteTime: time.Date(2017, 2, 11, 14, 0, 0, 0, time.UTC),
LastUpdateTime: time.Date(2017, 2, 11, 14, 0, 2, 0, time.UTC),
Kind: description.ServerKindRSSecondary,
Tags: tag.Set{tag.Tag{Name: "a", Value: "2"}},
Tags: readpref.TagSet{readpref.Tag{Name: "a", Value: "2"}},
WireVersion: &description.VersionRange{Min: 6, Max: 21},
}
var readPrefTestTopology = description.Topology{
@@ -768,9 +762,12 @@ func TestSelector_PrimaryPreferred(t *testing.T) {
func TestSelector_PrimaryPreferred_ignores_tags(t *testing.T) {
t.Parallel()

subject := readpref.PrimaryPreferred(
readpref.WithTags("a", "2"),
)
subject := readpref.PrimaryPreferred()

tagSet, err := readpref.NewTagSet("a", "2")
assert.NoError(t, err)

subject.TagSets = []readpref.TagSet{tagSet}

result, err := (&ReadPref{ReadPref: subject}).
SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
@@ -796,9 +793,12 @@ func TestSelector_PrimaryPreferred_with_no_primary(t *testing.T) {
func TestSelector_PrimaryPreferred_with_no_primary_and_tags(t *testing.T) {
t.Parallel()

subject := readpref.PrimaryPreferred(
readpref.WithTags("a", "2"),
)
subject := readpref.PrimaryPreferred()

tagSet, err := readpref.NewTagSet("a", "2")
assert.NoError(t, err)

subject.TagSets = []readpref.TagSet{tagSet}

result, err := (&ReadPref{ReadPref: subject}).
SelectServer(readPrefTestTopology, []description.Server{readPrefTestSecondary1, readPrefTestSecondary2})
@@ -811,9 +811,10 @@ func TestSelector_PrimaryPreferred_with_no_primary_and_tags(t *testing.T) {
func TestSelector_PrimaryPreferred_with_maxStaleness(t *testing.T) {
t.Parallel()

subject := readpref.PrimaryPreferred(
readpref.WithMaxStaleness(time.Duration(90) * time.Second),
)
subject := readpref.PrimaryPreferred()

maxStaleness := 90 * time.Second
subject.MaxStaleness = &maxStaleness

result, err := (&ReadPref{ReadPref: subject}).
SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)
@@ -826,9 +827,10 @@ func TestSelector_PrimaryPreferred_with_maxStaleness(t *testing.T) {
func TestSelector_PrimaryPreferred_with_maxStaleness_and_no_primary(t *testing.T) {
t.Parallel()

subject := readpref.PrimaryPreferred(
readpref.WithMaxStaleness(time.Duration(90) * time.Second),
)
subject := readpref.PrimaryPreferred()

maxStaleness := 90 * time.Second
subject.MaxStaleness = &maxStaleness

result, err := (&ReadPref{ReadPref: subject}).
SelectServer(readPrefTestTopology, []description.Server{readPrefTestSecondary1, readPrefTestSecondary2})
@@ -853,9 +855,12 @@ func TestSelector_SecondaryPreferred(t *testing.T) {
func TestSelector_SecondaryPreferred_with_tags(t *testing.T) {
t.Parallel()

subject := readpref.SecondaryPreferred(
readpref.WithTags("a", "2"),
)
subject := readpref.SecondaryPreferred()

tagSet, err := readpref.NewTagSet("a", "2")
assert.NoError(t, err)

subject.TagSets = []readpref.TagSet{tagSet}

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)

@@ -867,9 +872,12 @@ func TestSelector_SecondaryPreferred_with_tags(t *testing.T) {
func TestSelector_SecondaryPreferred_with_tags_that_do_not_match(t *testing.T) {
t.Parallel()

subject := readpref.SecondaryPreferred(
readpref.WithTags("a", "3"),
)
subject := readpref.SecondaryPreferred()

tagSet, err := readpref.NewTagSet("a", "3")
assert.NoError(t, err)

subject.TagSets = []readpref.TagSet{tagSet}

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)

@@ -881,9 +889,12 @@ func TestSelector_SecondaryPreferred_with_tags_that_do_not_match(t *testing.T) {
func TestSelector_SecondaryPreferred_with_tags_that_do_not_match_and_no_primary(t *testing.T) {
t.Parallel()

subject := readpref.SecondaryPreferred(
readpref.WithTags("a", "3"),
)
subject := readpref.SecondaryPreferred()

tagSet, err := readpref.NewTagSet("a", "3")
assert.NoError(t, err)

subject.TagSets = []readpref.TagSet{tagSet}

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, []description.Server{readPrefTestSecondary1, readPrefTestSecondary2})

@@ -917,9 +928,10 @@ func TestSelector_SecondaryPreferred_with_no_secondaries_or_primary(t *testing.T
func TestSelector_SecondaryPreferred_with_maxStaleness(t *testing.T) {
t.Parallel()

subject := readpref.SecondaryPreferred(
readpref.WithMaxStaleness(time.Duration(90) * time.Second),
)
subject := readpref.SecondaryPreferred()

maxStaleness := 90 * time.Second
subject.MaxStaleness = &maxStaleness

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)

@@ -931,9 +943,10 @@ func TestSelector_SecondaryPreferred_with_maxStaleness(t *testing.T) {
func TestSelector_SecondaryPreferred_with_maxStaleness_and_no_primary(t *testing.T) {
t.Parallel()

subject := readpref.SecondaryPreferred(
readpref.WithMaxStaleness(time.Duration(90) * time.Second),
)
subject := readpref.SecondaryPreferred()

maxStaleness := 90 * time.Second
subject.MaxStaleness = &maxStaleness

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, []description.Server{readPrefTestSecondary1, readPrefTestSecondary2})

@@ -957,9 +970,10 @@ func TestSelector_Secondary(t *testing.T) {
func TestSelector_Secondary_with_tags(t *testing.T) {
t.Parallel()

subject := readpref.Secondary(
readpref.WithTags("a", "2"),
)
subject := readpref.SecondaryPreferred()

maxStaleness := 90 * time.Second
subject.MaxStaleness = &maxStaleness

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)

@@ -991,13 +1005,13 @@ func TestSelector_Secondary_with_empty_tag_set(t *testing.T) {
Servers: []description.Server{primaryNoTags, firstSecondaryNoTags, secondSecondaryNoTags},
}

nonMatchingSet := tag.Set{
nonMatchingSet := readpref.TagSet{
{Name: "foo", Value: "bar"},
}
emptyTagSet := tag.Set{}
rp := readpref.Secondary(
readpref.WithTagSets(nonMatchingSet, emptyTagSet),
)
emptyTagSet := readpref.TagSet{}

rp := readpref.SecondaryPreferred()
rp.TagSets = []readpref.TagSet{nonMatchingSet, emptyTagSet}

result, err := (&ReadPref{ReadPref: rp}).SelectServer(topologyNoTags, topologyNoTags.Servers)
assert.Nil(t, err, "SelectServer error: %v", err)
@@ -1008,9 +1022,12 @@ func TestSelector_Secondary_with_empty_tag_set(t *testing.T) {
func TestSelector_Secondary_with_tags_that_do_not_match(t *testing.T) {
t.Parallel()

subject := readpref.Secondary(
readpref.WithTags("a", "3"),
)
subject := readpref.Secondary()

tagSet, err := readpref.NewTagSet("a", "3")
assert.NoError(t, err)

subject.TagSets = []readpref.TagSet{tagSet}

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)

@@ -1032,9 +1049,10 @@ func TestSelector_Secondary_with_no_secondaries(t *testing.T) {
func TestSelector_Secondary_with_maxStaleness(t *testing.T) {
t.Parallel()

subject := readpref.Secondary(
readpref.WithMaxStaleness(time.Duration(90) * time.Second),
)
subject := readpref.Secondary()

maxStaleness := 90 * time.Second
subject.MaxStaleness = &maxStaleness

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)

@@ -1046,9 +1064,10 @@ func TestSelector_Secondary_with_maxStaleness(t *testing.T) {
func TestSelector_Secondary_with_maxStaleness_and_no_primary(t *testing.T) {
t.Parallel()

subject := readpref.Secondary(
readpref.WithMaxStaleness(time.Duration(90) * time.Second),
)
subject := readpref.Secondary()

maxStaleness := 90 * time.Second
subject.MaxStaleness = &maxStaleness

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, []description.Server{readPrefTestSecondary1, readPrefTestSecondary2})

@@ -1072,9 +1091,12 @@ func TestSelector_Nearest(t *testing.T) {
func TestSelector_Nearest_with_tags(t *testing.T) {
t.Parallel()

subject := readpref.Nearest(
readpref.WithTags("a", "1"),
)
subject := readpref.Nearest()

tagSet, err := readpref.NewTagSet("a", "1")
assert.NoError(t, err)

subject.TagSets = []readpref.TagSet{tagSet}

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)

@@ -1086,9 +1108,12 @@ func TestSelector_Nearest_with_tags(t *testing.T) {
func TestSelector_Nearest_with_tags_that_do_not_match(t *testing.T) {
t.Parallel()

subject := readpref.Nearest(
readpref.WithTags("a", "3"),
)
subject := readpref.Nearest()

tagSet, err := readpref.NewTagSet("a", "3")
assert.NoError(t, err)

subject.TagSets = []readpref.TagSet{tagSet}

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)

@@ -1123,9 +1148,10 @@ func TestSelector_Nearest_with_no_secondaries(t *testing.T) {
func TestSelector_Nearest_with_maxStaleness(t *testing.T) {
t.Parallel()

subject := readpref.Nearest(
readpref.WithMaxStaleness(time.Duration(90) * time.Second),
)
subject := readpref.Nearest()

maxStaleness := 90 * time.Second
subject.MaxStaleness = &maxStaleness

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, readPrefTestTopology.Servers)

@@ -1137,9 +1163,10 @@ func TestSelector_Nearest_with_maxStaleness(t *testing.T) {
func TestSelector_Nearest_with_maxStaleness_and_no_primary(t *testing.T) {
t.Parallel()

subject := readpref.Nearest(
readpref.WithMaxStaleness(time.Duration(90) * time.Second),
)
subject := readpref.Nearest()

maxStaleness := 90 * time.Second
subject.MaxStaleness = &maxStaleness

result, err := (&ReadPref{ReadPref: subject}).SelectServer(readPrefTestTopology, []description.Server{readPrefTestSecondary1, readPrefTestSecondary2})

@@ -1151,9 +1178,10 @@ func TestSelector_Nearest_with_maxStaleness_and_no_primary(t *testing.T) {
func TestSelector_Max_staleness_is_less_than_90_seconds(t *testing.T) {
t.Parallel()

subject := readpref.Nearest(
readpref.WithMaxStaleness(time.Duration(50) * time.Second),
)
subject := readpref.Nearest()

maxStaleness := 50 * time.Second
subject.MaxStaleness = &maxStaleness

s := description.Server{
Addr: address.Address("localhost:27017"),
@@ -1176,9 +1204,10 @@ func TestSelector_Max_staleness_is_less_than_90_seconds(t *testing.T) {
func TestSelector_Max_staleness_is_too_low(t *testing.T) {
t.Parallel()

subject := readpref.Nearest(
readpref.WithMaxStaleness(time.Duration(100) * time.Second),
)
subject := readpref.Nearest()

maxStaleness := 100 * time.Second
subject.MaxStaleness = &maxStaleness

s := description.Server{
Addr: address.Address("localhost:27017"),
@@ -1238,7 +1267,7 @@ func TestEqualServers(t *testing.T) {
},
{"setName", description.Server{SetName: "foo"}, false},
{"setVersion", description.Server{SetVersion: 1}, false},
{"tags", description.Server{Tags: tag.Set{tag.Tag{"foo", "bar"}}}, false},
{"tags", description.Server{Tags: readpref.TagSet{readpref.Tag{"foo", "bar"}}}, false},
{"topologyVersion", description.Server{TopologyVersion: &description.TopologyVersion{bson.NewObjectID(), 0}}, false},
{"kind", description.Server{Kind: description.ServerKindStandalone}, false},
{"wireVersion", description.Server{WireVersion: &description.VersionRange{1, 2}}, false},
24 changes: 12 additions & 12 deletions mongo/client_test.go
Original file line number Diff line number Diff line change
@@ -18,11 +18,11 @@ import (
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/internal/assert"
"go.mongodb.org/mongo-driver/internal/integtest"
"go.mongodb.org/mongo-driver/internal/require"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/tag"
"go.mongodb.org/mongo-driver/x/mongo/driver/mongocrypt"
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
@@ -88,22 +88,22 @@ func TestClient(t *testing.T) {
t.Run("read preference", func(t *testing.T) {
t.Run("absent", func(t *testing.T) {
client := setupClient()
gotMode := client.readPreference.Mode()
gotMode := client.readPreference.Mode
wantMode := readpref.PrimaryMode
assert.Equal(t, gotMode, wantMode, "expected mode %v, got %v", wantMode, gotMode)
_, flag := client.readPreference.MaxStaleness()
assert.False(t, flag, "expected max staleness to not be set but was")
gotMaxStaleness := client.readPreference.MaxStaleness
assert.Nil(t, gotMaxStaleness, "expected max staleness to not be set but was")
})
t.Run("specified", func(t *testing.T) {
tags := []tag.Set{
tags := []readpref.TagSet{
{
tag.Tag{
readpref.Tag{
Name: "one",
Value: "1",
},
},
{
tag.Tag{
readpref.Tag{
Name: "two",
Value: "2",
},
@@ -113,14 +113,14 @@ func TestClient(t *testing.T) {
cs += "?readpreference=secondary&readPreferenceTags=one:1&readPreferenceTags=two:2&maxStaleness=5"

client := setupClient(options.Client().ApplyURI(cs))
gotMode := client.readPreference.Mode()
gotMode := client.readPreference.Mode
assert.Equal(t, gotMode, readpref.SecondaryMode, "expected mode %v, got %v", readpref.SecondaryMode, gotMode)
gotTags := client.readPreference.TagSets()
gotTags := client.readPreference.TagSets
assert.Equal(t, gotTags, tags, "expected tags %v, got %v", tags, gotTags)
gotStaleness, flag := client.readPreference.MaxStaleness()
assert.True(t, flag, "expected max staleness to be set but was not")
gotStaleness := client.readPreference.MaxStaleness
require.NotNil(t, gotStaleness, "expected max staleness to be set but was not")
wantStaleness := time.Duration(5) * time.Second
assert.Equal(t, gotStaleness, wantStaleness, "expected staleness %v, got %v", wantStaleness, gotStaleness)
assert.Equal(t, *gotStaleness, wantStaleness, "expected staleness %v, got %v", wantStaleness, *gotStaleness)
})
})
t.Run("localThreshold", func(t *testing.T) {
2 changes: 1 addition & 1 deletion mongo/database.go
Original file line number Diff line number Diff line change
@@ -183,7 +183,7 @@ func (db *Database) processRunCommand(ctx context.Context, cmd interface{},
ro.ReadPreference = opt.ReadPreference
}
}
if sess != nil && sess.TransactionRunning() && ro.ReadPreference != nil && ro.ReadPreference.Mode() != readpref.PrimaryMode {
if sess != nil && sess.TransactionRunning() && ro.ReadPreference != nil && ro.ReadPreference.Mode != readpref.PrimaryMode {
return nil, sess, errors.New("read preference in a transaction must be primary")
}

26 changes: 11 additions & 15 deletions mongo/options/clientoptions.go
Original file line number Diff line number Diff line change
@@ -28,7 +28,6 @@ import (
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/tag"
"go.mongodb.org/mongo-driver/x/mongo/driver"
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
@@ -432,26 +431,23 @@ func (c *ClientOptions) ApplyURI(uri string) *ClientOptions {
}

if cs.ReadPreference != "" || len(cs.ReadPreferenceTagSets) > 0 || cs.MaxStalenessSet {
opts := make([]readpref.Option, 0, 1)

tagSets := tag.NewTagSetsFromMaps(cs.ReadPreferenceTagSets)
if len(tagSets) > 0 {
opts = append(opts, readpref.WithTagSets(tagSets...))
}

if cs.MaxStaleness != 0 {
opts = append(opts, readpref.WithMaxStaleness(cs.MaxStaleness))
}

mode, err := readpref.ModeFromString(cs.ReadPreference)
if err != nil {
c.err = err
return c
}

c.ReadPreference, c.err = readpref.New(mode, opts...)
if c.err != nil {
return c
c.ReadPreference = &readpref.ReadPref{
Mode: mode,
}

tagSets := readpref.NewTagSetsFromMaps(cs.ReadPreferenceTagSets)
if len(tagSets) > 0 {
c.ReadPreference.TagSets = tagSets
}

if cs.MaxStaleness != 0 {
c.ReadPreference.MaxStaleness = &cs.MaxStaleness
}
}

27 changes: 16 additions & 11 deletions mongo/options/clientoptions_test.go
Original file line number Diff line number Diff line change
@@ -211,15 +211,6 @@ func TestClientOptions(t *testing.T) {
HTTPClient: httputil.DefaultHTTPClient,
},
},
{
"ReadPreference Primary With Options",
"mongodb://localhost/?readPreference=Primary&maxStaleness=200",
&ClientOptions{
err: errors.New("can not specify tags, max staleness, or hedge with mode primary"),
Hosts: []string{"localhost"},
HTTPClient: httputil.DefaultHTTPClient,
},
},
{
"TLS addCertFromFile error",
"mongodb://localhost/?ssl=true&sslCertificateAuthorityFile=testdata/doesntexist",
@@ -367,12 +358,26 @@ func TestClientOptions(t *testing.T) {
{
"ReadPreferenceTagSets",
"mongodb://localhost/?readPreference=secondaryPreferred&readPreferenceTags=foo:bar",
baseClient().SetReadPreference(readpref.SecondaryPreferred(readpref.WithTags("foo", "bar"))),
baseClient().SetReadPreference(func() *readpref.ReadPref {
rp := readpref.SecondaryPreferred() //readpref.NewTagSet("foo", "bar")

tagSet, _ := readpref.NewTagSet("foo", "bar")
rp.TagSets = append(rp.TagSets, tagSet)

return rp
}()),
},
{
"MaxStaleness",
"mongodb://localhost/?readPreference=secondaryPreferred&maxStaleness=250",
baseClient().SetReadPreference(readpref.SecondaryPreferred(readpref.WithMaxStaleness(250 * time.Second))),
baseClient().SetReadPreference(func() *readpref.ReadPref {
rp := readpref.SecondaryPreferred()

maxStaleness := 250 * time.Second
rp.MaxStaleness = &maxStaleness

return rp
}()),
},
{
"RetryWrites",
14 changes: 0 additions & 14 deletions mongo/readpref/mode.go
Original file line number Diff line number Diff line change
@@ -72,17 +72,3 @@ func (mode Mode) String() string {
return "unknown"
}
}

// IsValid checks whether the mode is valid.
func (mode Mode) IsValid() bool {
switch mode {
case PrimaryMode,
PrimaryPreferredMode,
SecondaryMode,
SecondaryPreferredMode,
NearestMode:
return true
default:
return false
}
}
83 changes: 0 additions & 83 deletions mongo/readpref/options.go

This file was deleted.

60 changes: 0 additions & 60 deletions mongo/readpref/options_example_test.go

This file was deleted.

103 changes: 24 additions & 79 deletions mongo/readpref/readpref.go
Original file line number Diff line number Diff line change
@@ -9,121 +9,66 @@ package readpref

import (
"bytes"
"errors"
"fmt"
"time"

"go.mongodb.org/mongo-driver/tag"
)

var (
errInvalidReadPreference = errors.New("can not specify tags, max staleness, or hedge with mode primary")
)

// Primary constructs a read preference with a PrimaryMode.
func Primary() *ReadPref {
return &ReadPref{mode: PrimaryMode}
return &ReadPref{Mode: PrimaryMode}
}

// PrimaryPreferred constructs a read preference with a PrimaryPreferredMode.
func PrimaryPreferred(opts ...Option) *ReadPref {
// New only returns an error with a mode of Primary
rp, _ := New(PrimaryPreferredMode, opts...)
return rp
func PrimaryPreferred() *ReadPref {
return &ReadPref{Mode: PrimaryPreferredMode}
}

// SecondaryPreferred constructs a read preference with a SecondaryPreferredMode.
func SecondaryPreferred(opts ...Option) *ReadPref {
// New only returns an error with a mode of Primary
rp, _ := New(SecondaryPreferredMode, opts...)
return rp
func SecondaryPreferred() *ReadPref {
return &ReadPref{Mode: SecondaryPreferredMode}
}

// Secondary constructs a read preference with a SecondaryMode.
func Secondary(opts ...Option) *ReadPref {
// New only returns an error with a mode of Primary
rp, _ := New(SecondaryMode, opts...)
return rp
func Secondary() *ReadPref {
return &ReadPref{Mode: SecondaryMode}
}

// Nearest constructs a read preference with a NearestMode.
func Nearest(opts ...Option) *ReadPref {
// New only returns an error with a mode of Primary
rp, _ := New(NearestMode, opts...)
return rp
}

// New creates a new ReadPref.
func New(mode Mode, opts ...Option) (*ReadPref, error) {
rp := &ReadPref{
mode: mode,
}

if mode == PrimaryMode && len(opts) != 0 {
return nil, errInvalidReadPreference
}

for _, opt := range opts {
if opt == nil {
continue
}
err := opt(rp)
if err != nil {
return nil, err
}
}

return rp, nil
func Nearest() *ReadPref {
return &ReadPref{Mode: NearestMode}
}

// ReadPref determines which servers are considered suitable for read operations.
type ReadPref struct {
maxStaleness time.Duration
maxStalenessSet bool
mode Mode
tagSets []tag.Set
hedgeEnabled *bool
}
// Maximum amount of time to allow a server to be considered eligible for
// selection. The second return value indicates if this value has been set.
MaxStaleness *time.Duration

// MaxStaleness is the maximum amount of time to allow
// a server to be considered eligible for selection. The
// second return value indicates if this value has been set.
func (r *ReadPref) MaxStaleness() (time.Duration, bool) {
return r.maxStaleness, r.maxStalenessSet
}

// Mode indicates the mode of the read preference.
func (r *ReadPref) Mode() Mode {
return r.mode
}
// Mode indicates the mode of the read preference.
Mode Mode

// TagSets are multiple tag sets indicating
// which servers should be considered.
func (r *ReadPref) TagSets() []tag.Set {
return r.tagSets
}
// Tag sets indicating which servers should be considered.
TagSets []TagSet

// HedgeEnabled returns whether or not hedged reads are enabled for this read preference. If this option was not
// specified during read preference construction, nil is returned.
func (r *ReadPref) HedgeEnabled() *bool {
return r.hedgeEnabled
// Specify whether or not hedged reads are enabled for this read preference.
HedgeEnabled *bool
}

// String returns a human-readable description of the read preference.
func (r *ReadPref) String() string {
var b bytes.Buffer
b.WriteString(r.mode.String())
b.WriteString(r.Mode.String())
delim := "("
if r.maxStalenessSet {
fmt.Fprintf(&b, "%smaxStaleness=%v", delim, r.maxStaleness)
if r.MaxStaleness != nil {
fmt.Fprintf(&b, "%smaxStaleness=%v", delim, r.MaxStaleness)
delim = " "
}
for _, tagSet := range r.tagSets {
for _, tagSet := range r.TagSets {
fmt.Fprintf(&b, "%stagSet=%s", delim, tagSet.String())
delim = " "
}
if r.hedgeEnabled != nil {
fmt.Fprintf(&b, "%shedgeEnabled=%v", delim, *r.hedgeEnabled)
if r.HedgeEnabled != nil {
fmt.Fprintf(&b, "%shedgeEnabled=%v", delim, *r.HedgeEnabled)
delim = " "
}
if delim != "(" {
132 changes: 15 additions & 117 deletions mongo/readpref/readpref_test.go
Original file line number Diff line number Diff line change
@@ -11,133 +11,31 @@ import (
"time"

"go.mongodb.org/mongo-driver/internal/assert"
"go.mongodb.org/mongo-driver/internal/require"
"go.mongodb.org/mongo-driver/tag"
)

func TestPrimary(t *testing.T) {
subject := Primary()

require.Equal(t, PrimaryMode, subject.Mode())
_, set := subject.MaxStaleness()
require.False(t, set)
require.Len(t, subject.TagSets(), 0)
}

func TestPrimaryPreferred(t *testing.T) {
subject := PrimaryPreferred()

require.Equal(t, PrimaryPreferredMode, subject.Mode())
_, set := subject.MaxStaleness()
require.False(t, set)
require.Len(t, subject.TagSets(), 0)
}

func TestPrimaryPreferred_with_options(t *testing.T) {
subject := PrimaryPreferred(
WithMaxStaleness(time.Duration(10)),
WithTags("a", "1", "b", "2"),
)

require.Equal(t, PrimaryPreferredMode, subject.Mode())
ms, set := subject.MaxStaleness()
require.True(t, set)
require.Equal(t, time.Duration(10), ms)
require.Equal(t, []tag.Set{{tag.Tag{Name: "a", Value: "1"}, tag.Tag{Name: "b", Value: "2"}}}, subject.TagSets())
}

func TestSecondaryPreferred(t *testing.T) {
subject := SecondaryPreferred()

require.Equal(t, SecondaryPreferredMode, subject.Mode())
_, set := subject.MaxStaleness()
require.False(t, set)
require.Len(t, subject.TagSets(), 0)
}

func TestSecondaryPreferred_with_options(t *testing.T) {
subject := SecondaryPreferred(
WithMaxStaleness(time.Duration(10)),
WithTags("a", "1", "b", "2"),
)

require.Equal(t, SecondaryPreferredMode, subject.Mode())
ms, set := subject.MaxStaleness()
require.True(t, set)
require.Equal(t, time.Duration(10), ms)
require.Equal(t, []tag.Set{{tag.Tag{Name: "a", Value: "1"}, tag.Tag{Name: "b", Value: "2"}}}, subject.TagSets())
}

func TestSecondary(t *testing.T) {
subject := Secondary()

require.Equal(t, SecondaryMode, subject.Mode())
_, set := subject.MaxStaleness()
require.False(t, set)
require.Len(t, subject.TagSets(), 0)
}

func TestSecondary_with_options(t *testing.T) {
subject := Secondary(
WithMaxStaleness(time.Duration(10)),
WithTags("a", "1", "b", "2"),
)

require.Equal(t, SecondaryMode, subject.Mode())
ms, set := subject.MaxStaleness()
require.True(t, set)
require.Equal(t, time.Duration(10), ms)
require.Equal(t, []tag.Set{{tag.Tag{Name: "a", Value: "1"}, tag.Tag{Name: "b", Value: "2"}}}, subject.TagSets())
}

func TestNearest(t *testing.T) {
subject := Nearest()
func TestReadPref_String(t *testing.T) {
t.Run("ReadPref.String() with all options", func(t *testing.T) {
readPref := Nearest()

require.Equal(t, NearestMode, subject.Mode())
_, set := subject.MaxStaleness()
require.False(t, set)
require.Len(t, subject.TagSets(), 0)
}
maxStaleness := 120 * time.Second
readPref.MaxStaleness = &maxStaleness

func TestNearest_with_options(t *testing.T) {
subject := Nearest(
WithMaxStaleness(time.Duration(10)),
WithTags("a", "1", "b", "2"),
)
readPref.TagSets = []TagSet{{{"a", "1"}, {"b", "2"}}, {{"q", "5"}, {"r", "6"}}}

require.Equal(t, NearestMode, subject.Mode())
ms, set := subject.MaxStaleness()
require.True(t, set)
require.Equal(t, time.Duration(10), ms)
require.Equal(t, []tag.Set{{tag.Tag{Name: "a", Value: "1"}, tag.Tag{Name: "b", Value: "2"}}}, subject.TagSets())
}
hedgeEnabled := true
readPref.HedgeEnabled = &hedgeEnabled

func TestHedge(t *testing.T) {
t.Run("hedge specified with primary mode errors", func(t *testing.T) {
_, err := New(PrimaryMode, WithHedgeEnabled(true))
assert.Equal(t, errInvalidReadPreference, err, "expected error %v, got %v", errInvalidReadPreference, err)
})
t.Run("valid hedge document and mode succeeds", func(t *testing.T) {
rp, err := New(SecondaryMode, WithHedgeEnabled(true))
assert.Nil(t, err, "expected no error, got %v", err)
enabled := rp.HedgeEnabled()
assert.NotNil(t, enabled, "expected HedgeEnabled to return a non-nil value, got nil")
assert.True(t, *enabled, "expected HedgeEnabled to return true, got false")
})
}

func TestReadPref_String(t *testing.T) {
t.Run("ReadPref.String() with all options", func(t *testing.T) {
readPref := Nearest(
WithMaxStaleness(120*time.Second),
WithTagSets(tag.Set{{"a", "1"}, {"b", "2"}}, tag.Set{{"q", "5"}, {"r", "6"}}),
WithHedgeEnabled(true),
)
expected := "nearest(maxStaleness=2m0s tagSet=a=1,b=2 tagSet=q=5,r=6 hedgeEnabled=true)"
assert.Equal(t, expected, readPref.String(), "expected %q, got %q", expected, readPref.String())
})
t.Run("ReadPref.String() with one option", func(t *testing.T) {
readPref := Secondary(WithTags("a", "1", "b", "2"))
readPref := Secondary()

tagSet, err := NewTagSet("a", "1", "b", "2")
assert.NoError(t, err)

readPref.TagSets = append(readPref.TagSets, tagSet)

expected := "secondary(tagSet=a=1,b=2)"
assert.Equal(t, expected, readPref.String(), "expected %q, got %q", expected, readPref.String())
})
47 changes: 32 additions & 15 deletions tag/tag.go → mongo/readpref/tag.go
Original file line number Diff line number Diff line change
@@ -4,11 +4,7 @@
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

// Package tag provides types for filtering replica set members using tags in a read preference.
//
// For more information about read preference tags, see
// https://www.mongodb.com/docs/manual/core/read-preference-tags/
package tag
package readpref

import (
"bytes"
@@ -26,12 +22,36 @@ func (tag Tag) String() string {
return fmt.Sprintf("%s=%s", tag.Name, tag.Value)
}

// TagSet is an ordered list of Tags.
type TagSet []Tag

// NewTagSet is a convenience function to specify a single tag set used to match
// replica set members. If no members match the tag set, read operations will
// return an error.
//
// For more information about read preference tags, see
// https://www.mongodb.com/docs/manual/core/read-preference-tags/
func NewTagSet(tags ...string) (TagSet, error) {
length := len(tags)
if length < 2 || length%2 != 0 {
return nil, fmt.Errorf("an even number of tags must be specified")
}

tagset := make(TagSet, 0, length/2)

for i := 1; i < length; i += 2 {
tagset = append(tagset, Tag{Name: tags[i-1], Value: tags[i]})
}

return tagset, nil
}

// NewTagSetFromMap creates a tag set from a map.
//
// For more information about read preference tags, see
// https://www.mongodb.com/docs/manual/core/read-preference-tags/
func NewTagSetFromMap(m map[string]string) Set {
var set Set
func NewTagSetFromMap(m map[string]string) TagSet {
set := make(TagSet, 0, len(m))
for k, v := range m {
set = append(set, Tag{Name: k, Value: v})
}
@@ -43,19 +63,16 @@ func NewTagSetFromMap(m map[string]string) Set {
//
// For more information about read preference tags, see
// https://www.mongodb.com/docs/manual/core/read-preference-tags/
func NewTagSetsFromMaps(maps []map[string]string) []Set {
sets := make([]Set, 0, len(maps))
func NewTagSetsFromMaps(maps []map[string]string) []TagSet {
sets := make([]TagSet, 0, len(maps))
for _, m := range maps {
sets = append(sets, NewTagSetFromMap(m))
}
return sets
}

// Set is an ordered list of Tags.
type Set []Tag

// Contains indicates whether the name/value pair exists in the tagset.
func (ts Set) Contains(name, value string) bool {
func (ts TagSet) Contains(name, value string) bool {
for _, t := range ts {
if t.Name == name && t.Value == value {
return true
@@ -66,7 +83,7 @@ func (ts Set) Contains(name, value string) bool {
}

// ContainsAll indicates whether all the name/value pairs exist in the tagset.
func (ts Set) ContainsAll(other []Tag) bool {
func (ts TagSet) ContainsAll(other []Tag) bool {
for _, ot := range other {
if !ts.Contains(ot.Name, ot.Value) {
return false
@@ -77,7 +94,7 @@ func (ts Set) ContainsAll(other []Tag) bool {
}

// String returns a human-readable human-readable description of the tagset.
func (ts Set) String() string {
func (ts TagSet) String() string {
var b bytes.Buffer
for i, tag := range ts {
if i > 0 {
20 changes: 10 additions & 10 deletions tag/tag_test.go → mongo/readpref/tag_test.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package tag
package readpref

import (
"testing"
@@ -23,7 +23,7 @@ func TestTag_String(t *testing.T) {
func TestTagSets_NewTagSet(t *testing.T) {
t.Parallel()

ts := Set{Tag{Name: "a", Value: "1"}}
ts := TagSet{Tag{Name: "a", Value: "1"}}

require.True(t, ts.Contains("a", "1"))
require.False(t, ts.Contains("1", "a"))
@@ -65,30 +65,30 @@ func TestTagSets_NewTagSetsFromMaps(t *testing.T) {
func TestTagSets_ContainsAll(t *testing.T) {
t.Parallel()

ts := Set{
ts := TagSet{
Tag{Name: "a", Value: "1"},
Tag{Name: "b", Value: "2"},
}

test := Set{Tag{Name: "a", Value: "1"}}
test := TagSet{Tag{Name: "a", Value: "1"}}
require.True(t, ts.ContainsAll(test))
test = Set{Tag{Name: "a", Value: "1"}, Tag{Name: "b", Value: "2"}}
test = TagSet{Tag{Name: "a", Value: "1"}, Tag{Name: "b", Value: "2"}}
require.True(t, ts.ContainsAll(test))
test = Set{Tag{Name: "a", Value: "1"}, Tag{Name: "b", Value: "2"}}
test = TagSet{Tag{Name: "a", Value: "1"}, Tag{Name: "b", Value: "2"}}
require.True(t, ts.ContainsAll(test))

test = Set{Tag{Name: "a", Value: "2"}, Tag{Name: "b", Value: "1"}}
test = TagSet{Tag{Name: "a", Value: "2"}, Tag{Name: "b", Value: "1"}}
require.False(t, ts.ContainsAll(test))
test = Set{Tag{Name: "a", Value: "1"}, Tag{Name: "b", Value: "1"}}
test = TagSet{Tag{Name: "a", Value: "1"}, Tag{Name: "b", Value: "1"}}
require.False(t, ts.ContainsAll(test))
test = Set{Tag{Name: "a", Value: "2"}, Tag{Name: "b", Value: "2"}}
test = TagSet{Tag{Name: "a", Value: "2"}, Tag{Name: "b", Value: "2"}}
require.False(t, ts.ContainsAll(test))
}

func TestTagSets_String(t *testing.T) {
t.Parallel()

ts := Set{
ts := TagSet{
Tag{Name: "a", Value: "1"},
Tag{Name: "b", Value: "2"},
}
4 changes: 2 additions & 2 deletions x/mongo/driver/description/server.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ import (

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/address"
"go.mongodb.org/mongo-driver/tag"
"go.mongodb.org/mongo-driver/mongo/readpref"
)

// ServerKind represents the type of a single server in a topology.
@@ -105,7 +105,7 @@ type Server struct {
SessionTimeoutMinutes *int64
SetName string
SetVersion uint32
Tags tag.Set
Tags readpref.TagSet
TopologyVersion *TopologyVersion
Kind ServerKind
WireVersion *VersionRange
22 changes: 11 additions & 11 deletions x/mongo/driver/operation.go
Original file line number Diff line number Diff line change
@@ -1697,7 +1697,7 @@ func (op Operation) getReadPrefBasedOnTransaction() (*readpref.ReadPref, error)
rp := op.Client.CurrentRp
// Reads in a transaction must have read preference primary
// This must not be checked in startTransaction
if rp != nil && !op.Client.TransactionStarting() && rp.Mode() != readpref.PrimaryMode {
if rp != nil && !op.Client.TransactionStarting() && rp.Mode != readpref.PrimaryMode {
return nil, ErrNonPrimaryReadPref
}
return rp, nil
@@ -1743,7 +1743,7 @@ func (op Operation) createReadPref(desc description.SelectedServer, isOpQuery bo
return nil, nil
}

switch rp.Mode() {
switch rp.Mode {
case readpref.PrimaryMode:
if desc.Server.Kind == description.ServerKindMongos {
return nil, nil
@@ -1764,9 +1764,9 @@ func (op Operation) createReadPref(desc description.SelectedServer, isOpQuery bo
case readpref.PrimaryPreferredMode:
doc = bsoncore.AppendStringElement(doc, "mode", "primaryPreferred")
case readpref.SecondaryPreferredMode:
_, ok := rp.MaxStaleness()
if desc.Server.Kind == description.ServerKindMongos && isOpQuery && !ok && len(rp.TagSets()) == 0 &&
rp.HedgeEnabled() == nil {
ok := rp.MaxStaleness != nil
if desc.Server.Kind == description.ServerKindMongos && isOpQuery && !ok && len(rp.TagSets) == 0 &&
rp.HedgeEnabled == nil {

return nil, nil
}
@@ -1777,8 +1777,8 @@ func (op Operation) createReadPref(desc description.SelectedServer, isOpQuery bo
doc = bsoncore.AppendStringElement(doc, "mode", "nearest")
}

sets := make([]bsoncore.Document, 0, len(rp.TagSets()))
for _, ts := range rp.TagSets() {
sets := make([]bsoncore.Document, 0, len(rp.TagSets))
for _, ts := range rp.TagSets {
i, set := bsoncore.AppendDocumentStart(nil)
for _, t := range ts {
set = bsoncore.AppendStringElement(set, t.Name, t.Value)
@@ -1795,11 +1795,11 @@ func (op Operation) createReadPref(desc description.SelectedServer, isOpQuery bo
doc, _ = bsoncore.AppendArrayEnd(doc, aidx)
}

if d, ok := rp.MaxStaleness(); ok {
doc = bsoncore.AppendInt32Element(doc, "maxStalenessSeconds", int32(d.Seconds()))
if maxStaleness := rp.MaxStaleness; maxStaleness != nil {
doc = bsoncore.AppendInt32Element(doc, "maxStalenessSeconds", int32((*maxStaleness).Seconds()))
}

if hedgeEnabled := rp.HedgeEnabled(); hedgeEnabled != nil {
if hedgeEnabled := rp.HedgeEnabled; hedgeEnabled != nil {
var hedgeIdx int32
hedgeIdx, doc = bsoncore.AppendDocumentElementStart(doc, "hedge")
doc = bsoncore.AppendBooleanElement(doc, "enabled", *hedgeEnabled)
@@ -1818,7 +1818,7 @@ func (op Operation) secondaryOK(desc description.SelectedServer) wiremessage.Que
return wiremessage.SecondaryOK
}

if rp := op.ReadPreference; rp != nil && rp.Mode() != readpref.PrimaryMode {
if rp := op.ReadPreference; rp != nil && rp.Mode != readpref.PrimaryMode {
return wiremessage.SecondaryOK
}

63 changes: 51 additions & 12 deletions x/mongo/driver/operation_test.go
Original file line number Diff line number Diff line change
@@ -24,7 +24,6 @@ import (
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"go.mongodb.org/mongo-driver/tag"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/x/mongo/driver/mnet"
@@ -445,17 +444,32 @@ func TestOperation(t *testing.T) {
{"nearest", readpref.Nearest(), description.ServerKindRSSecondary, description.TopologyKindReplicaSet, false, rpNearest},
{
"secondaryPreferred/withTags",
readpref.SecondaryPreferred(readpref.WithTags("disk", "ssd", "use", "reporting")),
func() *readpref.ReadPref {
rp := readpref.SecondaryPreferred()

tagSet, err := readpref.NewTagSet("disk", "ssd", "use", "reporting")
assert.NoError(t, err)

rp.TagSets = []readpref.TagSet{tagSet}

return rp
}(),
description.ServerKindRSSecondary, description.TopologyKindReplicaSet, false, rpWithTags,
},
// GODRIVER-2205: Ensure empty tag sets are written as an empty document in the read
// preference document. Empty tag sets match any server and are used as a fallback when
// no other tag sets match any servers.
{
"secondaryPreferred/withTags/emptyTagSet",
readpref.SecondaryPreferred(readpref.WithTagSets(
tag.Set{{Name: "disk", Value: "ssd"}},
tag.Set{})),
func() *readpref.ReadPref {
rp := readpref.SecondaryPreferred()
rp.TagSets = []readpref.TagSet{
readpref.TagSet{{Name: "disk", Value: "ssd"}},
readpref.TagSet{},
}

return rp
}(),
description.ServerKindRSSecondary,
description.TopologyKindReplicaSet,
false,
@@ -469,25 +483,50 @@ func TestOperation(t *testing.T) {
},
{
"secondaryPreferred/withMaxStaleness",
readpref.SecondaryPreferred(readpref.WithMaxStaleness(25 * time.Second)),
func() *readpref.ReadPref {
rp := readpref.SecondaryPreferred()

maxStaleness := 25 * time.Second
rp.MaxStaleness = &maxStaleness

return rp
}(),
description.ServerKindRSSecondary, description.TopologyKindReplicaSet, false, rpWithMaxStaleness,
},
{
// A read preference document is generated for SecondaryPreferred if the hedge document is non-nil.
"secondaryPreferred with hedge to mongos using OP_QUERY",
readpref.SecondaryPreferred(readpref.WithHedgeEnabled(true)),
func() *readpref.ReadPref {
rp := readpref.SecondaryPreferred()

he := true
rp.HedgeEnabled = &he

return rp
}(),
description.ServerKindMongos,
description.TopologyKindSharded,
true,
rpWithHedge,
},
{
"secondaryPreferred with all options",
readpref.SecondaryPreferred(
readpref.WithTags("disk", "ssd", "use", "reporting"),
readpref.WithMaxStaleness(25*time.Second),
readpref.WithHedgeEnabled(false),
),
func() *readpref.ReadPref {
rp := readpref.SecondaryPreferred()

tagSet, err := readpref.NewTagSet("disk", "ssd", "use", "reporting")
assert.NoError(t, err)

rp.TagSets = []readpref.TagSet{tagSet}

maxStaleness := 25 * time.Second
rp.MaxStaleness = &maxStaleness

he := false
rp.HedgeEnabled = &he

return rp
}(),
description.ServerKindRSSecondary,
description.TopologyKindReplicaSet,
false,