Skip to content

Commit 080214e

Browse files
authored
Merge pull request #886 from twmb/885
kgo: redirect back to the leader on KIP-392 case 3 failure
2 parents e42c7c5 + 847095b commit 080214e

File tree

2 files changed

+37
-4
lines changed

2 files changed

+37
-4
lines changed

pkg/kgo/consumer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1955,6 +1955,10 @@ func (s *consumerSession) mapLoadsToBrokers(loads listOrEpochLoads) map[*broker]
19551955
// If we are fetching from a follower, we can list
19561956
// offsets against the follower itself. The replica
19571957
// being non-negative signals that.
1958+
//
1959+
// Note this is not actually true (i.e. KIP-392 lies),
1960+
// but we keep this logic in case we can revert
1961+
// to using non-leaders someday.
19581962
brokerID = offset.replica
19591963
}
19601964
if tryBroker := findBroker(brokers, brokerID); tryBroker != nil {

pkg/kgo/source.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ type cursorOffsetNext struct {
234234
type cursorOffsetPreferred struct {
235235
cursorOffsetNext
236236
preferredReplica int32
237+
ooor bool
237238
}
238239

239240
// Moves a cursor from one source to another. This is done while handling
@@ -268,12 +269,13 @@ func (cs cursorPreferreds) String() string {
268269
type pnext struct {
269270
p int32
270271
next int32
272+
ooor bool
271273
}
272274
ts := make(map[string][]pnext)
273275
for _, c := range cs {
274276
t := c.from.topic
275277
p := c.from.partition
276-
ts[t] = append(ts[t], pnext{p, c.preferredReplica})
278+
ts[t] = append(ts[t], pnext{p, c.preferredReplica, c.ooor})
277279
}
278280
tsorted := make([]string, 0, len(ts))
279281
for t, ps := range ts {
@@ -303,9 +305,17 @@ func (cs cursorPreferreds) String() string {
303305

304306
for j, p := range ps {
305307
if j < len(ps)-1 {
306-
fmt.Fprintf(sb, "%d=>%d, ", p.p, p.next)
308+
if p.ooor {
309+
fmt.Fprintf(sb, "%d=>%d[ooor], ", p.p, p.next)
310+
} else {
311+
fmt.Fprintf(sb, "%d=>%d, ", p.p, p.next)
312+
}
307313
} else {
308-
fmt.Fprintf(sb, "%d=>%d", p.p, p.next)
314+
if p.ooor {
315+
fmt.Fprintf(sb, "%d=>%d[ooor]", p.p, p.next)
316+
} else {
317+
fmt.Fprintf(sb, "%d=>%d", p.p, p.next)
318+
}
309319
}
310320
}
311321

@@ -1083,6 +1093,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
10831093
preferreds = append(preferreds, cursorOffsetPreferred{
10841094
*partOffset,
10851095
preferred,
1096+
false,
10861097
})
10871098
continue
10881099
}
@@ -1152,6 +1163,9 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
11521163
// KIP-392 (case 3) specifies that if we are consuming
11531164
// from a follower, then if our offset request is before
11541165
// the low watermark, we list offsets from the follower.
1166+
// However, Kafka does not actually implement handling
1167+
// ListOffsets from anything but the leader, so we
1168+
// need to redirect ourselves back to the leader.
11551169
//
11561170
// KIP-392 (case 4) specifies that if we are consuming
11571171
// from a follower and our request is larger than the high
@@ -1205,7 +1219,22 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
12051219
addList(-1, true)
12061220

12071221
case partOffset.offset < fp.LogStartOffset: // KIP-392 case 3
1208-
addList(s.nodeID, false)
1222+
// KIP-392 specifies that we should list offsets against the follower,
1223+
// but that actually is not supported and the Java client redirects
1224+
// back to the leader. The leader then does *not* direct the client
1225+
// back to the follower because the follower is not an in sync
1226+
// replica. If we did not redirect back to the leader, we would spin
1227+
// loop receiving offset_out_of_range from the follower for Fetch, and
1228+
// then not_leader_or_follower from the follower for ListOffsets
1229+
// (even though it is a follower). So, we just set the preferred replica
1230+
// back to the leader. We go directly back to fetching with the
1231+
// hope that the offset is available on the leader, and if not, we'll
1232+
// just get an OOOR error again and fall into case 1 just above.
1233+
preferreds = append(preferreds, cursorOffsetPreferred{
1234+
*partOffset,
1235+
partOffset.from.leader,
1236+
true,
1237+
})
12091238

12101239
default: // partOffset.offset > fp.HighWatermark, KIP-392 case 4
12111240
if kip320 {

0 commit comments

Comments
 (0)