Skip to content

Commit

Permalink
statistics: enhance dumpStatsDeltaToKV to return updated batch update…
Browse files Browse the repository at this point in the history
…s and ensure proper sorting (pingcap#58929)

ref pingcap#57869
  • Loading branch information
Rustin170506 committed Jan 16, 2025
1 parent bb14aea commit ffe10c7
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions pkg/statistics/handle/usage/session_stats_collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,16 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {
}

// Process all updates in the batch with a single transaction.
// Note: batchUpdates may be modified in dumpStatsDeltaToKV.
startTs, err := s.dumpStatsDeltaToKV(is, sctx, batchUpdates)
// Note: batchUpdates may be modified in dumpStatsDeltaToKV. (e.g. sorting, updating IsLocked)
startTs, updated, err := s.dumpStatsDeltaToKV(is, sctx, batchUpdates)
if err != nil {
return errors.Trace(err)
}
statsVersion = startTs
// Note: Ensure we use the updated slice after dumpStatsDeltaToKV,
// because dumpStatsDeltaToKV may modify the underlying array of batchUpdates.
// For example, dumpStatsDeltaToKV may sort the array.
batchUpdates = updated
intest.AssertFunc(
func() bool {
return slices.IsSortedFunc(batchUpdates, func(i, j *storage.DeltaUpdate) int {
Expand All @@ -183,7 +187,6 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {

// Update deltaMap after the batch is successfully dumped.
for _, update := range batchUpdates {
UpdateTableDeltaMap(deltaMap, update.TableID, -update.Delta.Delta, -update.Delta.Count)
delete(deltaMap, update.TableID)
}

Expand Down Expand Up @@ -234,14 +237,14 @@ func (s *statsUsageImpl) dumpStatsDeltaToKV(
is infoschema.InfoSchema,
sctx sessionctx.Context,
updates []*storage.DeltaUpdate,
) (statsVersion uint64, err error) {
) (statsVersion uint64, updated []*storage.DeltaUpdate, err error) {
if len(updates) == 0 {
return 0, nil
return 0, nil, nil
}
beforeLen := len(updates)
statsVersion, err = utilstats.GetStartTS(sctx)
if err != nil {
return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}

// Collect all table IDs that need lock checking.
Expand All @@ -262,7 +265,7 @@ func (s *statsUsageImpl) dumpStatsDeltaToKV(
// Batch get lock status for all tables.
lockedTables, err := s.statsHandle.GetLockedTables(allTableIDs...)
if err != nil {
return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}

// Prepare batch updates
Expand Down Expand Up @@ -320,10 +323,12 @@ func (s *statsUsageImpl) dumpStatsDeltaToKV(

// Batch update stats meta.
if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, updates...); err != nil {
return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}

return statsVersion, nil
// Because we may sort the updates, we need to return the updated slice.
// Otherwise the caller may use the original slice and get wrong results.
return statsVersion, updates, nil
}

// DumpColStatsUsageToKV sweeps the whole list, updates the column stats usage map and dumps it to KV.
Expand Down

0 comments on commit ffe10c7

Please sign in to comment.