Skip to content

Commit 6276f9c

Browse files
ansssuMattDevy
authored andcommitted
fix: Notify items if an error occurs in bulk indexer (#615)
* notify items if an error occurs in bulk indexer * fix message error on line 577 * test: OnFailure is called per item * refactor: use a method to handle errors * makes code DRYer * fixes bug where `res.IsError()` resulted in BulkIndexerItem.OnError receiving a nil `err` --------- Co-authored-by: Matt Devy <[email protected]> (cherry picked from commit 6ea4a8a)
1 parent cdaf2aa commit 6276f9c

File tree

2 files changed

+156
-14
lines changed

2 files changed

+156
-14
lines changed

esutil/bulk_indexer.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -577,29 +577,24 @@ func (w *worker) flushBuffer(ctx context.Context) error {
577577
res, err := req.Do(ctx, w.bi.config.Client)
578578
if err != nil {
579579
atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
580-
if w.bi.config.OnError != nil {
581-
w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err))
582-
}
583-
return fmt.Errorf("flush: %s", err)
580+
err := fmt.Errorf("flush: %w", err)
581+
w.handleError(ctx, err)
582+
return err
584583
}
585584
if res.Body != nil {
586585
defer res.Body.Close()
587586
}
588587
if res.IsError() {
589588
atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
590-
// TODO(karmi): Wrap error (include response struct)
591-
if w.bi.config.OnError != nil {
592-
w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", res.String()))
593-
}
594-
return fmt.Errorf("flush: %s", res.String())
589+
err := fmt.Errorf("flush: %s", res.String())
590+
w.handleError(ctx, err)
591+
return err
595592
}
596593

597594
if err := w.bi.config.Decoder.UnmarshalFromReader(res.Body, &blk); err != nil {
598595
// TODO(karmi): Wrap error (include response struct)
599-
if w.bi.config.OnError != nil {
600-
w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err))
601-
}
602-
return fmt.Errorf("flush: error parsing response body: %s", err)
596+
w.handleError(ctx, fmt.Errorf("flush: %w", err))
597+
return fmt.Errorf("flush: error parsing response body: %w", err)
603598
}
604599

605600
for i, blkItem := range blk.Items {
@@ -647,6 +642,21 @@ func (w *worker) flushBuffer(ctx context.Context) error {
647642
return err
648643
}
649644

645+
func (w *worker) notifyItemsOnError(ctx context.Context, err error) {
646+
for _, item := range w.items {
647+
if item.OnFailure != nil {
648+
item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err)
649+
}
650+
}
651+
}
652+
653+
func (w *worker) handleError(ctx context.Context, err error) {
654+
if w.bi.config.OnError != nil {
655+
w.bi.config.OnError(ctx, err)
656+
}
657+
w.notifyItemsOnError(ctx, err)
658+
}
659+
650660
type defaultJSONDecoder struct{}
651661

652662
func (d defaultJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error {

esutil/bulk_indexer_internal_test.go

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,133 @@ func TestBulkIndexer(t *testing.T) {
693693
}
694694
})
695695

