Skip to content

Commit fd332f9

Browse files
authored
Merge pull request #846 from rodaine/invalid-base-offset
Fix handling of invalid base offsets
2 parents 01e51dc + 2eed36e commit fd332f9

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

pkg/kgo/producer.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,13 @@ start:
553553
p.promisesMu.Lock()
554554
for i, pr := range b.recs {
555555
pr.LeaderEpoch = 0
556-
pr.Offset = b.baseOffset + int64(i)
556+
if b.baseOffset == -1 {
557+
// if the base offset is invalid/unknown (-1), all record offsets should
558+
// be treated as unknown
559+
pr.Offset = -1
560+
} else {
561+
pr.Offset = b.baseOffset + int64(i)
562+
}
557563
pr.Partition = b.partition
558564
pr.ProducerID = b.pid
559565
pr.ProducerEpoch = b.epoch

pkg/kgo/source.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1773,7 +1773,11 @@ func recordToRecord(
17731773
ProducerID: batch.ProducerID,
17741774
ProducerEpoch: batch.ProducerEpoch,
17751775
LeaderEpoch: batch.PartitionLeaderEpoch,
1776-
Offset: batch.FirstOffset + int64(record.OffsetDelta),
1776+
}
1777+
if batch.FirstOffset == -1 {
1778+
r.Offset = -1
1779+
} else {
1780+
r.Offset = batch.FirstOffset + int64(record.OffsetDelta)
17771781
}
17781782
if r.Attrs.TimestampType() == 0 {
17791783
r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64)

0 commit comments

Comments
 (0)