Skip to content

Commit

Permalink
kgo: redirect back to the leader on KIP-392 case 3 failure
Browse files Browse the repository at this point in the history
See embedded comments.

I have a test in kfake that needs to be pushed as a followup PR since
kfake depends on franz-go.

Closes #885.
  • Loading branch information
twmb committed Jan 19, 2025
1 parent 293b7c4 commit 847095b
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
4 changes: 4 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1955,6 +1955,10 @@ func (s *consumerSession) mapLoadsToBrokers(loads listOrEpochLoads) map[*broker]
// If we are fetching from a follower, we can list
// offsets against the follower itself. The replica
// being non-negative signals that.
//
// Note that this is not actually true in practice (i.e.
// KIP-392 lies): Kafka only handles ListOffsets on the
// leader. We keep this logic in case we can revert to
// using non-leaders someday.
brokerID = offset.replica
}
if tryBroker := findBroker(brokers, brokerID); tryBroker != nil {
Expand Down
37 changes: 33 additions & 4 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ type cursorOffsetNext struct {
type cursorOffsetPreferred struct {
cursorOffsetNext
// preferredReplica is the replica the cursor should be moved to; it is
// filled from either the fetch response's preferred replica or the
// partition leader (presumably a broker node ID -- confirm).
preferredReplica int32
// ooor is true when the move is the KIP-392 case 3 redirect back to
// the leader after the fetch offset was below the log start offset
// (presumably "offset out of range"), and false for an ordinary
// preferred-replica redirect. It is surfaced as "[ooor]" in the
// cursorPreferreds String output.
ooor bool
}

// Moves a cursor from one source to another. This is done while handling
Expand Down Expand Up @@ -268,12 +269,13 @@ func (cs cursorPreferreds) String() string {
type pnext struct {
p int32
next int32
ooor bool
}
ts := make(map[string][]pnext)
for _, c := range cs {
t := c.from.topic
p := c.from.partition
ts[t] = append(ts[t], pnext{p, c.preferredReplica})
ts[t] = append(ts[t], pnext{p, c.preferredReplica, c.ooor})
}
tsorted := make([]string, 0, len(ts))
for t, ps := range ts {
Expand Down Expand Up @@ -303,9 +305,17 @@ func (cs cursorPreferreds) String() string {

for j, p := range ps {
if j < len(ps)-1 {
fmt.Fprintf(sb, "%d=>%d, ", p.p, p.next)
if p.ooor {
fmt.Fprintf(sb, "%d=>%d[ooor], ", p.p, p.next)
} else {
fmt.Fprintf(sb, "%d=>%d, ", p.p, p.next)
}
} else {
fmt.Fprintf(sb, "%d=>%d", p.p, p.next)
if p.ooor {
fmt.Fprintf(sb, "%d=>%d[ooor]", p.p, p.next)
} else {
fmt.Fprintf(sb, "%d=>%d", p.p, p.next)
}
}
}

Expand Down Expand Up @@ -1065,6 +1075,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
preferreds = append(preferreds, cursorOffsetPreferred{
*partOffset,
preferred,
false,
})
continue
}
Expand Down Expand Up @@ -1134,6 +1145,9 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
// KIP-392 (case 3) specifies that if we are consuming
// from a follower, then if our offset request is before
// the low watermark, we list offsets from the follower.
// However, Kafka does not actually implement handling
// ListOffsets from anything but the leader, so we
// need to redirect ourselves back to the leader.
//
// KIP-392 (case 4) specifies that if we are consuming
// a follower and our request is larger than the high
Expand Down Expand Up @@ -1187,7 +1201,22 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
addList(-1, true)

case partOffset.offset < fp.LogStartOffset: // KIP-392 case 3
addList(s.nodeID, false)
// KIP-392 specifies that we should list offsets against the follower,
// but that actually is not supported and the Java client redirects
// back to the leader. The leader then does *not* direct the client
// back to the follower because the follower is not an in sync
// replica. If we did not redirect back to the leader, we would spin
// loop receiving offset_out_of_range from the follower for Fetch, and
// then not_leader_or_follower from the follower for ListOffsets
// (even though it is a follower). So, we just set the preferred replica
// back to the leader. We go directly back to fetching with the
// hope that the offset is available on the leader, and if not, we'll
// just get an OOOR error again and fall into case 1 just above.
preferreds = append(preferreds, cursorOffsetPreferred{
*partOffset,
partOffset.from.leader,
true,
})

default: // partOffset.offset > fp.HighWatermark, KIP-392 case 4
if kip320 {
Expand Down

0 comments on commit 847095b

Please sign in to comment.