696+
t.Run("TooManyRequests - Fail", func(t *testing.T) {
697+
var (
698+
wg sync.WaitGroup
699+
numItems = 2
700+
)
701+
702+
esCfg := elasticsearch.Config{
703+
Transport: &mockTransport{
704+
RoundTripFunc: func(*http.Request) (*http.Response, error) {
705+
return &http.Response{
706+
StatusCode: http.StatusTooManyRequests,
707+
Status: "429 TooManyRequests",
708+
Body: io.NopCloser(strings.NewReader(`{"took":1}`)),
709+
}, nil
710+
},
711+
},
712+
713+
MaxRetries: 5,
714+
RetryOnStatus: []int{502, 503, 504, 429},
715+
RetryBackoff: func(i int) time.Duration {
716+
if os.Getenv("DEBUG") != "" {
717+
fmt.Printf("*** Retry #%d\n", i)
718+
}
719+
return time.Duration(i) * 100 * time.Millisecond
720+
},
721+
}
722+
if os.Getenv("DEBUG") != "" {
723+
esCfg.Logger = &elastictransport.ColorLogger{Output: os.Stdout}
724+
}
725+
es, _ := elasticsearch.NewClient(esCfg)
726+
727+
biCfg := BulkIndexerConfig{NumWorkers: 1, FlushBytes: 28 * 2, Client: es}
728+
if os.Getenv("DEBUG") != "" {
729+
biCfg.DebugLogger = log.New(os.Stdout, "", 0)
730+
}
731+
732+
bi, _ := NewBulkIndexer(biCfg)
733+
734+
biiFailureCallbacksCalled := atomic.Uint32{}
735+
biiSuccessCallbacksCalled := atomic.Uint32{}
736+
737+
for i := 1; i <= numItems; i++ {
738+
wg.Add(1)
739+
go func(i int) {
740+
defer wg.Done()
741+
err := bi.Add(context.Background(), BulkIndexerItem{
742+
Action: "foo",
743+
Body: strings.NewReader(`{"title":"foo"}`),
744+
OnFailure: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem, err error) {
745+
_ = biiFailureCallbacksCalled.Add(1)
746+
if err == nil {
747+
t.Errorf("Unexpected nil error in BulkIndexerItem.OnFailure callback")
748+
}
749+
},
750+
OnSuccess: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem) {
751+
_ = biiSuccessCallbacksCalled.Add(1)
752+
},
753+
})
754+
if err != nil {
755+
t.Errorf("Unexpected error: %s", err)
756+
return
757+
}
758+
}(i)
759+
}
760+
wg.Wait()
761+
762+
if err := bi.Close(context.Background()); err != nil {
763+
t.Errorf("Unexpected error: %s", err)
764+
}
765+
766+
// BulksIndexerItem.OnFailure() callbacks are called for all items.
767+
if biiFailureCallbacksCalled.Load() != uint32(numItems) {
768+
t.Errorf("Unexpected NumFailedCallbacks: want=%d, got=%d", numItems, biiFailureCallbacksCalled.Load())
769+
}
770+
771+
// BulkIndexerItem.OnSuccess() callbacks are not called.
772+
if biiSuccessCallbacksCalled.Load() != 0 {
773+
t.Errorf("Unexpected NumSuccessCallbacks: want=%d, got=%d", 0, biiSuccessCallbacksCalled.Load())
774+
}
775+
})
776+
777+
t.Run("JSON Decoder Failure", func(t *testing.T) {
778+
es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}})
779+
780+
biFailureCallbacksCalled := atomic.Uint32{}
781+
bi, _ := NewBulkIndexer(BulkIndexerConfig{
782+
Client: es,
783+
Decoder: customJSONDecoder{
784+
err: fmt.Errorf("Custom JSON decoder error"),
785+
},
786+
OnError: func(ctx context.Context, err error) {
787+
_ = biFailureCallbacksCalled.Add(1)
788+
},
789+
})
790+
791+
biiFailureCallbacksCalled := atomic.Uint32{}
792+
793+
err := bi.Add(context.Background(), BulkIndexerItem{
794+
Action: "index",
795+
DocumentID: "1",
796+
Body: strings.NewReader(`{"title":"foo"}`),
797+
OnFailure: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem, err error) {
798+
_ = biiFailureCallbacksCalled.Add(1)
799+
if err == nil {
800+
t.Errorf("Unexpected nil error in BulkIndexerItem.OnFailure callback")
801+
}
802+
},
803+
})
804+
if err != nil {
805+
t.Fatalf("Unexpected error, got %s", err)
806+
}
807+
808+
if err := bi.Close(context.Background()); err != nil {
809+
t.Errorf("Unexpected error: %s", err)
810+
}
811+
812+
// BulksIndexerItem.OnFailure() callbacks are called only for failed items.
813+
if biiFailureCallbacksCalled.Load() != 1 {
814+
t.Errorf("Unexpected NumFailedCallbacks: want=%d, got=%d", 1, biiFailureCallbacksCalled.Load())
815+
}
816+
817+
// BulkIndexer.OnError() callbacks are called for all errors.
818+
if biFailureCallbacksCalled.Load() != 2 {
819+
t.Errorf("Unexpected NumFailedCallbacks: want=%d, got=%d", 2, biFailureCallbacksCalled.Load())
820+
}
821+
})
822+
696823
t.Run("Custom JSON Decoder", func(t *testing.T) {
697824
es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}})
698825
bi, _ := NewBulkIndexer(BulkIndexerConfig{Client: es, Decoder: customJSONDecoder{}})
@@ -1080,8 +1207,13 @@ func TestBulkIndexerItem(t *testing.T) {
10801207
})
10811208
}
10821209

1083-
type customJSONDecoder struct{}
1210+
type customJSONDecoder struct {
1211+
err error
1212+
}
10841213

10851214
func (d customJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error {
1215+
if d.err != nil {
1216+
return d.err
1217+
}
10861218
return json.NewDecoder(r).Decode(blk)
10871219
}

0 commit comments

Comments
 (0)