@@ -912,6 +912,66 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
912912 }
913913}
914914
915+ func (r * Reader ) FetchMessageBatch (ctx context.Context , batchSize int ) ([]Message , error ) {
916+ r .activateReadLag ()
917+ msgBatch := make ([]Message , 0 , batchSize )
918+
919+ var i int
920+ for i <= batchSize {
921+ r .mutex .Lock ()
922+
923+ if ! r .closed && r .version == 0 {
924+ r .start (r .getTopicPartitionOffset ())
925+ }
926+
927+ version := r .version
928+ r .mutex .Unlock ()
929+
930+ select {
931+ case <- ctx .Done ():
932+ return []Message {}, ctx .Err ()
933+
934+ case err := <- r .runError :
935+ return []Message {}, err
936+
937+ case m , ok := <- r .msgs :
938+ if ! ok {
939+ return []Message {}, io .EOF
940+ }
941+
942+ if m .version < version {
943+ continue
944+ }
945+
946+ r .mutex .Lock ()
947+
948+ switch {
949+ case m .error != nil :
950+ case version == r .version :
951+ r .offset = m .message .Offset + 1
952+ r .lag = m .watermark - r .offset
953+ }
954+
955+ r .mutex .Unlock ()
956+
957+ if errors .Is (m .error , io .EOF ) {
958+ // io.EOF is used as a marker to indicate that the stream
959+ // has been closed, in case it was received from the inner
960+ // reader we don't want to confuse the program and replace
961+ // the error with io.ErrUnexpectedEOF.
962+ m .error = io .ErrUnexpectedEOF
963+ }
964+ if m .error != nil {
965+ return nil , m .error
966+ }
967+
968+ msgBatch = append (msgBatch , m .message )
969+ }
970+ i ++
971+ }
972+ return msgBatch , nil
973+ }
974+
915975// ReadLag returns the current lag of the reader by fetching the last offset of
916976// the topic and partition and computing the difference between that value and
917977// the offset of the last message returned by ReadMessage.
@@ -1487,7 +1547,7 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star
14871547 return
14881548}
14891549
1490- func (r * reader ) read (ctx context.Context , offset int64 , conn * Conn ) (int64 , error ) {
1550+ func (r * reader ) read (ctx context.Context , offset int64 , conn * Conn , batchSize int ) (int64 , error ) {
14911551 r .stats .fetches .observe (1 )
14921552 r .stats .offset .observe (offset )
14931553
0 commit comments