diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 6ad03dd45cebc..c50785df5f22d 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -48,6 +48,8 @@ const ( // MaxResolveLocksbackupOffSleep is the maximum sleep time for resolving locks. // 10 minutes for every round. MaxResolveLocksbackupOffSleepMs = 600000 + + IncompleteRangesUpdateInterval = time.Second * 15 ) // ClientMgr manages connections needed by backup. @@ -69,6 +71,13 @@ type Checksum struct { // ProgressUnit represents the unit of progress. type ProgressUnit string +const ( + // UnitRange represents the progress updated counter when a range finished. + UnitRange ProgressUnit = "range" + // UnitRegion represents the progress updated counter when a region finished. + UnitRegion ProgressUnit = "region" +) + type MainBackupLoop struct { BackupSender @@ -82,7 +91,7 @@ type MainBackupLoop struct { ReplicaReadLabel map[string]string StateNotifier chan BackupRetryPolicy - ProgressCallBack func() + ProgressCallBack func(ProgressUnit) GetBackupClientCallBack func(ctx context.Context, storeID uint64, reset bool) (backuppb.BackupClient, error) } @@ -176,6 +185,9 @@ func (bc *Client) RunLoop(ctx context.Context, loop *MainBackupLoop) error { round := uint64(0) // reset grpc connection every round except key_locked error. reset := true + // update incompleteRanges to advance the progress and the request. + incompleteRangesUpdateTicker := time.NewTicker(IncompleteRangesUpdateInterval) + defer incompleteRangesUpdateTicker.Stop() mainLoop: for { round += 1 @@ -211,8 +223,7 @@ mainLoop: mainCancel() return ctx.Err() default: - iter := loop.GlobalProgressTree.Iter() - inCompleteRanges = iter.GetIncompleteRanges() + inCompleteRanges = loop.GlobalProgressTree.GetIncompleteRanges() if len(inCompleteRanges) == 0 { // all range backuped logutil.CL(ctx).Info("This round finished all backup ranges", zap.Uint64("round", round)) @@ -259,6 +270,7 @@ mainLoop: } // infinite loop to collect region backup response to global channel loop.CollectStoreBackupsAsync(handleCtx, round, storeBackupResultChMap, globalBackupResultCh) + incompleteRangesUpdateTicker.Reset(IncompleteRangesUpdateInterval) handleLoop: for { select { @@ -266,6 +278,13 @@ mainLoop: handleCancel() mainCancel() return ctx.Err() + case <-incompleteRangesUpdateTicker.C: + startUpdate := time.Now() + inCompleteRanges = loop.GlobalProgressTree.GetIncompleteRanges() + loop.BackupReq.SubRanges = getBackupRanges(inCompleteRanges) + elapsed := time.Since(startUpdate) + log.Info("update the incomplete ranges", zap.Duration("take", elapsed)) + incompleteRangesUpdateTicker.Reset(max(5*elapsed, IncompleteRangesUpdateInterval)) case storeBackupInfo := <-loop.StateNotifier: if storeBackupInfo.All { logutil.CL(mainCtx).Info("cluster state changed. restart store backups", zap.Uint64("round", round)) @@ -352,7 +371,7 @@ mainLoop: if lock != nil { allTxnLocks = append(allTxnLocks, lock) } - loop.ProgressCallBack() + loop.ProgressCallBack(UnitRegion) } } } @@ -562,7 +581,7 @@ func (bc *Client) StartCheckpointRunner( backupTS uint64, ranges []rtree.Range, safePointID string, - progressCallBack func(), + progressCallBack func(ProgressUnit), ) (err error) { if bc.checkpointMeta == nil { bc.checkpointMeta = &checkpoint.CheckpointMetadataForBackup{ @@ -603,13 +622,12 @@ func (bc *Client) getProgressRange(r rtree.Range) *rtree.ProgressRange { if bc.checkpointMeta != nil && len(bc.checkpointMeta.CheckpointDataMap) > 0 { rangeTree, exists := bc.checkpointMeta.CheckpointDataMap[groupKey] if exists { - incomplete := rangeTree.GetIncompleteRange(r.StartKey, r.EndKey) delete(bc.checkpointMeta.CheckpointDataMap, groupKey) return &rtree.ProgressRange{ - Res: rangeTree, - Incomplete: incomplete, - Origin: r, - GroupKey: groupKey, + Res: rangeTree, + Origin: r, + GroupKey: groupKey, + Complete: false, } } } @@ -617,17 +635,15 @@ func (bc *Client) getProgressRange(r rtree.Range) *rtree.ProgressRange { // the origin range are not recorded in checkpoint // return the default progress range return &rtree.ProgressRange{ - Res: rtree.NewRangeTree(), - Incomplete: []rtree.Range{ - r, - }, + Res: rtree.NewRangeTree(), Origin: r, GroupKey: groupKey, + Complete: false, } } // LoadCheckpointRange loads the checkpoint(finished) sub-ranges of the current range, and calculate its incompleted sub-ranges. -func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack func()) (map[string]rtree.RangeTree, error) { +func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack func(ProgressUnit)) (map[string]rtree.RangeTree, error) { rangeDataMap := make(map[string]rtree.RangeTree) pastDureTime, err := checkpoint.WalkCheckpointFileForBackup(ctx, bc.storage, bc.cipher, func(groupKey string, rg checkpoint.BackupValueType) { @@ -637,7 +653,7 @@ func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack fun rangeDataMap[groupKey] = rangeTree } rangeTree.Put(rg.StartKey, rg.EndKey, rg.Files) - progressCallBack() + progressCallBack(UnitRegion) }) // we should adjust start-time of the summary to `pastDureTime` earlier @@ -1082,7 +1098,7 @@ func (bc *Client) BackupRanges( concurrency uint, replicaReadLabel map[string]string, metaWriter *metautil.MetaWriter, - progressCallBack func(), + progressCallBack func(ProgressUnit), ) error { log.Info("Backup Ranges Started", rtree.ZapRanges(ranges)) init := time.Now() @@ -1101,6 +1117,7 @@ func (bc *Client) BackupRanges( if err != nil { return errors.Trace(err) } + globalProgressTree.SetCallBack(func() { progressCallBack(UnitRange) }) stateNotifier := make(chan BackupRetryPolicy) ObserveStoreChangesAsync(ctx, stateNotifier, bc.mgr.GetPDClient()) diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index 03a6d94dd0c04..bc4700ed83db6 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -387,7 +387,7 @@ func TestOnBackupResponse(t *testing.T) { require.NoError(t, err) require.Nil(t, lock) - incomplete := tree.Iter().GetIncompleteRanges() + incomplete := tree.GetIncompleteRanges() require.Len(t, incomplete, 1) require.Equal(t, []byte("b"), incomplete[0].StartKey) require.Equal(t, []byte("c"), incomplete[0].EndKey) @@ -398,7 +398,7 @@ func TestOnBackupResponse(t *testing.T) { lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree) require.NoError(t, err) require.Nil(t, lock) - incomplete = tree.Iter().GetIncompleteRanges() + incomplete = tree.GetIncompleteRanges() require.Len(t, incomplete, 0) // case #5: failed case, key is locked @@ -507,7 +507,7 @@ func TestMainBackupLoop(t *testing.T) { GlobalProgressTree: &tree, ReplicaReadLabel: nil, StateNotifier: ch, - ProgressCallBack: func() {}, + ProgressCallBack: func(backup.ProgressUnit) {}, GetBackupClientCallBack: mockGetBackupClientCallBack, } @@ -548,7 +548,7 @@ func TestMainBackupLoop(t *testing.T) { GlobalProgressTree: &tree, ReplicaReadLabel: nil, StateNotifier: ch, - ProgressCallBack: func() {}, + ProgressCallBack: func(backup.ProgressUnit) {}, GetBackupClientCallBack: mockGetBackupClientCallBack, } @@ -600,7 +600,7 @@ func TestMainBackupLoop(t *testing.T) { GlobalProgressTree: &tree, ReplicaReadLabel: nil, StateNotifier: ch, - ProgressCallBack: func() {}, + ProgressCallBack: func(backup.ProgressUnit) {}, GetBackupClientCallBack: mockGetBackupClientCallBack, } go func() { @@ -655,7 +655,7 @@ func TestMainBackupLoop(t *testing.T) { GlobalProgressTree: &tree, ReplicaReadLabel: nil, StateNotifier: ch, - ProgressCallBack: func() {}, + ProgressCallBack: func(backup.ProgressUnit) {}, GetBackupClientCallBack: mockGetBackupClientCallBack, } go func() { @@ -712,7 +712,7 @@ func TestMainBackupLoop(t *testing.T) { GlobalProgressTree: &tree, ReplicaReadLabel: nil, StateNotifier: ch, - ProgressCallBack: func() {}, + ProgressCallBack: func(backup.ProgressUnit) {}, GetBackupClientCallBack: mockGetBackupClientCallBack, } go func() { diff --git a/br/pkg/rtree/BUILD.bazel b/br/pkg/rtree/BUILD.bazel index 49f8dfcb17d5c..4582bb385017e 100644 --- a/br/pkg/rtree/BUILD.bazel +++ b/br/pkg/rtree/BUILD.bazel @@ -32,7 +32,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 5, + shard_count = 6, deps = [ ":rtree", "//pkg/kv", diff --git a/br/pkg/rtree/rtree.go b/br/pkg/rtree/rtree.go index aefa1343d4150..b9b868e1d4456 100644 --- a/br/pkg/rtree/rtree.go +++ b/br/pkg/rtree/rtree.go @@ -316,10 +316,11 @@ func (rangeTree *RangeTree) GetIncompleteRange( } type ProgressRange struct { - Res RangeTree - Incomplete []Range - Origin Range - GroupKey string + Res RangeTree + Origin Range + GroupKey string + // only for statistic + Complete bool } // Less impls btree.Item. @@ -332,15 +333,24 @@ func (pr *ProgressRange) Less(than *ProgressRange) bool { // All the progress ranges it sorted do not overlap. type ProgressRangeTree struct { *btree.BTreeG[*ProgressRange] + + completeCallBack func() } // NewProgressRangeTree returns an empty range tree. func NewProgressRangeTree() ProgressRangeTree { return ProgressRangeTree{ BTreeG: btree.NewG[*ProgressRange](32, (*ProgressRange).Less), + + completeCallBack: func() {}, } } +// SetCallBack set the complete call back to update the progress. +func (rangeTree *ProgressRangeTree) SetCallBack(callback func()) { + rangeTree.completeCallBack = callback +} + // find is a helper function to find an item that contains the range. func (rangeTree *ProgressRangeTree) find(pr *ProgressRange) *ProgressRange { var ret *ProgressRange @@ -392,50 +402,23 @@ func (rangeTree *ProgressRangeTree) FindContained(startKey, endKey []byte) (*Pro return ret, nil } -type incompleteRangesFetcherItem struct { - pr *ProgressRange - complete bool -} - -type IncompleteRangesFetcher struct { - items []*incompleteRangesFetcherItem - left int -} - -func (rangeTree *ProgressRangeTree) Iter() *IncompleteRangesFetcher { - items := make([]*incompleteRangesFetcherItem, 0, rangeTree.Len()) - rangeTree.Ascend(func(item *ProgressRange) bool { - items = append(items, &incompleteRangesFetcherItem{ - pr: item, - complete: false, - }) - return true - }) - return &IncompleteRangesFetcher{ - items: items, - left: len(items), - } -} - -func (iter *IncompleteRangesFetcher) GetIncompleteRanges() []Range { +func (rangeTree *ProgressRangeTree) GetIncompleteRanges() []Range { // about 64 MB memory if there are 1 million ranges - incompleteRanges := make([]Range, 0, len(iter.items)) - for _, item := range iter.items { - if item.complete { - continue - } - - incomplete := item.pr.Res.GetIncompleteRange(item.pr.Origin.StartKey, item.pr.Origin.EndKey) + incompleteRanges := make([]Range, 0, rangeTree.Len()) + rangeTree.Ascend(func(item *ProgressRange) bool { + // NOTE: maybe there is a late response whose range overlaps with an existing item, which + // may cause the complete range tree to become incomplete. Therefore, `item.Complete` is + // only for statistic. + incomplete := item.Res.GetIncompleteRange(item.Origin.StartKey, item.Origin.EndKey) if len(incomplete) == 0 { - item.complete = true - iter.left -= 1 - continue + if !item.Complete { + item.Complete = true + rangeTree.completeCallBack() + } + return true } incompleteRanges = append(incompleteRanges, incomplete...) - } + return true + }) return incompleteRanges } - -func (iter *IncompleteRangesFetcher) Len() int { - return iter.left -} diff --git a/br/pkg/rtree/rtree_test.go b/br/pkg/rtree/rtree_test.go index bf1235e2dd0db..8846b56957921 100644 --- a/br/pkg/rtree/rtree_test.go +++ b/br/pkg/rtree/rtree_test.go @@ -239,8 +239,7 @@ func TestProgressRangeTree(t *testing.T) { require.NoError(t, prTree.Insert(buildProgressRange("cc", "dd"))) require.NoError(t, prTree.Insert(buildProgressRange("ee", "ff"))) - prIter := prTree.Iter() - ranges := prIter.GetIncompleteRanges() + ranges := prTree.GetIncompleteRanges() require.Equal(t, rtree.Range{StartKey: []byte("aa"), EndKey: []byte("cc")}, ranges[0]) require.Equal(t, rtree.Range{StartKey: []byte("cc"), EndKey: []byte("dd")}, ranges[1]) require.Equal(t, rtree.Range{StartKey: []byte("ee"), EndKey: []byte("ff")}, ranges[2]) @@ -253,7 +252,7 @@ func TestProgressRangeTree(t *testing.T) { require.NoError(t, err) pr.Res.Put([]byte("cc"), []byte("dd"), nil) - ranges = prIter.GetIncompleteRanges() + ranges = prTree.GetIncompleteRanges() require.Equal(t, rtree.Range{StartKey: []byte("aa"), EndKey: []byte("aaa")}, ranges[0]) require.Equal(t, rtree.Range{StartKey: []byte("b"), EndKey: []byte("cc")}, ranges[1]) require.Equal(t, rtree.Range{StartKey: []byte("ee"), EndKey: []byte("ff")}, ranges[2]) @@ -270,6 +269,38 @@ func TestProgressRangeTree(t *testing.T) { require.NoError(t, err) pr.Res.Put([]byte("ee"), []byte("ff"), nil) - ranges = prIter.GetIncompleteRanges() + ranges = prTree.GetIncompleteRanges() require.Equal(t, 0, len(ranges)) } + +func TestProgreeRangeTreeCallBack(t *testing.T) { + prTree := rtree.NewProgressRangeTree() + + require.NoError(t, prTree.Insert(buildProgressRange("a", "b"))) + require.NoError(t, prTree.Insert(buildProgressRange("c", "d"))) + require.NoError(t, prTree.Insert(buildProgressRange("e", "f"))) + + completeCount := 0 + prTree.SetCallBack(func() { completeCount += 1 }) + + pr, err := prTree.FindContained([]byte("a"), []byte("b")) + require.NoError(t, err) + pr.Res.Put([]byte("a"), []byte("b"), nil) + ranges := prTree.GetIncompleteRanges() + require.Equal(t, completeCount, 1) + require.Equal(t, rtree.Range{StartKey: []byte("c"), EndKey: []byte("d")}, ranges[0]) + require.Equal(t, rtree.Range{StartKey: []byte("e"), EndKey: []byte("f")}, ranges[1]) + + pr.Res.Put([]byte("a"), []byte("aa"), nil) + ranges = prTree.GetIncompleteRanges() + require.Equal(t, completeCount, 1) + require.Equal(t, rtree.Range{StartKey: []byte("aa"), EndKey: []byte("b")}, ranges[0]) + require.Equal(t, rtree.Range{StartKey: []byte("c"), EndKey: []byte("d")}, ranges[1]) + require.Equal(t, rtree.Range{StartKey: []byte("e"), EndKey: []byte("f")}, ranges[2]) + + pr.Res.Put([]byte("a"), []byte("b"), nil) + ranges = prTree.GetIncompleteRanges() + require.Equal(t, completeCount, 1) + require.Equal(t, rtree.Range{StartKey: []byte("c"), EndKey: []byte("d")}, ranges[0]) + require.Equal(t, rtree.Range{StartKey: []byte("e"), EndKey: []byte("f")}, ranges[1]) +} diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index af92518043a90..1243cd334bf0b 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -620,34 +620,34 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig } summary.CollectInt("backup total ranges", len(ranges)) - - approximateRegions, err := getRegionCountOfRanges(ctx, mgr, ranges) + progressTotalCount, progressUnit, err := getProgressCountOfRanges(ctx, mgr, ranges) if err != nil { return errors.Trace(err) } // Redirect to log if there is no log file to avoid unreadable output. updateCh := g.StartProgress( - ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) - summary.CollectInt("backup total regions", approximateRegions) + ctx, cmdName, int64(progressTotalCount), !cfg.LogProgress) progressCount := uint64(0) - progressCallBack := func() { - updateCh.Inc() - failpoint.Inject("progress-call-back", func(v failpoint.Value) { - log.Info("failpoint progress-call-back injected") - atomic.AddUint64(&progressCount, 1) - if fileName, ok := v.(string); ok { - f, osErr := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModePerm) - if osErr != nil { - log.Warn("failed to create file", zap.Error(osErr)) - } - msg := []byte(fmt.Sprintf("region:%d\n", atomic.LoadUint64(&progressCount))) - _, err = f.Write(msg) - if err != nil { - log.Warn("failed to write data to file", zap.Error(err)) + progressCallBack := func(callBackUnit backup.ProgressUnit) { + if progressUnit == callBackUnit { + updateCh.Inc() + failpoint.Inject("progress-call-back", func(v failpoint.Value) { + log.Info("failpoint progress-call-back injected") + atomic.AddUint64(&progressCount, 1) + if fileName, ok := v.(string); ok { + f, osErr := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if osErr != nil { + log.Warn("failed to create file", zap.Error(osErr)) + } + msg := []byte(fmt.Sprintf("%s:%d\n", progressUnit, atomic.LoadUint64(&progressCount))) + _, err = f.Write(msg) + if err != nil { + log.Warn("failed to write data to file", zap.Error(err)) + } } - } - }) + }) + } } if cfg.UseCheckpoint { @@ -746,21 +746,30 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return nil } -func getRegionCountOfRanges( +func getProgressCountOfRanges( ctx context.Context, mgr *conn.Mgr, ranges []rtree.Range, -) (int, error) { +) (int, backup.ProgressUnit, error) { + if len(ranges) > 1000 { + return len(ranges), backup.UnitRange, nil + } + failpoint.Inject("progress-call-back", func(_ failpoint.Value) { + if len(ranges) > 100 { + failpoint.Return(len(ranges), backup.UnitRange, nil) + } + }) // The number of regions need to backup approximateRegions := 0 for _, r := range ranges { regionCount, err := mgr.GetRegionCount(ctx, r.StartKey, r.EndKey) if err != nil { - return 0, errors.Trace(err) + return 0, backup.UnitRegion, errors.Trace(err) } approximateRegions += regionCount } - return approximateRegions, nil + summary.CollectInt("backup total regions", approximateRegions) + return approximateRegions, backup.UnitRegion, nil } // ParseTSString port from tidb setSnapshotTS. diff --git a/br/pkg/task/backup_raw.go b/br/pkg/task/backup_raw.go index 1bb273a55581e..0bb42403ea8a2 100644 --- a/br/pkg/task/backup_raw.go +++ b/br/pkg/task/backup_raw.go @@ -191,7 +191,10 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf updateCh := g.StartProgress( ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) - progressCallBack := func() { + progressCallBack := func(unit backup.ProgressUnit) { + if unit == backup.UnitRange { + return + } updateCh.Inc() } diff --git a/br/pkg/task/backup_txn.go b/br/pkg/task/backup_txn.go index 3ec3321b0ff68..fd4e4071ff066 100644 --- a/br/pkg/task/backup_txn.go +++ b/br/pkg/task/backup_txn.go @@ -177,7 +177,10 @@ func RunBackupTxn(c context.Context, g glue.Glue, cmdName string, cfg *TxnKvConf updateCh := g.StartProgress( ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) - progressCallBack := func() { + progressCallBack := func(unit backup.ProgressUnit) { + if unit == backup.UnitRange { + return + } updateCh.Inc() } backupTS, err := client.GetCurrentTS(ctx) diff --git a/br/tests/br_300_small_tables/run.sh b/br/tests/br_300_small_tables/run.sh index e83558b439902..0d37ae04d9943 100644 --- a/br/tests/br_300_small_tables/run.sh +++ b/br/tests/br_300_small_tables/run.sh @@ -47,7 +47,7 @@ backupv2_size=`grep "backup-data-size" "${BACKUPMETAV2_LOG}" | grep -oP '\[\K[^\ echo "backup meta v2 backup size is ${backupv2_size}" export GO_FAILPOINTS="" -if [[ "$(wc -l <$PROGRESS_FILE)" == "1" ]] && [[ $(grep -c "region" $PROGRESS_FILE) == "1" ]]; +if [[ "$(wc -l <$PROGRESS_FILE)" == "1" ]] && [[ $(grep -c "range" $PROGRESS_FILE) == "1" ]]; then echo "use the correct progress unit" else