Skip to content

Commit 66c638b

Browse files
committed
fixed bug handling incomplete messages at end of batch; resume consumer when the current kafka segment is deleted prematurely
1 parent 484dbc4 commit 66c638b

File tree

6 files changed

+60
-14
lines changed

6 files changed

+60
-14
lines changed

README.md

+5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ Go language: (http://golang.org/) <br/>
88

99
## Changes
1010

11+
### May 2015
12+
13+
* fixed bug handling partial message at end of a fetch response when the payload is < 4 bytes
14+
* if the kafka log segment being read is cleaned up, attempt resuming the consumer from the earliest available offset
15+
1116
### April 2015
1217

1318
* added support for Snappy compression

consumer.go

+39-7
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTime
9292
}, quit)
9393

9494
if err != nil {
95-
if err != io.EOF && err.Error() != "use of closed network connection" { //
95+
if err != io.EOF && err.Error() != "use of closed network connection" {
9696
log.Println("Fatal Error: ", err)
9797
panic(err)
9898
}
@@ -129,6 +129,23 @@ func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTime
129129
// MessageHandlerFunc defines the interface for message handlers accepted by Consume()
130130
type MessageHandlerFunc func(msg *Message)
131131

132+
// reconnect from the earliest available offset after the current one becomes unavailable
133+
func (consumer *BrokerConsumer) reconnectFromEarliestAvailableOffset(conn *net.TCPConn) (uint64, error) {
134+
_, err := conn.Write(consumer.broker.EncodeOffsetRequest(OFFSET_EARLIEST, 1))
135+
if err != nil {
136+
log.Println("Failed kafka offset request:", err.Error())
137+
return 0, err
138+
}
139+
140+
length, payload, err := consumer.broker.readResponse(conn)
141+
log.Println("kafka offset request of", length, "bytes starting from offset", consumer.offset, payload)
142+
143+
if err != nil {
144+
return 0, err
145+
}
146+
return binary.BigEndian.Uint64(payload[0:]), nil
147+
}
148+
132149
// Consume makes a single fetch request and sends the messages in the message set to a handler function
133150
func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc, stop <-chan struct{}) (int, error) {
134151
conn, err := consumer.broker.connect()
@@ -157,7 +174,25 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M
157174
//log.Println("kafka fetch request of", length, "bytes starting from offset", consumer.offset)
158175

159176
if err != nil {
160-
return -1, err
177+
if err.Error() != "Broker Response Error: 1" {
178+
return -1, err
179+
}
180+
181+
// special case: reset offset if kafka cleaned up the file being read
182+
log.Println("ERROR fetching kafka batch at offset", consumer.offset, "- probably due to timeout or premature cleanup of kafka data file")
183+
log.Println("Fetching earliest available offset in kafka log")
184+
consumer.offset, err = consumer.reconnectFromEarliestAvailableOffset(conn)
185+
if err != nil {
186+
panic(err)
187+
}
188+
log.Println("Resuming at offset", consumer.offset)
189+
length, payload, err = consumer.broker.readResponse(conn)
190+
if err != nil {
191+
log.Println("Cannot resume consuming at new offset, needs manual intervention")
192+
if err.Error() != "Broker Response Error: 1" {
193+
return -1, err
194+
}
195+
}
161196
}
162197

163198
num := 0
@@ -166,14 +201,15 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M
166201
currentOffset := uint64(0)
167202
for currentOffset <= uint64(length-4) {
168203
totalLength, msgs, err1 := Decode(payload[currentOffset:], consumer.codecs)
169-
if ErrIncompletePacket == err1 {
204+
if ErrIncompletePacket == err1 || ErrMalformedPacket == err1 {
170205
// Reached the end of the current packet and the last message is incomplete.
171206
if 0 == num {
172207
// This is the very first message in the batch => we need to request a larger packet
173208
// or the consumer will get stuck here indefinitely
174209
log.Printf("ERROR: Incomplete message at offset %d %d, change the configuration to a larger max fetch size\n",
175210
consumer.offset,
176211
currentOffset)
212+
log.Printf("\nPayload length: %d, currentOffset: %d, payload: [%x]\n\n", length, currentOffset, payload)
177213
} else {
178214
// Partial message at end of current batch, need a new Fetch Request from a newer offset
179215
log.Printf("DEBUG: Incomplete message at offset %d %d for topic '%s' (%s, partition %d), fetching new batch from offset %d\n",
@@ -185,10 +221,6 @@ func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc M
185221
consumer.offset+currentOffset)
186222
}
187223
break
188-
} else if ErrMalformedPacket == err1 {
189-
log.Printf("ERROR: Malformed message at offset %d %d\n",
190-
consumer.offset,
191-
currentOffset)
192224
}
193225
if err != nil {
194226
log.Printf("Payload length: %d, currentOffset: %d, payload: [%x]", length, currentOffset, payload)

kafka.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,11 @@ type Broker struct {
4444
}
4545

4646
func newBroker(hostname string, topic string, partition int) *Broker {
47-
return &Broker{topic: topic,
47+
return &Broker{
48+
topic: topic,
4849
partition: partition,
49-
hostname: hostname}
50+
hostname: hostname,
51+
}
5052
}
5153

5254
func (b *Broker) connect() (conn *net.TCPConn, error error) {

kafka_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,8 @@ func TestLongCompressedMessageRoundTripSnappy(t *testing.T) {
230230
}
231231

232232
func TestMultipleCompressedMessages(t *testing.T) {
233-
msgs := []*Message{NewMessage([]byte("testing")),
233+
msgs := []*Message{
234+
NewMessage([]byte("testing")),
234235
NewMessage([]byte("multiple")),
235236
NewMessage([]byte("messages")),
236237
}

message.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -150,14 +150,18 @@ func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Me
150150
start := payloadLen - messageLenLeft
151151
innerMsg, err = decodeMessage(message.payload[start:], payloadCodecsMap)
152152
if nil != err {
153-
log.Println("DEBUG:", err.Error())
153+
log.Println("DEBUG: (inner compressed msg)", err.Error())
154154
if ErrIncompletePacket == err {
155-
// the current top-level message is incomplete, reached end of packet
156-
err = nil
155+
// the current top-level message is incomplete, reached end of packet:
156+
// we need a larger packet. Given we cannot advance the cursor within
157+
// a compressed message, we discard all the inner messages read so far
158+
// and force a new request. The checksum on the first decodeMessage() call
159+
// in this function should have prevented reaching this point though...
160+
return 0, []Message{}, err
157161
}
158162
break
159163
}
160-
messageLenLeft = messageLenLeft - innerMsg.totalLength - 4 // message length uint32
164+
messageLenLeft -= (4 + innerMsg.totalLength) // message length uint32
161165
messages = append(messages, *innerMsg)
162166
}
163167

request.go

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ const (
3737
REQUEST_MULTIFETCH = 2
3838
REQUEST_MULTIPRODUCE = 3
3939
REQUEST_OFFSETS = 4
40+
OFFSET_LATEST int64 = -1
41+
OFFSET_EARLIEST int64 = -2
4042
)
4143

4244
// EncodeRequestHeader marshals a request into kafka's wire format

0 commit comments

Comments
 (0)