Skip to content

Commit

Permalink
statistics: refactor stats meta handling to use DeltaUpdate for multi…
Browse files Browse the repository at this point in the history
…-table support (#58657)

ref #57869
  • Loading branch information
Rustin170506 authored Jan 7, 2025
1 parent b2f2faa commit 67249cb
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 32 deletions.
8 changes: 2 additions & 6 deletions pkg/statistics/handle/ddl/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,7 @@ func updateGlobalTableStats4DropPartition(
ctx,
sctx,
startTS,
variable.TableDelta{Count: count, Delta: delta},
globalTableInfo.ID,
isLocked,
storage.NewDeltaUpdate(globalTableInfo.ID, variable.TableDelta{Count: count, Delta: delta}, isLocked),
))
}

Expand Down Expand Up @@ -597,9 +595,7 @@ func updateGlobalTableStats4TruncatePartition(
ctx,
sctx,
startTS,
variable.TableDelta{Count: count, Delta: delta},
globalTableInfo.ID,
isLocked,
storage.NewDeltaUpdate(globalTableInfo.ID, variable.TableDelta{Count: count, Delta: delta}, isLocked),
)
if err != nil {
fields := truncatePartitionsLogFields(
Expand Down
97 changes: 76 additions & 21 deletions pkg/statistics/handle/storage/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"slices"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -55,36 +56,90 @@ func UpdateStatsVersion(ctx context.Context, sctx sessionctx.Context) error {
return nil
}

// UpdateStatsMeta update the stats meta stat for this Table.
// DeltaUpdate is the delta update for stats meta.
type DeltaUpdate struct {
Delta variable.TableDelta
TableID int64
IsLocked bool
}

// NewDeltaUpdate creates a new DeltaUpdate.
func NewDeltaUpdate(tableID int64, delta variable.TableDelta, isLocked bool) *DeltaUpdate {
return &DeltaUpdate{
Delta: delta,
TableID: tableID,
IsLocked: isLocked,
}
}

// UpdateStatsMeta updates the stats meta for multiple tables.
// It uses the INSERT INTO ... ON DUPLICATE KEY UPDATE syntax to fill the missing records.
func UpdateStatsMeta(
ctx context.Context,
sctx sessionctx.Context,
startTS uint64,
delta variable.TableDelta,
id int64,
isLocked bool,
updates ...*DeltaUpdate,
) (err error) {
if isLocked {
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_table_locked.
// Note: For locked tables, it is possible that the record gets deleted. So it can be negative.
_, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_table_locked (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+
"update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)",
startTS, id, delta.Count, delta.Delta)
} else {
if delta.Delta < 0 {
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta.
_, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, 0) on duplicate key "+
"update version = values(version), modify_count = modify_count + values(modify_count), count = if(count > %?, count - %?, 0)",
startTS, id, delta.Count, -delta.Delta, -delta.Delta)
if len(updates) == 0 {
return nil
}

// Separate locked and unlocked updates
var lockedValues, unlockedPosValues, unlockedNegValues []string
var cacheInvalidateIDs []int64

for _, update := range updates {
if update.IsLocked {
lockedValues = append(lockedValues, fmt.Sprintf("(%d, %d, %d, %d)",
startTS, update.TableID, update.Delta.Count, update.Delta.Delta))
} else {
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta.
_, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+
"update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)", startTS,
id, delta.Count, delta.Delta)
if update.Delta.Delta < 0 {
unlockedNegValues = append(unlockedNegValues, fmt.Sprintf("(%d, %d, %d, %d)",
startTS, update.TableID, update.Delta.Count, -update.Delta.Delta))
} else {
unlockedPosValues = append(unlockedPosValues, fmt.Sprintf("(%d, %d, %d, %d)",
startTS, update.TableID, update.Delta.Count, update.Delta.Delta))
}
cacheInvalidateIDs = append(cacheInvalidateIDs, update.TableID)
}
}

// Execute locked updates
if len(lockedValues) > 0 {
sql := fmt.Sprintf("insert into mysql.stats_table_locked (version, table_id, modify_count, count) values %s "+
"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
"count = count + values(count)", strings.Join(lockedValues, ","))
if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil {
return err
}
}

// Execute unlocked updates with positive delta
if len(unlockedPosValues) > 0 {
sql := fmt.Sprintf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+
"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
"count = count + values(count)", strings.Join(unlockedPosValues, ","))
if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil {
return err
}
}

// Execute unlocked updates with negative delta
if len(unlockedNegValues) > 0 {
sql := fmt.Sprintf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+
"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
"count = if(count > values(count), count - values(count), 0)", strings.Join(unlockedNegValues, ","))
if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil {
return err
}
}

// Invalidate cache for all unlocked tables
for _, id := range cacheInvalidateIDs {
cache.TableRowStatsCache.Invalidate(id)
}
return err

return nil
}

// InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta.
Expand Down
23 changes: 18 additions & 5 deletions pkg/statistics/handle/usage/session_stats_collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,12 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
}
tableOrPartitionLocked := isTableLocked || isPartitionLocked
isLocked = tableOrPartitionLocked
if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta,
physicalTableID, tableOrPartitionLocked); err != nil {
if err = storage.UpdateStatsMeta(
utilstats.StatsCtx,
sctx,
statsVersion,
storage.NewDeltaUpdate(physicalTableID, delta, tableOrPartitionLocked),
); err != nil {
return err
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
Expand All @@ -190,7 +194,12 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
// To sum up, we only need to update the global-stats when the table and the partition are not locked.
if !isTableLocked && !isPartitionLocked {
// If it's a partitioned table and its global-stats exists, update its count and modify_count as well.
if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta, tableID, isTableLocked); err != nil {
if err = storage.UpdateStatsMeta(
utilstats.StatsCtx,
sctx,
statsVersion,
storage.NewDeltaUpdate(tableID, delta, isTableLocked),
); err != nil {
return err
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
Expand All @@ -203,8 +212,12 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
isTableLocked = true
}
isLocked = isTableLocked
if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta,
physicalTableID, isTableLocked); err != nil {
if err = storage.UpdateStatsMeta(
utilstats.StatsCtx,
sctx,
statsVersion,
storage.NewDeltaUpdate(physicalTableID, delta, isTableLocked),
); err != nil {
return err
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
Expand Down

0 comments on commit 67249cb

Please sign in to comment.