Merge branch 'master' of https://github.com/pingcap/tidb into pgo-read-only
crazycs520 committed Jan 7, 2025
2 parents 7506e8a + c199ddf commit b42b50c
Showing 70 changed files with 472 additions and 684 deletions.
6 changes: 3 additions & 3 deletions pkg/distsql/select_result.go
@@ -19,7 +19,6 @@ import (
"container/heap"
"context"
"fmt"
"maps"
"strconv"
"sync/atomic"
"time"
@@ -500,7 +499,6 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
if r.rootPlanID <= 0 || r.ctx.RuntimeStatsColl == nil || (callee == "" && (copStats.ReqStats == nil || copStats.ReqStats.GetRPCStatsCount() == 0)) {
return
}

if copStats.ScanDetail.ProcessedKeys > 0 || copStats.TimeDetail.KvReadWallTime > 0 {
readKeys := copStats.ScanDetail.ProcessedKeys
readTime := copStats.TimeDetail.KvReadWallTime.Seconds()
@@ -655,7 +653,9 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim
if s.backoffSleep == nil {
s.backoffSleep = make(map[string]time.Duration)
}
maps.Copy(s.backoffSleep, copStats.BackoffSleep)
for k, v := range copStats.BackoffSleep {
s.backoffSleep[k] += v
}
}
s.totalProcessTime += copStats.TimeDetail.ProcessTime
s.totalWaitTime += copStats.TimeDetail.WaitTime
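Note on the change above: `maps.Copy` overwrites whatever duration is already stored for a backoff type, while the new loop adds the incoming duration onto it, so backoff sleep from multiple cop tasks accumulates per statement. A small, self-contained illustration of the difference (values are made up, not from the commit):

```go
package main

import (
	"fmt"
	"maps"
	"time"
)

func main() {
	incoming := map[string]time.Duration{"RegionMiss": 200 * time.Millisecond}

	// Previous merge: maps.Copy replaces the stored entry, so the earlier
	// 100ms recorded for RegionMiss is lost.
	copied := map[string]time.Duration{"RegionMiss": 100 * time.Millisecond}
	maps.Copy(copied, incoming)
	fmt.Println(copied["RegionMiss"]) // 200ms

	// New merge: the loop accumulates, 100ms + 200ms.
	summed := map[string]time.Duration{"RegionMiss": 100 * time.Millisecond}
	for k, v := range incoming {
		summed[k] += v
	}
	fmt.Println(summed["RegionMiss"]) // 300ms
}
```

This matches the updated test below, which expects the 200ns and 300ns updates to merge into 500ns for "RegionMiss".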
17 changes: 13 additions & 4 deletions pkg/distsql/select_result_test.go
@@ -17,6 +17,7 @@ package distsql
import (
"context"
"testing"
"time"

"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
@@ -30,11 +31,15 @@ import (
func TestUpdateCopRuntimeStats(t *testing.T) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx = stmtctx.NewStmtCtx()
sr := selectResult{ctx: ctx.GetDistSQLCtx(), storeType: kv.TiKV}
sr := selectResult{ctx: ctx.GetDistSQLCtx(), storeType: kv.TiKV, stats: &selectResultRuntimeStats{}}
require.Nil(t, ctx.GetSessionVars().StmtCtx.RuntimeStatsColl)

sr.rootPlanID = 1234
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{CopExecDetails: execdetails.CopExecDetails{CalleeAddress: "a"}}, 0)
backOffSleep := make(map[string]time.Duration, 1)
backOffSleep["RegionMiss"] = time.Duration(100)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{CopExecDetails: execdetails.CopExecDetails{CalleeAddress: "a", BackoffSleep: backOffSleep}}, 0)
// RuntimeStatsColl is nil, so the update doesn't take effect
require.Equal(t, sr.stats.backoffSleep["RegionMiss"], time.Duration(0))

ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl(nil)
// refresh the ctx after assigning `RuntimeStatsColl`.
@@ -48,13 +53,17 @@ func TestUpdateCopRuntimeStats(t *testing.T) {

require.NotEqual(t, len(sr.copPlanIDs), len(sr.selectResp.GetExecutionSummaries()))

sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{CopExecDetails: execdetails.CopExecDetails{CalleeAddress: "callee"}}, 0)
backOffSleep["RegionMiss"] = time.Duration(200)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{CopExecDetails: execdetails.CopExecDetails{CalleeAddress: "callee", BackoffSleep: backOffSleep}}, 0)
require.False(t, ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats(1234))
require.Equal(t, sr.stats.backoffSleep["RegionMiss"], time.Duration(200))

sr.copPlanIDs = []int{sr.rootPlanID}
require.NotNil(t, ctx.GetSessionVars().StmtCtx.RuntimeStatsColl)
require.Equal(t, len(sr.copPlanIDs), len(sr.selectResp.GetExecutionSummaries()))

sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{CopExecDetails: execdetails.CopExecDetails{CalleeAddress: "callee"}}, 0)
backOffSleep["RegionMiss"] = time.Duration(300)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{CopExecDetails: execdetails.CopExecDetails{CalleeAddress: "callee", BackoffSleep: backOffSleep}}, 0)
require.Equal(t, "tikv_task:{time:1ns, loops:1}", ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats(1234).String())
require.Equal(t, sr.stats.backoffSleep["RegionMiss"], time.Duration(500))
}
4 changes: 4 additions & 0 deletions pkg/disttask/framework/scheduler/balancer.go
@@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
llog "github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/util/intest"
@@ -116,6 +117,9 @@ func (b *balancer) doBalanceSubtasks(ctx context.Context, taskID int64, eligible
// managed nodes, subtasks of task might not be balanced.
adjustedNodes := filterNodesWithEnoughSlots(b.currUsedSlots, b.slotMgr.getCapacity(),
eligibleNodes, subtasks[0].Concurrency)
failpoint.Inject("mockNoEnoughSlots", func(_ failpoint.Value) {
adjustedNodes = []string{}
})
if len(adjustedNodes) == 0 {
// no node has enough slots to run the subtasks, skip balance and skip
// update used slots.
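The `mockNoEnoughSlots` failpoint added above forces `adjustedNodes` to be empty so the "no node has enough slots" branch can be exercised in tests. Below is a minimal sketch of arming it; the full failpoint path is assumed from the usual package-path convention (it is not spelled out in this diff), and `failpoint.Inject` only fires when failpoints are enabled in the build (e.g. via `make failpoint-enable`).

```go
package scheduler_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// Assumed path: the package import path plus the failpoint name.
const mockNoEnoughSlotsFP = "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockNoEnoughSlots"

func TestBalanceSkipsWhenNoSlots(t *testing.T) {
	require.NoError(t, failpoint.Enable(mockNoEnoughSlotsFP, "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable(mockNoEnoughSlotsFP))
	}()

	// Construct a balancer and call doBalanceSubtasks here; with the
	// failpoint armed it behaves as if no node has enough slots and
	// returns without rebalancing or updating used slots.
}
```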
23 changes: 17 additions & 6 deletions pkg/disttask/framework/taskexecutor/task_executor.go
@@ -122,7 +122,7 @@ func NewBaseTaskExecutor(ctx context.Context, task *proto.Task, param Param) *Ba
// `pending` state, to make sure subtasks can be balanced later when node scale out.
// - If current running subtask are scheduled away from this node, i.e. this node
// is taken as down, cancel running.
func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) {
func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCtxCancel context.CancelFunc) {
ticker := time.NewTicker(checkBalanceSubtaskInterval)
defer ticker.Stop()
for {
@@ -143,7 +143,10 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) {
e.logger.Info("subtask is scheduled away, cancel running",
zap.Int64("subtaskID", e.currSubtaskID.Load()))
// cancels runStep, but leave the subtask state unchanged.
e.cancelRunStepWith(nil)
if subtaskCtxCancel != nil {
subtaskCtxCancel()
}
failpoint.InjectCall("afterCancelSubtaskExec")
return
}

@@ -317,6 +320,12 @@ func (e *BaseTaskExecutor) Run() {
continue
}
}
if err := e.stepCtx.Err(); err != nil {
e.logger.Error("step executor context is done, the task should have been reverted",
zap.String("step", proto.Step2Str(task.Type, task.Step)),
zap.Error(err))
continue
}
err = e.runSubtask(subtask)
if err != nil {
// task executor keeps running its subtasks even though some subtask
@@ -418,23 +427,25 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {
logTask := llog.BeginTask(logger, "run subtask")
subtaskErr := func() error {
e.currSubtaskID.Store(subtask.ID)
subtaskCtx, subtaskCtxCancel := context.WithCancel(e.stepCtx)

var wg util.WaitGroupWrapper
checkCtx, checkCancel := context.WithCancel(e.stepCtx)
checkCtx, checkCancel := context.WithCancel(subtaskCtx)
wg.RunWithLog(func() {
e.checkBalanceSubtask(checkCtx)
e.checkBalanceSubtask(checkCtx, subtaskCtxCancel)
})

if e.hasRealtimeSummary(e.stepExec) {
wg.RunWithLog(func() {
e.updateSubtaskSummaryLoop(checkCtx, e.stepCtx, e.stepExec)
e.updateSubtaskSummaryLoop(checkCtx, subtaskCtx, e.stepExec)
})
}
defer func() {
checkCancel()
wg.Wait()
subtaskCtxCancel()
}()
return e.stepExec.RunSubtask(e.stepCtx, subtask)
return e.stepExec.RunSubtask(subtaskCtx, subtask)
}()
failpoint.InjectCall("afterRunSubtask", e, &subtaskErr)
logTask.End2(zap.InfoLevel, subtaskErr)
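The net effect of the changes above: each subtask now runs under its own cancelable child of the step context, and `checkBalanceSubtask` is handed that child's cancel function, so a subtask that is scheduled away gets cancelled without tearing down the whole step; the new `stepCtx.Err()` check in `Run` then stops picking up further subtasks once the step context itself is done. A stripped-down, runnable sketch of the pattern (names are illustrative, not taken from the file):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	stepCtx, stepCancel := context.WithCancel(context.Background())
	defer stepCancel()

	// Each subtask gets its own child context, as runSubtask now does.
	subtaskCtx, subtaskCancel := context.WithCancel(stepCtx)

	// Stand-in for checkBalanceSubtask: on noticing the subtask was
	// scheduled to another node, it cancels only this subtask.
	go func() {
		time.Sleep(50 * time.Millisecond)
		subtaskCancel()
	}()

	<-subtaskCtx.Done()
	fmt.Println("subtask:", subtaskCtx.Err()) // context canceled
	fmt.Println("step:", stepCtx.Err())       // <nil>, the step keeps running
}
```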
16 changes: 8 additions & 8 deletions pkg/disttask/framework/taskexecutor/task_executor_test.go
@@ -315,7 +315,7 @@ func TestTaskExecutorRun(t *testing.T) {
// mock for checkBalanceSubtask, returns empty subtask list
e.taskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id",
e.task1.ID, proto.StepOne, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil)
// this subtask is scheduled awsy during running
// this subtask is scheduled away during running
e.taskTable.EXPECT().GetTaskByID(gomock.Any(), e.task1.ID).Return(e.task1, nil)
e.taskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", e.task1.ID, proto.StepOne,
unfinishedNormalSubtaskStates...).Return(e.pendingSubtask1, nil)
@@ -326,6 +326,7 @@
<-ctx.Done()
return ctx.Err()
})
e.taskExecExt.EXPECT().IsRetryableError(gomock.Any()).Return(true)
// keep running next subtask
nextSubtask := &proto.Subtask{SubtaskBase: proto.SubtaskBase{
ID: 2, Type: e.task1.Type, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}
@@ -889,18 +890,17 @@ func TestCheckBalanceSubtask(t *testing.T) {
// context canceled
canceledCtx, cancel := context.WithCancel(ctx)
cancel()
taskExecutor.checkBalanceSubtask(canceledCtx)
taskExecutor.checkBalanceSubtask(canceledCtx, nil)
})

t.Run("subtask scheduled away", func(t *testing.T) {
mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, errors.New("error"))
mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
task.ID, task.Step, proto.SubtaskStateRunning).Return([]*proto.Subtask{}, nil)
runCtx, cancelCause := context.WithCancelCause(ctx)
taskExecutor.mu.runtimeCancel = cancelCause
runCtx, cancel := context.WithCancel(ctx)
require.NoError(t, runCtx.Err())
taskExecutor.checkBalanceSubtask(ctx)
taskExecutor.checkBalanceSubtask(ctx, cancel)
require.ErrorIs(t, runCtx.Err(), context.Canceled)
require.True(t, ctrl.Satisfied())
})
@@ -913,7 +913,7 @@
mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false)
mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1",
subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil)
taskExecutor.checkBalanceSubtask(ctx)
taskExecutor.checkBalanceSubtask(ctx, nil)
require.True(t, ctrl.Satisfied())

// if we failed to change state of non-idempotent subtask, will retry
@@ -930,7 +930,7 @@
mockExtension.EXPECT().IsIdempotent(subtasks[0]).Return(false)
mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(gomock.Any(), "tidb1",
subtasks[0].ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask).Return(nil)
taskExecutor.checkBalanceSubtask(ctx)
taskExecutor.checkBalanceSubtask(ctx, nil)
require.True(t, ctrl.Satisfied())
})

@@ -945,7 +945,7 @@
// used to break the loop
mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1",
task.ID, task.Step, proto.SubtaskStateRunning).Return(nil, nil)
taskExecutor.checkBalanceSubtask(ctx)
taskExecutor.checkBalanceSubtask(ctx, nil)
require.True(t, ctrl.Satisfied())
})
}
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/proto.go
@@ -110,7 +110,6 @@ type SharedVars struct {
TableImporter *importer.TableImporter
DataEngine *backend.OpenedEngine
IndexEngine *backend.OpenedEngine
Progress *importer.Progress

mu sync.Mutex
Checksum *verification.KVGroupChecksum
@@ -183,5 +182,4 @@ type Checksum struct {
// This portion of the code may be implemented uniformly in the framework in the future.
type Result struct {
LoadedRowCnt uint64
ColSizeMap map[int64]int64
}
8 changes: 1 addition & 7 deletions pkg/disttask/importinto/scheduler.go
@@ -577,14 +577,9 @@ func updateResult(handle storage.TaskHandle, task *proto.Task, taskMeta *TaskMet
}
subtaskMetas = append(subtaskMetas, &subtaskMeta)
}
columnSizeMap := make(map[int64]int64)
for _, subtaskMeta := range subtaskMetas {
taskMeta.Result.LoadedRowCnt += subtaskMeta.Result.LoadedRowCnt
for key, val := range subtaskMeta.Result.ColSizeMap {
columnSizeMap[key] += val
}
}
taskMeta.Result.ColSizeMap = columnSizeMap

if globalSort {
taskMeta.Result.LoadedRowCnt, err = getLoadedRowCountOnGlobalSort(handle, task)
@@ -662,8 +657,7 @@ func (sch *ImportSchedulerExt) finishJob(ctx context.Context, logger *zap.Logger
func(ctx context.Context) (bool, error) {
return true, taskHandle.WithNewSession(func(se sessionctx.Context) error {
if err := importer.FlushTableStats(ctx, se, taskMeta.Plan.TableInfo.ID, &importer.JobImportResult{
Affected: taskMeta.Result.LoadedRowCnt,
ColSizeMap: taskMeta.Result.ColSizeMap,
Affected: taskMeta.Result.LoadedRowCnt,
}); err != nil {
logger.Warn("flush table stats failed", zap.Error(err))
}
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/subtask_executor.go
@@ -69,7 +69,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
sharedVars.TableImporter,
sharedVars.DataEngine,
sharedVars.IndexEngine,
sharedVars.Progress,
logger,
checksum,
); err != nil {
@@ -82,7 +81,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
sharedVars.TableImporter,
dataWriter,
indexWriter,
sharedVars.Progress,
logger,
checksum,
); err != nil {
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/task_executor.go
@@ -154,7 +154,6 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt
TableImporter: s.tableImporter,
DataEngine: dataEngine,
IndexEngine: indexEngine,
Progress: importer.NewProgress(),
Checksum: verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetKeySpace()),
SortedDataMeta: &external.SortedKVMeta{},
SortedIndexMetas: make(map[int64]*external.SortedKVMeta),
@@ -251,7 +250,6 @@ func (s *importStepExecutor) onFinished(ctx context.Context, subtask *proto.Subt
}
subtaskMeta.Result = Result{
LoadedRowCnt: dataKVCount,
ColSizeMap: sharedVars.Progress.GetColSize(),
}
allocators := sharedVars.TableImporter.Allocators()
subtaskMeta.MaxIDs = map[autoid.AllocatorType]int64{
2 changes: 1 addition & 1 deletion pkg/domain/domain.go
@@ -1902,7 +1902,7 @@ func (do *Domain) LoadPrivilegeLoop(sctx sessionctx.Context) error {
if err != nil {
return err
}
do.privHandle = privileges.NewHandle(sctx.GetRestrictedSQLExecutor())
do.privHandle = privileges.NewHandle(sctx.GetRestrictedSQLExecutor(), sctx.GetSessionVars().GlobalVarsAccessor)

var watchCh clientv3.WatchChan
duration := 5 * time.Minute
2 changes: 1 addition & 1 deletion pkg/executor/delete.go
@@ -274,7 +274,7 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl
return err
}

err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.ExtraPartialRowOption)
err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.IndexesRowLayout)
if err != nil {
return err
}
11 changes: 6 additions & 5 deletions pkg/executor/grant.go
@@ -155,6 +155,10 @@ func (e *GrantExec) Next(ctx context.Context, _ *chunk.Chunk) error {
return err
}

defaultAuthPlugin, err := e.Ctx().GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.DefaultAuthPlugin)
if err != nil {
return err
}
// Check which user is not exist.
for _, user := range e.Users {
exists, err := userExists(ctx, e.Ctx(), user.User.Username, user.User.Hostname)
@@ -169,10 +173,7 @@ func (e *GrantExec) Next(ctx context.Context, _ *chunk.Chunk) error {
// It is required for compatibility with 5.7 but removed from 8.0
// since it results in a massive security issue:
// spelling errors will create users with no passwords.
authPlugin, err := e.Ctx().GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.DefaultAuthPlugin)
if err != nil {
return err
}
authPlugin := defaultAuthPlugin
if user.AuthOpt != nil && user.AuthOpt.AuthPlugin != "" {
authPlugin = user.AuthOpt.AuthPlugin
}
@@ -181,7 +182,7 @@
return exeerrors.ErrPluginIsNotLoaded.GenWithStackByArgs(extErr.Error())
}
authPluginImpl := extensions.GetAuthPlugins()[authPlugin]
pwd, ok := encodePassword(user, authPluginImpl)
pwd, ok := encodePasswordWithPlugin(*user, authPluginImpl, defaultAuthPlugin)
if !ok {
return errors.Trace(exeerrors.ErrPasswordFormat)
}
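Two things change in the hunk above: the `default_auth_plugin` system variable is read once before the user loop instead of once per missing user, and the password is encoded via `encodePasswordWithPlugin` with that default passed in. A small sketch of the resulting per-user resolution order (plugin names here are only examples):

```go
package main

import "fmt"

func main() {
	// Read once, outside the loop (stands in for GetGlobalSysVar(DefaultAuthPlugin)).
	defaultAuthPlugin := "mysql_native_password"

	users := []struct{ name, authPlugin string }{
		{"u1", ""},                      // no IDENTIFIED WITH clause: use the default
		{"u2", "caching_sha2_password"}, // an explicit plugin wins
	}
	for _, u := range users {
		plugin := defaultAuthPlugin
		if u.authPlugin != "" {
			plugin = u.authPlugin
		}
		fmt.Printf("%s -> %s\n", u.name, plugin)
	}
}
```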
1 change: 0 additions & 1 deletion pkg/executor/importer/BUILD.bazel
@@ -9,7 +9,6 @@ go_library(
"job.go",
"kv_encode.go",
"precheck.go",
"progress.go",
"table_import.go",
],
importpath = "github.com/pingcap/tidb/pkg/executor/importer",
5 changes: 1 addition & 4 deletions pkg/executor/importer/engine_process.go
@@ -30,7 +30,6 @@ func ProcessChunk(
chunk *checkpoints.ChunkCheckpoint,
tableImporter *TableImporter,
dataEngine, indexEngine *backend.OpenedEngine,
progress *Progress,
logger *zap.Logger,
groupChecksum *verification.KVGroupChecksum,
) error {
@@ -65,7 +64,7 @@
}
}()

return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger, groupChecksum)
return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, logger, groupChecksum)
}

// ProcessChunkWithWriter processes a chunk, and write kv pairs to dataWriter and indexWriter.
@@ -74,7 +73,6 @@ func ProcessChunkWithWriter(
chunk *checkpoints.ChunkCheckpoint,
tableImporter *TableImporter,
dataWriter, indexWriter backend.EngineWriter,
progress *Progress,
logger *zap.Logger,
groupChecksum *verification.KVGroupChecksum,
) error {
@@ -116,6 +114,5 @@
if err != nil {
return err
}
progress.AddColSize(encoder.GetColumnSize())
return nil
}
5 changes: 2 additions & 3 deletions pkg/executor/importer/import.go
@@ -1395,9 +1395,8 @@ func getDataSourceType(p *plannercore.ImportInto) DataSourceType {

// JobImportResult is the result of the job import.
type JobImportResult struct {
Affected uint64
Warnings []contextutil.SQLWarn
ColSizeMap variable.DeltaColsMap
Affected uint64
Warnings []contextutil.SQLWarn
}

// GetMsgFromBRError get msg from BR error.