diff --git a/pkg/statistics/handle/handletest/statstest/BUILD.bazel b/pkg/statistics/handle/handletest/statstest/BUILD.bazel
index 9493c25c354e0..b2310f7388794 100644
--- a/pkg/statistics/handle/handletest/statstest/BUILD.bazel
+++ b/pkg/statistics/handle/handletest/statstest/BUILD.bazel
@@ -9,7 +9,7 @@ go_test(
     ],
     flaky = True,
     race = "on",
-    shard_count = 13,
+    shard_count = 14,
     deps = [
         "//pkg/config",
         "//pkg/parser/ast",
diff --git a/pkg/statistics/handle/handletest/statstest/stats_test.go b/pkg/statistics/handle/handletest/statstest/stats_test.go
index 37587a5b2376c..679688626bcc6 100644
--- a/pkg/statistics/handle/handletest/statstest/stats_test.go
+++ b/pkg/statistics/handle/handletest/statstest/stats_test.go
@@ -439,3 +439,32 @@ func TestInitStatsIssue41938(t *testing.T) {
     require.NoError(t, h.InitStats(context.Background(), dom.InfoSchema()))
     h.SetLease(0)
 }
+
+func TestDumpStatsDeltaInBatch(t *testing.T) {
+    store, dom := testkit.CreateMockStoreAndDomain(t)
+    testKit := testkit.NewTestKit(t, store)
+    testKit.MustExec("use test")
+    testKit.MustExec("create table t1 (c1 int, c2 int)")
+    testKit.MustExec("insert into t1 values (1, 1), (2, 2), (3, 3)")
+    testKit.MustExec("create table t2 (c1 int, c2 int)")
+    testKit.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3)")
+
+    // Dump stats delta in one batch.
+    handle := dom.StatsHandle()
+    require.NoError(t, handle.DumpStatsDeltaToKV(true))
+
+    // Check the mysql.stats_meta table.
+    rows := testKit.MustQuery("select modify_count, count, version from mysql.stats_meta order by table_id").Rows()
+    require.Len(t, rows, 2)
+
+    require.Equal(t, "3", rows[0][0])
+    require.Equal(t, "3", rows[0][1])
+    require.Equal(t, "3", rows[1][0])
+    require.Equal(t, "3", rows[1][1])
+    require.Equal(
+        t,
+        rows[0][2],
+        rows[1][2],
+        "The versions of the two tables should be the same because they are dumped in the same transaction.",
+    )
+}
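Reviewer note: the final assertion above only holds because the batch dump writes every table's row in one transaction, so all rows in the batch carry the same version (the transaction's start TS). Below is a minimal sketch of that idea using plain database/sql against a hypothetical stats_meta-like table; the names and schema are stand-ins for illustration, not the PR's implementation.

```go
package statsmetasketch

import (
	"context"
	"database/sql"
)

// dumpBatch writes one row per table with a single version for the whole
// batch, inside one transaction: either every row ends up with the same
// version, or none of them are written at all.
func dumpBatch(ctx context.Context, db *sql.DB, version uint64, deltas map[int64]int64) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // becomes a no-op once Commit succeeds

	for tableID, delta := range deltas {
		// Hypothetical schema: stats_meta(table_id, modify_count, version).
		if _, err := tx.ExecContext(ctx,
			"insert into stats_meta (table_id, modify_count, version) values (?, ?, ?) "+
				"on duplicate key update modify_count = modify_count + values(modify_count), version = values(version)",
			tableID, delta, version,
		); err != nil {
			return err
		}
	}
	return tx.Commit()
}
```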
diff --git a/pkg/statistics/handle/storage/update.go b/pkg/statistics/handle/storage/update.go
index 1ffb6314cb038..0660a67123ba7 100644
--- a/pkg/statistics/handle/storage/update.go
+++ b/pkg/statistics/handle/storage/update.go
@@ -74,6 +74,7 @@ func NewDeltaUpdate(tableID int64, delta variable.TableDelta, isLocked bool) *De
 
 // UpdateStatsMeta updates the stats meta for multiple tables.
 // It uses the INSERT INTO ... ON DUPLICATE KEY UPDATE syntax to fill the missing records.
+// Note: Make sure to call this function within a transaction.
 func UpdateStatsMeta(
     ctx context.Context,
     sctx sessionctx.Context,
@@ -85,14 +86,22 @@ func UpdateStatsMeta(
     }
 
     // Separate locked and unlocked updates
-    var lockedValues, unlockedPosValues, unlockedNegValues []string
-    var cacheInvalidateIDs []int64
+    // In most cases, the number of locked tables is small.
+    lockedTableIDs := make([]string, 0, 20)
+    lockedValues := make([]string, 0, 20)
+    // In most cases, the number of unlocked tables is large.
+    unlockedTableIDs := make([]string, 0, len(updates))
+    unlockedPosValues := make([]string, 0, max(len(updates)/2, 1))
+    unlockedNegValues := make([]string, 0, max(len(updates)/2, 1))
+    cacheInvalidateIDs := make([]int64, 0, len(updates))
     for _, update := range updates {
         if update.IsLocked {
+            lockedTableIDs = append(lockedTableIDs, fmt.Sprintf("%d", update.TableID))
             lockedValues = append(lockedValues, fmt.Sprintf("(%d, %d, %d, %d)", startTS,
                 update.TableID, update.Delta.Count, update.Delta.Delta))
         } else {
+            unlockedTableIDs = append(unlockedTableIDs, fmt.Sprintf("%d", update.TableID))
             if update.Delta.Delta < 0 {
                 unlockedNegValues = append(unlockedNegValues, fmt.Sprintf("(%d, %d, %d, %d)", startTS,
                     update.TableID, update.Delta.Count, -update.Delta.Delta))
@@ -104,6 +113,22 @@ func UpdateStatsMeta(
         }
     }
 
+    // Lock the stats_meta and stats_table_locked tables using SELECT FOR UPDATE to prevent write conflicts.
+    // This ensures that we acquire the necessary locks before attempting to update the tables, reducing the likelihood
+    // of encountering lock conflicts during the update process.
+    lockedTableIDsStr := strings.Join(lockedTableIDs, ",")
+    if lockedTableIDsStr != "" {
+        if _, err = statsutil.ExecWithCtx(ctx, sctx, fmt.Sprintf("select * from mysql.stats_table_locked where table_id in (%s) for update", lockedTableIDsStr)); err != nil {
+            return err
+        }
+    }
+
+    unlockedTableIDsStr := strings.Join(unlockedTableIDs, ",")
+    if unlockedTableIDsStr != "" {
+        if _, err = statsutil.ExecWithCtx(ctx, sctx, fmt.Sprintf("select * from mysql.stats_meta where table_id in (%s) for update", unlockedTableIDsStr)); err != nil {
+            return err
+        }
+    }
     // Execute locked updates
     if len(lockedValues) > 0 {
         sql := fmt.Sprintf("insert into mysql.stats_table_locked (version, table_id, modify_count, count) values %s "+
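Reviewer note: the new code in UpdateStatsMeta establishes a "lock first, then upsert" order: it takes the row locks with SELECT ... FOR UPDATE and only then runs the multi-row INSERT ... ON DUPLICATE KEY UPDATE statements. A rough, generic sketch of that ordering with database/sql follows; the table and column names are assumed stand-ins and the pre-formatted value strings mirror how the diff builds its SQL, so this is illustrative only.

```go
package statsmetasketch

import (
	"context"
	"database/sql"
	"strings"
)

// lockThenUpsert pre-locks the rows it is about to modify and then applies a
// single multi-row upsert, so concurrent writers queue up front instead of
// hitting lock conflicts in the middle of the batched statement.
func lockThenUpsert(ctx context.Context, tx *sql.Tx, tableIDs []string, rows []string) error {
	if len(tableIDs) > 0 {
		// Acquire the row locks before writing.
		lockSQL := "select * from stats_meta where table_id in (" + strings.Join(tableIDs, ",") + ") for update"
		if _, err := tx.ExecContext(ctx, lockSQL); err != nil {
			return err
		}
	}
	if len(rows) > 0 {
		// One statement for the whole batch instead of one per table.
		// rows is assumed to hold pre-formatted "(version, table_id, modify_count, count)" tuples.
		upsertSQL := "insert into stats_meta (version, table_id, modify_count, count) values " +
			strings.Join(rows, ",") +
			" on duplicate key update version = values(version), modify_count = modify_count + values(modify_count)"
		if _, err := tx.ExecContext(ctx, upsertSQL); err != nil {
			return err
		}
	}
	return nil
}
```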
diff --git a/pkg/statistics/handle/usage/BUILD.bazel b/pkg/statistics/handle/usage/BUILD.bazel
index 6738d529aab9a..ffbccaee53aab 100644
--- a/pkg/statistics/handle/usage/BUILD.bazel
+++ b/pkg/statistics/handle/usage/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
         "//pkg/metrics",
         "//pkg/sessionctx",
         "//pkg/sessionctx/variable",
+        "//pkg/statistics/handle/logutil",
         "//pkg/statistics/handle/storage",
         "//pkg/statistics/handle/types",
         "//pkg/statistics/handle/usage/indexusage",
@@ -26,6 +27,7 @@ go_library(
         "//pkg/util/sqlescape",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
+        "@org_uber_go_zap//:zap",
     ],
 )
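Reviewer note: the two new dependencies above back the slow-path warnings added in session_stats_collect.go below, where each phase is timed and a structured warning is emitted once it crosses a threshold. A self-contained sketch of that pattern with zap; the threshold value and the example logger are stand-ins for the handle's sampled logger, not the actual code.

```go
package main

import (
	"time"

	"go.uber.org/zap"
)

// tooSlow mirrors the idea of the diff's tooSlowThreshold; the value is a stand-in.
const tooSlow = 20 * time.Second

// warnIfSlow emits a structured warning when a phase takes longer than expected.
func warnIfSlow(logger *zap.Logger, phase string, start time.Time, tableCount int) {
	if d := time.Since(start); d > tooSlow {
		logger.Warn(phase+" is too slow",
			zap.Int("tableCount", tableCount),
			zap.Duration("duration", d))
	}
}

func main() {
	logger := zap.NewExample()
	defer logger.Sync()

	start := time.Now()
	// ... sweep the session list, collect updates, dump the batch ...
	warnIfSlow(logger, "Dumping batch updates", start, 42)
}
```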
diff --git a/pkg/statistics/handle/usage/session_stats_collect.go b/pkg/statistics/handle/usage/session_stats_collect.go
index 078dee2346f24..410bfa10614f9 100644
--- a/pkg/statistics/handle/usage/session_stats_collect.go
+++ b/pkg/statistics/handle/usage/session_stats_collect.go
@@ -29,12 +29,14 @@ import (
     "github.com/pingcap/tidb/pkg/metrics"
     "github.com/pingcap/tidb/pkg/sessionctx"
     "github.com/pingcap/tidb/pkg/sessionctx/variable"
+    statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
     "github.com/pingcap/tidb/pkg/statistics/handle/storage"
     utilstats "github.com/pingcap/tidb/pkg/statistics/handle/util"
     "github.com/pingcap/tidb/pkg/types"
     "github.com/pingcap/tidb/pkg/util"
     "github.com/pingcap/tidb/pkg/util/intest"
     "github.com/pingcap/tidb/pkg/util/sqlescape"
+    "go.uber.org/zap"
 )
 
 var (
@@ -82,6 +84,11 @@ func (s *statsUsageImpl) needDumpStatsDelta(is infoschema.InfoSchema, dumpAll bo
     return false
 }
 
+const (
+    dumpDeltaBatchSize = 100_000
+    tooSlowThreshold   = 20 * time.Second
+)
+
 // DumpStatsDeltaToKV sweeps the whole list and updates the global map, then dumps every table held in the map to KV.
 // If the mode is `DumpDelta`, it only dumps the delta info of tables whose `Modify Count / Table Count` is greater than a ratio.
 func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {
@@ -91,96 +98,196 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {
         dur := time.Since(start)
         metrics.StatsDeltaUpdateHistogram.Observe(dur.Seconds())
     }()
+
     s.SweepSessionStatsList()
     deltaMap := s.SessionTableDelta().GetDeltaAndReset()
     defer func() {
         s.SessionTableDelta().Merge(deltaMap)
     }()
+    if time.Since(start) > tooSlowThreshold {
+        statslogutil.SingletonStatsSamplerLogger().Warn("Sweeping session list is too slow",
+            zap.Int("tableCount", len(deltaMap)),
+            zap.Duration("duration", time.Since(start)))
+    }
+
+    // Sort table IDs to ensure a consistent dump order to reduce the chance of deadlock.
+    tableIDs := make([]int64, 0, len(deltaMap))
+    for id := range deltaMap {
+        tableIDs = append(tableIDs, id)
+    }
+    slices.Sort(tableIDs)
+
+    // Check if recording historical stats meta is enabled.
+    // TODO: Once RecordHistoricalStatsMeta supports batch processing, this check can be removed and handled within the API.
+    skipRecordHistoricalStatsMeta := false
+    err := utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
+        if !sctx.GetSessionVars().EnableHistoricalStats {
+            skipRecordHistoricalStatsMeta = true
+            return nil
+        }
+        return nil
+    })
+    if err != nil {
+        return errors.Trace(err)
+    }
+
+    // Dump stats delta in batches.
+    for i := 0; i < len(tableIDs); i += dumpDeltaBatchSize {
+        end := i + dumpDeltaBatchSize
+        if end > len(tableIDs) {
+            end = len(tableIDs)
+        }
+
+        batchTableIDs := tableIDs[i:end]
+        var (
+            statsVersion uint64
+            batchUpdates []*storage.DeltaUpdate
+        )
+        batchStart := time.Now()
+        err := utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
+            is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
+            batchUpdates = make([]*storage.DeltaUpdate, 0, len(batchTableIDs))
+            // Collect all updates in the batch.
+            for _, id := range batchTableIDs {
+                item := deltaMap[id]
+                if !s.needDumpStatsDelta(is, dumpAll, id, item, batchStart) {
+                    continue
+                }
+                batchUpdates = append(batchUpdates, storage.NewDeltaUpdate(id, item, false))
+            }
+            if time.Since(batchStart) > tooSlowThreshold {
+                statslogutil.SingletonStatsSamplerLogger().Warn("Collecting batch updates is too slow",
+                    zap.Int("tableCount", len(batchUpdates)),
+                    zap.Duration("duration", time.Since(batchStart)))
+            }
-    return utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
-        is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
-        currentTime := time.Now()
-        for id, item := range deltaMap {
-            if !s.needDumpStatsDelta(is, dumpAll, id, item, currentTime) {
-                continue
+            if len(batchUpdates) == 0 {
+                return nil
             }
-            updated, err := s.dumpTableStatCountToKV(is, id, item)
+
+            // Process all updates in the batch with a single transaction.
+            // Note: batchUpdates may be modified in dumpStatsDeltaToKV.
+            startTs, err := s.dumpStatsDeltaToKV(is, sctx, batchUpdates)
             if err != nil {
                 return errors.Trace(err)
             }
-            if updated {
-                UpdateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count)
-                delete(deltaMap, id)
-            } else {
-                m := deltaMap[id]
-                deltaMap[id] = m
+            statsVersion = startTs
+            intest.AssertFunc(
+                func() bool {
+                    return slices.IsSortedFunc(batchUpdates, func(i, j *storage.DeltaUpdate) int {
+                        return cmp.Compare(i.TableID, j.TableID)
+                    })
+                },
+                "batchUpdates should be sorted by table ID",
+            )
+
+            // Update deltaMap after the batch is successfully dumped.
+            for _, update := range batchUpdates {
+                UpdateTableDeltaMap(deltaMap, update.TableID, -update.Delta.Delta, -update.Delta.Count)
+                delete(deltaMap, update.TableID)
             }
-        }
-        return nil
-    })
-}
 
-// dumpTableStatDeltaToKV dumps a single delta with some table to KV and updates the version.
-// For a partitioned table, we will update its global-stats as well.
-func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableID int64, delta variable.TableDelta) (updated bool, err error) {
-    statsVersion := uint64(0)
-    isLocked := false
-    defer func() {
-        // Only record the historical stats meta when the table is not locked because all stats meta are stored in the locked table.
-        if err == nil && statsVersion != 0 && !isLocked {
-            failpoint.Inject("panic-when-record-historical-stats-meta", func() {
-                panic("panic when record historical stats meta")
-            })
-            s.statsHandle.RecordHistoricalStatsMeta(physicalTableID, statsVersion, "flush stats", false)
-        }
-    }()
-    if delta.Count == 0 {
-        return true, nil
-    }
+            if time.Since(batchStart) > tooSlowThreshold {
+                statslogutil.SingletonStatsSamplerLogger().Warn("Dumping batch updates is too slow",
+                    zap.Int("tableCount", len(batchUpdates)),
+                    zap.Duration("duration", time.Since(batchStart)))
+            }
 
-    err = utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
-        statsVersion, err = utilstats.GetStartTS(sctx)
+            return nil
+        }, utilstats.FlagWrapTxn)
         if err != nil {
             return errors.Trace(err)
         }
+        if !skipRecordHistoricalStatsMeta {
+            startRecordHistoricalStatsMeta := time.Now()
+            // Record historical stats meta for all tables one by one.
+            // FIXME: Although this feature is currently disabled, it would be beneficial to implement it in batches for efficiency.
+            for _, update := range batchUpdates {
+                if !update.IsLocked {
+                    failpoint.Inject("panic-when-record-historical-stats-meta", func() {
+                        panic("panic when record historical stats meta")
+                    })
+                    s.statsHandle.RecordHistoricalStatsMeta(update.TableID, statsVersion, "flush stats", false)
+                }
+            }
+            if time.Since(startRecordHistoricalStatsMeta) > tooSlowThreshold {
+                statslogutil.SingletonStatsSamplerLogger().Warn("Recording historical stats meta is too slow",
+                    zap.Int("tableCount", len(batchUpdates)),
+                    zap.Duration("duration", time.Since(startRecordHistoricalStatsMeta)))
+            }
+        }
+    }
+
+    return nil
+}
 
-    tbl, _, _ := is.FindTableByPartitionID(physicalTableID)
-    // Check if the table and its partitions are locked.
-    tidAndPid := make([]int64, 0, 2)
-    if tbl != nil {
-        tidAndPid = append(tidAndPid, tbl.Meta().ID)
+// dumpStatsDeltaToKV processes and writes multiple table stats count deltas to KV storage in batches.
+// Note: The `batchUpdates` parameter may be modified during the execution of this function.
+//
+// 1. Handles partitioned tables:
+//   - For partitioned tables, the function ensures that the global statistics are updated appropriately
+//     in addition to the individual partition statistics.
+//
+// 2. Stashes lock information:
+//   - Records lock information for each table or partition.
+func (s *statsUsageImpl) dumpStatsDeltaToKV(
+    is infoschema.InfoSchema,
+    sctx sessionctx.Context,
+    updates []*storage.DeltaUpdate,
+) (statsVersion uint64, err error) {
+    if len(updates) == 0 {
+        return 0, nil
+    }
+    beforeLen := len(updates)
+    statsVersion, err = utilstats.GetStartTS(sctx)
+    if err != nil {
+        return 0, errors.Trace(err)
+    }
+
+    // Collect all table IDs that need lock checking.
+    allTableIDs := make([]int64, 0, len(updates))
+    for _, update := range updates {
+        // No need to update if the delta is zero.
+        if update.Delta.Count == 0 {
+            continue
         }
-    tidAndPid = append(tidAndPid, physicalTableID)
-    lockedTables, err := s.statsHandle.GetLockedTables(tidAndPid...)
-    if err != nil {
-        return err
+        // Add the physical table ID.
+        allTableIDs = append(allTableIDs, update.TableID)
+        // Add the parent table ID if it's a partition table.
+        if tbl, _, _ := is.FindTableByPartitionID(update.TableID); tbl != nil {
+            allTableIDs = append(allTableIDs, tbl.Meta().ID)
         }
+    }
 
-    var affectedRows uint64
-    // If it's a partitioned table and its global-stats exists,
-    // update its count and modify_count as well.
-    if tbl != nil {
-        // We need to check if the table and the partition are locked.
+    // Batch get lock status for all tables.
+    lockedTables, err := s.statsHandle.GetLockedTables(allTableIDs...)
+    if err != nil {
+        return 0, errors.Trace(err)
+    }
+
+    // Prepare batch updates.
+    for _, update := range updates {
+        // No need to update if the delta is zero.
+        if update.Delta.Count == 0 {
+            continue
+        }
+
+        tbl, _, _ := is.FindTableByPartitionID(update.TableID)
+        if tbl != nil { // It's a partition table.
+            tableID := tbl.Meta().ID
             isTableLocked := false
             isPartitionLocked := false
-        tableID := tbl.Meta().ID
+
             if _, ok := lockedTables[tableID]; ok {
                 isTableLocked = true
             }
-        if _, ok := lockedTables[physicalTableID]; ok {
+            if _, ok := lockedTables[update.TableID]; ok {
                 isPartitionLocked = true
             }
+
             tableOrPartitionLocked := isTableLocked || isPartitionLocked
-        isLocked = tableOrPartitionLocked
-        if err = storage.UpdateStatsMeta(
-            utilstats.StatsCtx,
-            sctx,
-            statsVersion,
-            storage.NewDeltaUpdate(physicalTableID, delta, tableOrPartitionLocked),
-        ); err != nil {
-            return err
-        }
-        affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
+            update.IsLocked = tableOrPartitionLocked
+
             // If the partition is locked, we don't need to update the global-stats.
             // We will update its global-stats when the partition is unlocked.
             // 1. If table is locked and partition is locked, we only stash the delta in the partition's lock info.
@@ -193,40 +300,30 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
             // 4. If table is not locked and partition is not locked, we update the global-stats.
             // To sum up, we only need to update the global-stats when the table and the partition are not locked.
             if !isTableLocked && !isPartitionLocked {
-            // If it's a partitioned table and its global-stats exists, update its count and modify_count as well.
-            if err = storage.UpdateStatsMeta(
-                utilstats.StatsCtx,
-                sctx,
-                statsVersion,
-                storage.NewDeltaUpdate(tableID, delta, isTableLocked),
-            ); err != nil {
-                return err
-            }
-            affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
+                updates = append(updates, storage.NewDeltaUpdate(tableID, update.Delta, isTableLocked))
             }
         } else {
-        // This is a non-partitioned table.
-        // Check if it's locked.
             isTableLocked := false
-        if _, ok := lockedTables[physicalTableID]; ok {
+            if _, ok := lockedTables[update.TableID]; ok {
                 isTableLocked = true
             }
-        isLocked = isTableLocked
-        if err = storage.UpdateStatsMeta(
-            utilstats.StatsCtx,
-            sctx,
-            statsVersion,
-            storage.NewDeltaUpdate(physicalTableID, delta, isTableLocked),
-        ); err != nil {
-            return err
-        }
-        affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
+            update.IsLocked = isTableLocked
         }
+    }
+    intest.Assert(len(updates) >= beforeLen, "updates can only be appended")
+    if len(updates) > beforeLen {
+        // Re-sort updates after appending new updates.
+        slices.SortFunc(updates, func(i, j *storage.DeltaUpdate) int {
+            return cmp.Compare(i.TableID, j.TableID)
+        })
+    }
 
-        updated = affectedRows > 0
-        return nil
-    }, utilstats.FlagWrapTxn)
-    return
+    // Batch update stats meta.
+    if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, updates...); err != nil {
+        return 0, errors.Trace(err)
+    }
+
+    return statsVersion, nil
 }
 
 // DumpColStatsUsageToKV sweeps the whole list, updates the column stats usage map and dumps it to KV.
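Reviewer note: taken together, the new DumpStatsDeltaToKV reduces to "sort the table IDs, cut them into fixed-size batches, flush each batch in its own transaction". A stripped-down sketch of just that loop follows; flushBatch is a placeholder for the real per-batch dump and batchSize mirrors dumpDeltaBatchSize, so treat it as an outline rather than the actual function.

```go
package main

import (
	"fmt"
	"slices"
)

// batchSize mirrors dumpDeltaBatchSize in the diff.
const batchSize = 100_000

// flushBatch stands in for dumping one batch inside a single transaction.
func flushBatch(ids []int64) error {
	fmt.Printf("flushing %d tables\n", len(ids))
	return nil
}

func dumpInBatches(deltaMap map[int64]int64) error {
	// Sort the IDs so every run dumps tables in the same order, which reduces
	// the chance of deadlocks between concurrent dumpers.
	ids := make([]int64, 0, len(deltaMap))
	for id := range deltaMap {
		ids = append(ids, id)
	}
	slices.Sort(ids)

	// Walk the sorted IDs in fixed-size chunks; each chunk is one flush.
	for i := 0; i < len(ids); i += batchSize {
		end := min(i+batchSize, len(ids))
		if err := flushBatch(ids[i:end]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	_ = dumpInBatches(map[int64]int64{101: 3, 102: 5})
}
```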