Skip to content

Commit 1a0c3ac

Browse files
authored
ddl: fix resuming to wrong checkpoint when failed during adding index (#55506) (#55532)
close #55488
1 parent b23ad3c commit 1a0c3ac

File tree

4 files changed

+48
-15
lines changed

4 files changed

+48
-15
lines changed

pkg/ddl/backfilling.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,11 @@ func loadTableRanges(
452452
zap.Int64("physicalTableID", t.GetPhysicalID()))
453453
return []kv.KeyRange{{StartKey: startKey, EndKey: endKey}}, nil
454454
}
455+
failpoint.Inject("setLimitForLoadTableRanges", func(val failpoint.Value) {
456+
if v, ok := val.(int); ok {
457+
limit = v
458+
}
459+
})
455460

456461
rc := s.GetRegionCache()
457462
maxSleep := 10000 // ms
@@ -466,6 +471,12 @@ func loadTableRanges(
466471
if err != nil {
467472
return false, errors.Trace(err)
468473
}
474+
var mockErr bool
475+
failpoint.InjectCall("beforeLoadRangeFromPD", &mockErr)
476+
if mockErr {
477+
return false, kv.ErrTxnRetryable
478+
}
479+
469480
ranges = make([]kv.KeyRange, 0, len(rs))
470481
for _, r := range rs {
471482
ranges = append(ranges, kv.KeyRange{StartKey: r.StartKey(), EndKey: r.EndKey()})
@@ -636,12 +647,7 @@ func makeupDecodeColMap(dbName model.CIStr, t table.Table) (map[int64]decoder.Co
636647
return decodeColMap, nil
637648
}
638649

639-
var backfillTaskChanSize = 128
640-
641-
// SetBackfillTaskChanSizeForTest is only used for test.
642-
func SetBackfillTaskChanSizeForTest(n int) {
643-
backfillTaskChanSize = n
644-
}
650+
const backfillTaskChanSize = 128
645651

646652
func (dc *ddlCtx) runAddIndexInLocalIngestMode(
647653
ctx context.Context,

pkg/ddl/backfilling_operators.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) (adjusted kv.K
346346
if src.cpMgr == nil {
347347
return start, false
348348
}
349-
cpKey := src.cpMgr.LastProcessedKey()
349+
cpKey := src.cpMgr.NextKeyToProcess()
350350
if len(cpKey) == 0 {
351351
return start, false
352352
}
@@ -364,7 +364,7 @@ func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) (adjusted kv.K
364364
if cpKey.Cmp(end) == 0 {
365365
return cpKey, true
366366
}
367-
return cpKey.Next(), false
367+
return cpKey, false
368368
}
369369

370370
func (src *TableScanTaskSource) generateTasks() error {

pkg/ddl/ingest/checkpoint.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,9 @@ func (s *CheckpointManager) IsKeyProcessed(end kv.Key) bool {
156156
return s.localDataIsValid && len(s.flushedKeyLowWatermark) > 0 && end.Cmp(s.flushedKeyLowWatermark) <= 0
157157
}
158158

159-
// LastProcessedKey finds the last processed key in checkpoint.
160-
// If there is no processed key, it returns nil.
161-
func (s *CheckpointManager) LastProcessedKey() kv.Key {
159+
// NextKeyToProcess finds the next unprocessed key in checkpoint.
160+
// If there is no such key, it returns nil.
161+
func (s *CheckpointManager) NextKeyToProcess() kv.Key {
162162
s.mu.Lock()
163163
defer s.mu.Unlock()
164164

tests/realtikvtest/addindextest3/ingest_test.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,10 @@ func TestAddIndexSplitTableRanges(t *testing.T) {
348348
}
349349
tk.MustQuery("split table t between (0) and (80000) regions 7;").Check(testkit.Rows("6 1"))
350350

351-
ddl.SetBackfillTaskChanSizeForTest(4)
351+
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/setLimitForLoadTableRanges", "return(4)")
352352
tk.MustExec("alter table t add index idx(b);")
353353
tk.MustExec("admin check table t;")
354-
ddl.SetBackfillTaskChanSizeForTest(7)
354+
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/setLimitForLoadTableRanges", "return(7)")
355355
tk.MustExec("alter table t add index idx_2(b);")
356356
tk.MustExec("admin check table t;")
357357

@@ -361,10 +361,37 @@ func TestAddIndexSplitTableRanges(t *testing.T) {
361361
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
362362
}
363363
tk.MustQuery("split table t by (10000),(20000),(30000),(40000),(50000),(60000);").Check(testkit.Rows("6 1"))
364-
ddl.SetBackfillTaskChanSizeForTest(4)
364+
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/setLimitForLoadTableRanges", "return(4)")
365+
tk.MustExec("alter table t add unique index idx(b);")
366+
tk.MustExec("admin check table t;")
367+
}
368+
369+
func TestAddIndexLoadTableRangeError(t *testing.T) {
370+
store := realtikvtest.CreateMockStoreAndSetup(t)
371+
tk := testkit.NewTestKit(t, store)
372+
tk.MustExec("drop database if exists addindexlit;")
373+
tk.MustExec("create database addindexlit;")
374+
tk.MustExec("use addindexlit;")
375+
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
376+
tk.MustExec(`set global tidb_enable_dist_task=off;`) // Use checkpoint manager.
377+
378+
tk.MustExec("create table t (a int primary key, b int);")
379+
for i := 0; i < 8; i++ {
380+
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
381+
}
382+
tk.MustQuery("split table t by (10000),(20000),(30000),(40000),(50000),(60000);").Check(testkit.Rows("6 1"))
383+
384+
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/setLimitForLoadTableRanges", "return(3)")
385+
var batchCnt int
386+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeLoadRangeFromPD", func(mockErr *bool) {
387+
batchCnt++
388+
if batchCnt == 2 {
389+
*mockErr = true
390+
}
391+
})
392+
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/ingest/forceSyncFlagForTest", "return")
365393
tk.MustExec("alter table t add unique index idx(b);")
366394
tk.MustExec("admin check table t;")
367-
ddl.SetBackfillTaskChanSizeForTest(1024)
368395
}
369396

370397
func TestAddIndexMockFlushError(t *testing.T) {

0 commit comments

Comments
 (0)