
Commit f069f0b

statistics: optimize stats delta dumping with batch processing (#58996)
ref #57869
1 parent 0950fe3 · commit f069f0b


52 files changed: +494 additions, −606 deletions
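Only a subset of the 52 changed files is excerpted below; the batched dumping itself lives in the statistics handle, which is not among them. Conceptually, the optimization groups per-table stats deltas and flushes them in batches instead of issuing one write per table. The following is a minimal sketch of that idea; every name in it (`TableDelta`, `dumpDeltasBatched`, `batchSize`) is illustrative and not TiDB's actual API:

```go
// Illustrative sketch of batching stats-delta writes: deltas are grouped and
// flushed in fixed-size batches so one round-trip covers many tables.
// Names are hypothetical, not taken from the TiDB codebase.
package main

import "fmt"

type TableDelta struct {
	TableID     int64
	Count       int64 // net rows added or removed
	ModifyCount int64 // rows modified
}

const batchSize = 100

// dumpDeltasBatched flushes deltas in groups of batchSize instead of
// performing one write per table.
func dumpDeltasBatched(deltas map[int64]*TableDelta, flush func([]*TableDelta) error) error {
	batch := make([]*TableDelta, 0, batchSize)
	for _, d := range deltas {
		batch = append(batch, d)
		if len(batch) == batchSize {
			if err := flush(batch); err != nil {
				return err
			}
			batch = batch[:0]
		}
	}
	if len(batch) > 0 {
		return flush(batch)
	}
	return nil
}

func main() {
	deltas := map[int64]*TableDelta{
		1: {TableID: 1, Count: 10, ModifyCount: 10},
		2: {TableID: 2, Count: -3, ModifyCount: 3},
	}
	_ = dumpDeltasBatched(deltas, func(b []*TableDelta) error {
		fmt.Printf("flushing %d deltas in one write\n", len(b))
		return nil
	})
}
```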

pkg/disttask/importinto/proto.go (0 additions, 2 deletions)

```diff
@@ -110,7 +110,6 @@ type SharedVars struct {
 	TableImporter *importer.TableImporter
 	DataEngine    *backend.OpenedEngine
 	IndexEngine   *backend.OpenedEngine
-	Progress      *importer.Progress
 
 	mu       sync.Mutex
 	Checksum *verification.KVGroupChecksum
@@ -183,5 +182,4 @@ type Checksum struct {
 // This portion of the code may be implemented uniformly in the framework in the future.
 type Result struct {
 	LoadedRowCnt uint64
-	ColSizeMap   map[int64]int64
 }
```

pkg/disttask/importinto/scheduler.go (1 addition, 7 deletions)

```diff
@@ -577,14 +577,9 @@ func updateResult(handle storage.TaskHandle, task *proto.Task, taskMeta *TaskMet
 		}
 		subtaskMetas = append(subtaskMetas, &subtaskMeta)
 	}
-	columnSizeMap := make(map[int64]int64)
 	for _, subtaskMeta := range subtaskMetas {
 		taskMeta.Result.LoadedRowCnt += subtaskMeta.Result.LoadedRowCnt
-		for key, val := range subtaskMeta.Result.ColSizeMap {
-			columnSizeMap[key] += val
-		}
 	}
-	taskMeta.Result.ColSizeMap = columnSizeMap
 
 	if globalSort {
 		taskMeta.Result.LoadedRowCnt, err = getLoadedRowCountOnGlobalSort(handle, task)
@@ -662,8 +657,7 @@ func (sch *ImportSchedulerExt) finishJob(ctx context.Context, logger *zap.Logger
 		func(ctx context.Context) (bool, error) {
 			return true, taskHandle.WithNewSession(func(se sessionctx.Context) error {
 				if err := importer.FlushTableStats(ctx, se, taskMeta.Plan.TableInfo.ID, &importer.JobImportResult{
-					Affected:   taskMeta.Result.LoadedRowCnt,
-					ColSizeMap: taskMeta.Result.ColSizeMap,
+					Affected: taskMeta.Result.LoadedRowCnt,
 				}); err != nil {
 					logger.Warn("flush table stats failed", zap.Error(err))
 				}
```
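With the ColSizeMap plumbing gone, the aggregation in `updateResult` reduces to a plain row-count sum across subtask results. A self-contained sketch of the simplified shape, using trimmed stand-ins for the real types:

```go
// Sketch of the simplified result aggregation after this commit: only row
// counts are summed across subtask results; the per-column size merge that
// used to happen here is removed. Result is a trimmed stand-in type.
package main

import "fmt"

type Result struct {
	LoadedRowCnt uint64
}

func aggregate(subtaskResults []Result) Result {
	var total Result
	for _, r := range subtaskResults {
		total.LoadedRowCnt += r.LoadedRowCnt
	}
	return total
}

func main() {
	fmt.Println(aggregate([]Result{{10}, {32}})) // {42}
}
```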

pkg/disttask/importinto/subtask_executor.go (0 additions, 2 deletions)

```diff
@@ -69,7 +69,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
 		sharedVars.TableImporter,
 		sharedVars.DataEngine,
 		sharedVars.IndexEngine,
-		sharedVars.Progress,
 		logger,
 		checksum,
 	); err != nil {
@@ -82,7 +81,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
 		sharedVars.TableImporter,
 		dataWriter,
 		indexWriter,
-		sharedVars.Progress,
 		logger,
 		checksum,
 	); err != nil {
```

pkg/disttask/importinto/task_executor.go (0 additions, 2 deletions)

```diff
@@ -154,7 +154,6 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt
 		TableImporter:    s.tableImporter,
 		DataEngine:       dataEngine,
 		IndexEngine:      indexEngine,
-		Progress:         importer.NewProgress(),
 		Checksum:         verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetKeySpace()),
 		SortedDataMeta:   &external.SortedKVMeta{},
 		SortedIndexMetas: make(map[int64]*external.SortedKVMeta),
@@ -251,7 +250,6 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt
 	}
 	subtaskMeta.Result = Result{
 		LoadedRowCnt: dataKVCount,
-		ColSizeMap:   sharedVars.Progress.GetColSize(),
 	}
 	allocators := sharedVars.TableImporter.Allocators()
 	subtaskMeta.MaxIDs = map[autoid.AllocatorType]int64{
```

pkg/domain/domain.go (28 additions, 0 deletions)

```diff
@@ -2370,6 +2370,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
 		return nil
 	}
 	do.SetStatsUpdating(true)
+	do.wg.Run(do.asyncLoadHistogram, "asyncLoadHistogram")
 	// The stats updated worker doesn't require the stats initialization to be completed.
 	// This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations.
 	// These tasks do not interfere with or depend on the initialization process.
@@ -2522,6 +2523,33 @@ func (do *Domain) loadStatsWorker() {
 			if err != nil {
 				logutil.BgLogger().Warn("update stats info failed", zap.Error(err))
 			}
+		case <-do.exit:
+			return
+		}
+	}
+}
+
+func (do *Domain) asyncLoadHistogram() {
+	defer util.Recover(metrics.LabelDomain, "asyncLoadStats", nil, false)
+	lease := do.statsLease
+	if lease == 0 {
+		lease = 3 * time.Second
+	}
+	cleanupTicker := time.NewTicker(lease)
+	defer func() {
+		cleanupTicker.Stop()
+		logutil.BgLogger().Info("asyncLoadStats exited.")
+	}()
+	select {
+	case <-do.StatsHandle().InitStatsDone:
+	case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C
+		return
+	}
+	statsHandle := do.StatsHandle()
+	var err error
+	for {
+		select {
+		case <-cleanupTicker.C:
 			err = statsHandle.LoadNeededHistograms(do.InfoSchema())
 			if err != nil {
 				logutil.BgLogger().Warn("load histograms failed", zap.Error(err))
```
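The new `asyncLoadHistogram` moves periodic histogram loading out of `loadStatsWorker` into its own goroutine, following a common background-worker shape: block until initialization completes (or shutdown arrives first), then do periodic work on a ticker until the exit channel closes. Below is a generic, self-contained sketch of that pattern; the function and channel names are illustrative, not the Domain code itself:

```go
// Minimal sketch of the ticker-plus-exit-channel worker pattern used by
// asyncLoadHistogram: wait for init, then run work() on each tick until
// the exit channel is closed. Generic reconstruction, not the actual code.
package main

import (
	"fmt"
	"time"
)

func worker(initDone, exit <-chan struct{}, lease time.Duration, work func() error) {
	if lease == 0 {
		lease = 3 * time.Second // fall back to a default period, as the patch does
	}
	ticker := time.NewTicker(lease)
	defer func() {
		ticker.Stop()
		fmt.Println("worker exited")
	}()
	// Do nothing until initialization completes; shutdown may arrive first.
	select {
	case <-initDone:
	case <-exit:
		return
	}
	for {
		select {
		case <-ticker.C:
			if err := work(); err != nil {
				fmt.Println("work failed:", err)
			}
		case <-exit:
			return
		}
	}
}

func main() {
	initDone := make(chan struct{})
	exit := make(chan struct{})
	go worker(initDone, exit, 50*time.Millisecond, func() error {
		fmt.Println("tick: load needed histograms")
		return nil
	})
	close(initDone)
	time.Sleep(120 * time.Millisecond)
	close(exit)
	time.Sleep(10 * time.Millisecond)
}
```

Starting the worker via `do.wg.Run` (as the hunk above does) rather than a bare `go` statement lets the domain wait for it during shutdown.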

pkg/executor/delete.go (1 addition, 1 deletion)

```diff
@@ -274,7 +274,7 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl
 		return err
 	}
 
-	err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.ExtraPartialRowOption)
+	err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.IndexesRowLayout)
 	if err != nil {
 		return err
 	}
```

pkg/executor/importer/BUILD.bazel (0 additions, 1 deletion)

```diff
@@ -9,7 +9,6 @@ go_library(
         "job.go",
         "kv_encode.go",
         "precheck.go",
-        "progress.go",
         "table_import.go",
     ],
     importpath = "github.com/pingcap/tidb/pkg/executor/importer",
```

pkg/executor/importer/engine_process.go (1 addition, 4 deletions)

```diff
@@ -30,7 +30,6 @@ func ProcessChunk(
 	chunk *checkpoints.ChunkCheckpoint,
 	tableImporter *TableImporter,
 	dataEngine, indexEngine *backend.OpenedEngine,
-	progress *Progress,
 	logger *zap.Logger,
 	groupChecksum *verification.KVGroupChecksum,
 ) error {
@@ -65,7 +64,7 @@ func ProcessChunk(
 		}
 	}()
 
-	return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger, groupChecksum)
+	return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, logger, groupChecksum)
 }
 
 // ProcessChunkWithWriter processes a chunk, and write kv pairs to dataWriter and indexWriter.
@@ -74,7 +73,6 @@ func ProcessChunkWithWriter(
 	chunk *checkpoints.ChunkCheckpoint,
 	tableImporter *TableImporter,
 	dataWriter, indexWriter backend.EngineWriter,
-	progress *Progress,
 	logger *zap.Logger,
 	groupChecksum *verification.KVGroupChecksum,
 ) error {
@@ -116,6 +114,5 @@ func ProcessChunkWithWriter(
 	if err != nil {
 		return err
 	}
-	progress.AddColSize(encoder.GetColumnSize())
 	return nil
 }
```
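Every call site of the removed `Progress` type is visible in this excerpt: `importer.NewProgress()` in task_executor.go, `progress.AddColSize(encoder.GetColumnSize())` here, and `sharedVars.Progress.GetColSize()` in `OnFinished`. The deleted progress.go itself is not shown; the following is a hypothetical reconstruction of such a per-column size accumulator, inferred only from those call sites, so the actual removed implementation may differ:

```go
// Hypothetical reconstruction of the removed importer.Progress type, inferred
// from its call sites in this commit (NewProgress, AddColSize, GetColSize).
// Not the actual deleted TiDB code.
package importer

import "sync"

// Progress tracks cumulative encoded byte size per column ID.
type Progress struct {
	mu      sync.Mutex
	colSize map[int64]int64
}

func NewProgress() *Progress {
	return &Progress{colSize: make(map[int64]int64)}
}

// AddColSize merges one chunk's per-column sizes into the running totals.
func (p *Progress) AddColSize(sizes map[int64]int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for colID, size := range sizes {
		p.colSize[colID] += size
	}
}

// GetColSize returns a copy of the accumulated totals.
func (p *Progress) GetColSize() map[int64]int64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	out := make(map[int64]int64, len(p.colSize))
	for colID, size := range p.colSize {
		out[colID] = size
	}
	return out
}
```

Dropping this accumulator removes per-chunk map merging and shrinks the subtask metadata that must be serialized between the executor and the scheduler.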

pkg/executor/importer/import.go (2 additions, 3 deletions)

```diff
@@ -1395,9 +1395,8 @@ func getDataSourceType(p *plannercore.ImportInto) DataSourceType {
 
 // JobImportResult is the result of the job import.
 type JobImportResult struct {
-	Affected   uint64
-	Warnings   []contextutil.SQLWarn
-	ColSizeMap variable.DeltaColsMap
+	Affected uint64
+	Warnings []contextutil.SQLWarn
 }
 
 // GetMsgFromBRError get msg from BR error.
```

pkg/executor/importer/importer_testkit_test.go (2 additions, 6 deletions)

```diff
@@ -314,11 +314,9 @@ func TestProcessChunkWith(t *testing.T) {
 		defer ti.Backend().CloseEngineMgr()
 		kvWriter := mock.NewMockEngineWriter(ctrl)
 		kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-		progress := importer.NewProgress()
 		checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
-		err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
+		err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
 		require.NoError(t, err)
-		require.Len(t, progress.GetColSize(), 3)
 		checksumMap := checksum.GetInnerChecksums()
 		require.Len(t, checksumMap, 1)
 		require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupID])
@@ -346,11 +344,9 @@ func TestProcessChunkWith(t *testing.T) {
 		ti.SetSelectedRowCh(rowsCh)
 		kvWriter := mock.NewMockEngineWriter(ctrl)
 		kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-		progress := importer.NewProgress()
 		checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
-		err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
+		err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
 		require.NoError(t, err)
-		require.Len(t, progress.GetColSize(), 3)
 		checksumMap := checksum.GetInnerChecksums()
 		require.Len(t, checksumMap, 1)
 		require.Equal(t, verify.MakeKVChecksum(111, 3, 14585065391351463171), *checksumMap[verify.DataKVGroupID])
```
