Skip to content

Commit

Permalink
support launching tasks in the same dc with executer (ydb-platform#8457)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Aug 29, 2024
1 parent e2dfc63 commit 154697e
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 25 deletions.
57 changes: 41 additions & 16 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,33 +362,58 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
planner->SetLogFunc([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });
}

THashMap<ui64, size_t> nodeIdtoIdx;
for (size_t idx = 0; idx < ResourcesSnapshot.size(); ++idx) {
nodeIdtoIdx[ResourcesSnapshot[idx].nodeid()] = idx;
}

LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });

auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations);
ui64 selfNodeId = ExecuterId.NodeId();
TString selfNodeDC;

if (!plan.empty()) {
for (auto& group : plan) {
for(ui64 taskId: group.TaskIds) {
auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
if (success) {
TasksPerNode[group.NodeId].push_back(taskId);
}
}
TVector<const NKikimrKqp::TKqpNodeResources*> allNodes;
TVector<const NKikimrKqp::TKqpNodeResources*> executerDcNodes;
allNodes.reserve(ResourcesSnapshot.size());

for(auto& snapNode: ResourcesSnapshot) {
const TString& dc = snapNode.GetKqpProxyNodeResources().GetDataCenterId();
if (snapNode.GetNodeId() == selfNodeId) {
selfNodeDC = dc;
break;
}
}

return nullptr;
} else {
for(auto& snapNode: ResourcesSnapshot) {
allNodes.push_back(&snapNode);
if (selfNodeDC == snapNode.GetKqpProxyNodeResources().GetDataCenterId()) {
executerDcNodes.push_back(&snapNode);
}
}

TVector<IKqpPlannerStrategy::TResult> plan;

if (!executerDcNodes.empty() && placingOptions.PreferLocalDatacenterExecution) {
plan = planner->Plan(executerDcNodes, ResourceEstimations);
}

if (plan.empty()) {
plan = planner->Plan(allNodes, ResourceEstimations);
}

if (plan.empty()) {
LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_E(msg); });

auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
TStringBuilder() << "Not enough resources to execute query. " << "TraceId: " << UserRequestContext->TraceId);
return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
}

for (auto& group : plan) {
for(ui64 taskId: group.TaskIds) {
auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
if (success) {
TasksPerNode[group.NodeId].push_back(taskId);
}
}
}

return nullptr;
}

const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const {
Expand Down
16 changes: 8 additions & 8 deletions ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,16 @@ class TNodesManager {
return result;
}

TNodesManager(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources) {
TNodesManager(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources) {
for (auto& node : nodeResources) {
if (!node.GetAvailableComputeActors()) {
if (!node->GetAvailableComputeActors()) {
continue;
}
Nodes.emplace_back(TNodeDesc{
node.GetNodeId(),
ActorIdFromProto(node.GetResourceManagerActorId()),
node.GetTotalMemory() - node.GetUsedMemory(),
node.GetAvailableComputeActors(),
node->GetNodeId(),
ActorIdFromProto(node->GetResourceManagerActorId()),
node->GetTotalMemory() - node->GetUsedMemory(),
node->GetAvailableComputeActors(),
{}
});
}
Expand All @@ -111,7 +111,7 @@ class TKqpGreedyPlanner : public IKqpPlannerStrategy {
public:
~TKqpGreedyPlanner() override {}

TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources,
const TVector<TTaskResourceEstimation>& tasks) override
{
TVector<TResult> result;
Expand Down Expand Up @@ -161,7 +161,7 @@ class TKqpMockEmptyPlanner : public IKqpPlannerStrategy {
public:
~TKqpMockEmptyPlanner() override {}

TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>&,
TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>&,
const TVector<TTaskResourceEstimation>&) override
{
return {};
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_planner_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class IKqpPlannerStrategy {
TVector<ui64> TaskIds;
};

virtual TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
virtual TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources,
const TVector<TTaskResourceEstimation>& estimatedResources) = 0;

protected:
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/rm_service/kqp_rm_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ class TKqpResourceManager : public IKqpResourceManager {
return TPlannerPlacingOptions{
.MaxNonParallelTasksExecutionLimit = MaxNonParallelTasksExecutionLimit.load(),
.MaxNonParallelTopStageExecutionLimit = MaxNonParallelTopStageExecutionLimit.load(),
.PreferLocalDatacenterExecution = PreferLocalDatacenterExecution.load(),
};
}

Expand Down Expand Up @@ -474,6 +475,7 @@ class TKqpResourceManager : public IKqpResourceManager {
SpillingPercent.store(config.GetSpillingPercent());
MaxNonParallelTopStageExecutionLimit.store(config.GetMaxNonParallelTopStageExecutionLimit());
MaxNonParallelTasksExecutionLimit.store(config.GetMaxNonParallelTasksExecutionLimit());
PreferLocalDatacenterExecution.store(config.GetPreferLocalDatacenterExecution());
}

ui32 GetNodeId() override {
Expand Down Expand Up @@ -523,6 +525,7 @@ class TKqpResourceManager : public IKqpResourceManager {
std::atomic<i64> ExternalDataQueryMemory = 0;
std::atomic<ui64> MaxNonParallelTopStageExecutionLimit = 1;
std::atomic<ui64> MaxNonParallelTasksExecutionLimit = 8;
std::atomic<bool> PreferLocalDatacenterExecution = true;

// current state
std::atomic<ui64> LastResourceBrokerTaskId = 0;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/rm_service/kqp_rm_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ struct TKqpLocalNodeResources {
/// Task-placement settings handed to the KQP planner.
/// The resource manager fills this from its current atomically stored
/// configuration values; the defaults below match the defaults of the
/// same-named fields in TTableServiceConfig.
struct TPlannerPlacingOptions {
// Default 8. NOTE(review): presumably caps how many tasks may be executed
// without parallelization — exact semantics are not visible here, confirm
// against the planner.
ui64 MaxNonParallelTasksExecutionLimit = 8;
// Default 1. NOTE(review): presumably the analogous cap for the top stage —
// confirm against the planner.
ui64 MaxNonParallelTopStageExecutionLimit = 1;
// When true and the executer's datacenter has nodes in the resource snapshot,
// the planner first tries to place tasks only on those nodes and falls back
// to all nodes if that attempt yields an empty plan.
bool PreferLocalDatacenterExecution = true;
};

/// per node singleton with instant API
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ message TTableServiceConfig {

optional uint64 MaxNonParallelTasksExecutionLimit = 25 [default = 8];
optional uint64 MaxNonParallelTopStageExecutionLimit = 26 [default = 1];
optional bool PreferLocalDatacenterExecution = 27 [ default = true ];
}

message TSpillingServiceConfig {
Expand Down

0 comments on commit 154697e

Please sign in to comment.