Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: refactor stats meta handling to use DeltaUpdate for multi-table support #58657

Merged
merged 5 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions pkg/statistics/handle/ddl/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,7 @@ func updateGlobalTableStats4DropPartition(
ctx,
sctx,
startTS,
variable.TableDelta{Count: count, Delta: delta},
globalTableInfo.ID,
isLocked,
storage.NewDeltaUpdate(globalTableInfo.ID, variable.TableDelta{Count: count, Delta: delta}, isLocked),
))
}

Expand Down Expand Up @@ -597,9 +595,7 @@ func updateGlobalTableStats4TruncatePartition(
ctx,
sctx,
startTS,
variable.TableDelta{Count: count, Delta: delta},
globalTableInfo.ID,
isLocked,
storage.NewDeltaUpdate(globalTableInfo.ID, variable.TableDelta{Count: count, Delta: delta}, isLocked),
)
if err != nil {
fields := truncatePartitionsLogFields(
Expand Down
97 changes: 76 additions & 21 deletions pkg/statistics/handle/storage/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"slices"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -55,36 +56,90 @@ func UpdateStatsVersion(ctx context.Context, sctx sessionctx.Context) error {
return nil
}

// UpdateStatsMeta update the stats meta stat for this Table.
// DeltaUpdate is the delta update for stats meta.
type DeltaUpdate struct {
Delta variable.TableDelta
TableID int64
IsLocked bool
}

// NewDeltaUpdate creates a new DeltaUpdate.
func NewDeltaUpdate(tableID int64, delta variable.TableDelta, isLocked bool) *DeltaUpdate {
return &DeltaUpdate{
Delta: delta,
TableID: tableID,
IsLocked: isLocked,
}
}

// UpdateStatsMeta updates the stats meta for multiple tables.
// It uses the INSERT INTO ... ON DUPLICATE KEY UPDATE syntax to fill the missing records.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw you update the function from one tabledelta to multi DeltaUpdate. But there is no code in PR to use the multi DeltaUpdate. Will/Where you use the multi interface in the future?
If the answer is yes, could you please add some test for the multi deltaupdate which is not same as the past.

Copy link
Member Author

@Rustin170506 Rustin170506 Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in the PR body, this PR split from #58331. You can find the usage from there.

If the answer is yes, could you please add some test for the multi deltaupdate which is not same as the past.

We have a lot of tests to check the count and modify_count. I think they have covered this change. To verify it independently, we need to use the mock session to execute the SQL. From my previous experience of using and maintaining it, it is difficult to understand and maintain. (Difficult to follow the mock logic and the mock framework.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also verified the SQL change.

  1. First create the test table:
CREATE TABLE IF NOT EXISTS mysql.stats_meta (
    version BIGINT UNSIGNED NOT NULL,
    table_id BIGINT UNSIGNED NOT NULL,
    modify_count BIGINT UNSIGNED NOT NULL DEFAULT 0,
    count BIGINT UNSIGNED NOT NULL DEFAULT 0,
    PRIMARY KEY (table_id)
) ENGINE=InnoDB;
  1. Insert initial test data:
INSERT INTO mysql.stats_meta (version, table_id, modify_count, count) 
VALUES (900, 100, 10, 25)
ON DUPLICATE KEY UPDATE 
    version = VALUES(version),
    modify_count = VALUES(modify_count),
    count = VALUES(count);
  1. array version of negative value handling SQL:
INSERT INTO mysql.stats_meta (version, table_id, modify_count, count) 
VALUES (1000, 100, 1, 10)
ON DUPLICATE KEY UPDATE 
    version = VALUES(version),
    modify_count = modify_count + VALUES(modify_count),
    count = IF(count > VALUES(count), count - VALUES(count), 0);
  1. or a single version of the negative value handling SQL:
INSERT INTO mysql.stats_meta (version, table_id, modify_count, count) 
VALUES (1000, 100, 1, 0)
ON DUPLICATE KEY UPDATE 
    version = VALUES(version),
    modify_count = modify_count + VALUES(modify_count),
    count = IF(count > 10, count - 10, 0);
  1. View results:
SELECT * FROM mysql.stats_meta WHERE table_id = 100;

For other changes, it is easy to follow, only for the negitive number there is a little refactoring.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there is no good way to test multiple table updates now because we're not using it yet. I guess we can add some tests after we start using it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no problem leaving the manual testing steps in the PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in the PR body, this PR split from #58331. You can find the usage from there.

If the answer is yes, could you please add some test for the multi deltaupdate which is not same as the past.

We have a lot of tests to check the count and modify_count. I think they have covered this change. To verify it independently, we need to use the mock session to execute the SQL. From my previous experience of using and maintaining it, it is difficult to understand and maintain. (Difficult to follow the mock logic and the mock framework.)

OK, no question for me. Just want to know what is this PR used for.

func UpdateStatsMeta(
ctx context.Context,
sctx sessionctx.Context,
startTS uint64,
delta variable.TableDelta,
id int64,
isLocked bool,
updates ...*DeltaUpdate,
) (err error) {
if isLocked {
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_table_locked.
// Note: For locked tables, it is possible that the record gets deleted. So it can be negative.
_, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_table_locked (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+
"update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)",
startTS, id, delta.Count, delta.Delta)
} else {
if delta.Delta < 0 {
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta.
_, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, 0) on duplicate key "+
"update version = values(version), modify_count = modify_count + values(modify_count), count = if(count > %?, count - %?, 0)",
startTS, id, delta.Count, -delta.Delta, -delta.Delta)
if len(updates) == 0 {
return nil
}

// Separate locked and unlocked updates
var lockedValues, unlockedPosValues, unlockedNegValues []string
var cacheInvalidateIDs []int64

for _, update := range updates {
if update.IsLocked {
lockedValues = append(lockedValues, fmt.Sprintf("(%d, %d, %d, %d)",
startTS, update.TableID, update.Delta.Count, update.Delta.Delta))
} else {
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta.
_, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+
"update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)", startTS,
id, delta.Count, delta.Delta)
if update.Delta.Delta < 0 {
unlockedNegValues = append(unlockedNegValues, fmt.Sprintf("(%d, %d, %d, %d)",
startTS, update.TableID, update.Delta.Count, -update.Delta.Delta))
} else {
unlockedPosValues = append(unlockedPosValues, fmt.Sprintf("(%d, %d, %d, %d)",
startTS, update.TableID, update.Delta.Count, update.Delta.Delta))
}
cacheInvalidateIDs = append(cacheInvalidateIDs, update.TableID)
}
}

// Execute locked updates
if len(lockedValues) > 0 {
sql := fmt.Sprintf("insert into mysql.stats_table_locked (version, table_id, modify_count, count) values %s "+
"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
"count = count + values(count)", strings.Join(lockedValues, ","))
if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil {
return err
}
}

// Execute unlocked updates with positive delta
if len(unlockedPosValues) > 0 {
sql := fmt.Sprintf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+
"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
"count = count + values(count)", strings.Join(unlockedPosValues, ","))
if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil {
return err
}
}

// Execute unlocked updates with negative delta
if len(unlockedNegValues) > 0 {
sql := fmt.Sprintf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+
"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
"count = if(count > values(count), count - values(count), 0)", strings.Join(unlockedNegValues, ","))
if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil {
return err
}
}

// Invalidate cache for all unlocked tables
for _, id := range cacheInvalidateIDs {
cache.TableRowStatsCache.Invalidate(id)
}
return err

return nil
}

// InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta.
Expand Down
23 changes: 18 additions & 5 deletions pkg/statistics/handle/usage/session_stats_collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,12 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
}
tableOrPartitionLocked := isTableLocked || isPartitionLocked
isLocked = tableOrPartitionLocked
if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta,
physicalTableID, tableOrPartitionLocked); err != nil {
if err = storage.UpdateStatsMeta(
utilstats.StatsCtx,
sctx,
statsVersion,
storage.NewDeltaUpdate(physicalTableID, delta, tableOrPartitionLocked),
); err != nil {
return err
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
Expand All @@ -190,7 +194,12 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
// To sum up, we only need to update the global-stats when the table and the partition are not locked.
if !isTableLocked && !isPartitionLocked {
// If it's a partitioned table and its global-stats exists, update its count and modify_count as well.
if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta, tableID, isTableLocked); err != nil {
if err = storage.UpdateStatsMeta(
utilstats.StatsCtx,
sctx,
statsVersion,
storage.NewDeltaUpdate(tableID, delta, isTableLocked),
); err != nil {
return err
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
Expand All @@ -203,8 +212,12 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
isTableLocked = true
}
isLocked = isTableLocked
if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta,
physicalTableID, isTableLocked); err != nil {
if err = storage.UpdateStatsMeta(
utilstats.StatsCtx,
sctx,
statsVersion,
storage.NewDeltaUpdate(physicalTableID, delta, isTableLocked),
); err != nil {
return err
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
Expand Down