Skip to content

Commit

Permalink
fix: leader election should choose highest term (#598)
Browse files Browse the repository at this point in the history
### Motivation

#579 want to support choosing the highest term as leader. But that PR
didn't support it well.


### Modification

- Fix the bug might choose a lower term as leader.
- Add enhanced test to cover logic.

---------

Co-authored-by: Matteo Merli <[email protected]>
  • Loading branch information
mattisonchao and merlimat authored Jan 24, 2025
1 parent 851f57d commit bed7f1a
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ linters-settings:
severity: warning
disabled: false
arguments:
- 15
- 18
- name: cyclomatic
severity: warning
disabled: false
Expand Down
17 changes: 9 additions & 8 deletions coordinator/impl/shard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,17 +664,18 @@ func selectNewLeader(newTermResponses map[model.ServerAddress]*proto.EntryId) (
var candidates []model.ServerAddress

for addr, headEntryId := range newTermResponses {
if headEntryId.Term >= currentMaxTerm {
if headEntryId.Term > currentMaxTerm {
// the new max
currentMaxTerm = headEntryId.Term
switch {
case headEntryId.Offset < currentMax:
continue
case headEntryId.Offset == currentMax:
candidates = append(candidates, addr)
default:
// Found a new max
currentMax = headEntryId.Offset
candidates = []model.ServerAddress{addr}
} else if headEntryId.Term == currentMaxTerm {
if headEntryId.Offset > currentMax {
// the new max
currentMax = headEntryId.Offset
candidates = []model.ServerAddress{addr}
} else if headEntryId.Offset == currentMax {
candidates = append(candidates, addr)
}
}
}
Expand Down
90 changes: 76 additions & 14 deletions coordinator/impl/shard_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,83 @@ var namespaceConfig = &model.NamespaceConfig{
}

func TestLeaderElection_ShouldChooseHighestTerm(t *testing.T) {
s1 := model.ServerAddress{Public: "1", Internal: "1"}
s2 := model.ServerAddress{Public: "2", Internal: "2"}
s3 := model.ServerAddress{Public: "3", Internal: "3"}
candidates := map[model.ServerAddress]*proto.EntryId{
s1: {Term: 200, Offset: 2480},
s2: {Term: 200, Offset: 2500},
s3: {Term: 198, Offset: 3000},
tests := []struct {
name string
candidates map[model.ServerAddress]*proto.EntryId
expectedLeader model.ServerAddress
expectedFollowersCount int
expectedFollowers map[model.ServerAddress]*proto.EntryId
}{
{
name: "Choose highest term",
candidates: map[model.ServerAddress]*proto.EntryId{
{Public: "1", Internal: "1"}: {Term: 200, Offset: 2480},
{Public: "2", Internal: "2"}: {Term: 200, Offset: 2500},
{Public: "3", Internal: "3"}: {Term: 198, Offset: 3000},
},
expectedLeader: model.ServerAddress{Public: "2", Internal: "2"},
expectedFollowersCount: 2,
expectedFollowers: map[model.ServerAddress]*proto.EntryId{
{Public: "1", Internal: "1"}: {Term: 200, Offset: 2480},
{Public: "3", Internal: "3"}: {Term: 198, Offset: 3000},
},
},
{
name: "Same term, different offsets",
candidates: map[model.ServerAddress]*proto.EntryId{
{Public: "1", Internal: "1"}: {Term: 200, Offset: 1000},
{Public: "2", Internal: "2"}: {Term: 200, Offset: 2000},
{Public: "3", Internal: "3"}: {Term: 200, Offset: 1500},
},
expectedLeader: model.ServerAddress{Public: "2", Internal: "2"},
expectedFollowersCount: 2,
expectedFollowers: map[model.ServerAddress]*proto.EntryId{
{Public: "1", Internal: "1"}: {Term: 200, Offset: 1000},
{Public: "3", Internal: "3"}: {Term: 200, Offset: 1500},
},
},
{
name: "Different terms, same offsets",
candidates: map[model.ServerAddress]*proto.EntryId{
{Public: "1", Internal: "1"}: {Term: 200, Offset: 1500},
{Public: "2", Internal: "2"}: {Term: 198, Offset: 1500},
{Public: "3", Internal: "3"}: {Term: 199, Offset: 1500},
},
expectedLeader: model.ServerAddress{Public: "1", Internal: "1"},
expectedFollowersCount: 2,
expectedFollowers: map[model.ServerAddress]*proto.EntryId{
{Public: "2", Internal: "2"}: {Term: 198, Offset: 1500},
{Public: "3", Internal: "3"}: {Term: 199, Offset: 1500},
},
},
{
name: "Single candidate",
candidates: map[model.ServerAddress]*proto.EntryId{
{Public: "1", Internal: "1"}: {Term: 200, Offset: 1500},
},
expectedLeader: model.ServerAddress{Public: "1", Internal: "1"},
expectedFollowersCount: 0,
expectedFollowers: map[model.ServerAddress]*proto.EntryId{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
leader, followers := selectNewLeader(tt.candidates)

// Check leader
assert.Equal(t, tt.expectedLeader, leader)

// Check followers
assert.Equal(t, tt.expectedFollowersCount, len(followers))
for addr, expectedEntry := range tt.expectedFollowers {
assert.Equal(t, expectedEntry, followers[addr])
}
// Ensure the leader is not in the followers
_, exists := followers[leader]
assert.False(t, exists)
})
}
leader, followers := selectNewLeader(candidates)
assert.EqualValues(t, leader, s2)
assert.EqualValues(t, 2, len(followers))
_, exist := followers[s1]
assert.True(t, exist)
_, exist = followers[s3]
assert.True(t, exist)
}

func TestShardController(t *testing.T) {
Expand Down

0 comments on commit bed7f1a

Please sign in to comment.