Support interpodaffinity and podtopologyspread #61

Merged · 1 commit · Oct 31, 2024
15 changes: 15 additions & 0 deletions pkg/binder/cache/cache.go
@@ -53,18 +53,27 @@ type binderCache struct {

handler commoncache.CacheHandler
mu *sync.RWMutex

nodeSlices *framework.NodeSlices
}

func newBinderCache(handler commoncache.CacheHandler) *binderCache {
nodeSlices := framework.NewNodeSlices()

bc := &binderCache{
CommonStoresSwitch: commonstore.MakeStoreSwitch(handler, commonstore.Cache, commonstores.GlobalRegistries, orderedStoreNames),

handler: handler,
mu: handler.Mutex(),

nodeSlices: nodeSlices,
}

// NodeStore and PodStore are mandatory, so we don't care if they are nil.
nodeStore, podStore := bc.CommonStoresSwitch.Find(nodestore.Name), bc.CommonStoresSwitch.Find(podstore.Name)
nodeStore.(*nodestore.NodeStore).AfterAdd = func(n framework.NodeInfo) { nodeSlices.Update(n, true) }
nodeStore.(*nodestore.NodeStore).AfterDelete = func(n framework.NodeInfo) { nodeSlices.Update(n, false) }

handler.SetNodeHandler(nodeStore.(*nodestore.NodeStore).GetNodeInfo)
handler.SetPodHandler(podStore.(*podstore.PodStore).GetPodState)

@@ -217,3 +226,9 @@ func (cache *binderCache) FindStore(storeName commonstore.StoreName) commonstore
defer cache.mu.RUnlock()
return cache.CommonStoresSwitch.Find(storeName)
}

func (cache *binderCache) List() []framework.NodeInfo {
cache.mu.RLock()
defer cache.mu.RUnlock()
return append(cache.nodeSlices.InPartitionNodeSlice.Nodes(), cache.nodeSlices.OutOfPartitionNodeSlice.Nodes()...)
Member:
Need lock.

BTW it's dangerous to expose all of the nodes.

Author:
Yes, I have added a lock.

I added this function because the binder needs every NodeInfo when it performs the topology checks for InterPodAffinity and PodTopologySpread. Is there a safer way to expose this? (A sketch of why the full node list is needed follows this diff hunk.)

}
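
For context on the reply above: inter-pod affinity and anti-affinity terms are scoped by a topology key (for example "zone"), so the binder has to count matching pods across every node that shares that key, not only the candidate node; that is why the full node list is exposed here. The following is a minimal, self-contained Go sketch of that counting step. Its types and names are illustrative only and do not mirror godel-scheduler's framework.NodeInfo API or the utils helpers used later in this PR.

package main

import "fmt"

// node is a stand-in for a scheduler cache entry. It is NOT godel-scheduler's
// framework.NodeInfo type; it only carries enough structure for the example.
type node struct {
	name   string
	labels map[string]string
	pods   []string // names of pods running on the node
}

// countMatchesPerTopologyValue groups nodes by the value of topologyKey and
// counts, per group, how many running pods have the given name. A real
// anti-affinity check matches label selectors and namespaces rather than a
// plain pod name, but the shape of the computation is the same: it has to
// visit every node, not just the candidate one.
func countMatchesPerTopologyValue(nodes []node, topologyKey, podName string) map[string]int {
	counts := map[string]int{}
	for _, n := range nodes {
		value, ok := n.labels[topologyKey]
		if !ok {
			continue // node does not participate in this topology
		}
		for _, p := range n.pods {
			if p == podName {
				counts[value]++
			}
		}
	}
	return counts
}

func main() {
	nodes := []node{
		{name: "n1", labels: map[string]string{"zone": "a"}, pods: []string{"web-0"}},
		{name: "n2", labels: map[string]string{"zone": "a"}, pods: []string{"web-1"}},
		{name: "n3", labels: map[string]string{"zone": "b"}, pods: []string{"db-0"}},
	}
	fmt.Println(countMatchesPerTopologyValue(nodes, "zone", "web-0")) // map[a:1]
}
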
10 changes: 9 additions & 1 deletion pkg/binder/cache/commonstores/node_store/node_store.go
@@ -61,7 +61,9 @@ type NodeStore struct {
// `Deleted` holds all the nodes:
// 1. that have been deleted but still have residual pods.
// 2. that its pod comes before its own, so we can't use it to schedule.
Deleted sets.String
Deleted sets.String
AfterAdd func(framework.NodeInfo) // Triggered by a call to the NodeStore.Add function, used to maintain additional information about the node.
AfterDelete func(framework.NodeInfo) // Triggered by a call to the NodeStore.Delete function, used to maintain additional information about the node.

// A map from image name to its imageState.
// ATTENTION: Like `Deleted` field, it will only be modified and used in the Cache.
@@ -426,11 +428,17 @@ func (s *NodeStore) Set(nodeName string, nodeInfo framework.NodeInfo) {
// Add will Store the node and trigger the AfterAdd.
func (s *NodeStore) Add(nodeName string, nodeInfo framework.NodeInfo) {
s.Store.Set(nodeName, nodeInfo)
if s.AfterAdd != nil {
s.AfterAdd(nodeInfo)
}
}

// Delete will delete the node and trigger the AfterDelete.
func (s *NodeStore) Delete(nodeName string, nodeInfo framework.NodeInfo) {
s.Store.Delete(nodeName)
if s.AfterDelete != nil {
s.AfterDelete(nodeInfo)
}
}

// AllNodesClone return all nodes's deepcopy and organize them in map.
4 changes: 4 additions & 0 deletions pkg/binder/cache/fake/fake_cache.go
@@ -248,3 +248,7 @@ func (c *Cache) GetAvailablePlaceholderPod(
}
return nil, fmt.Errorf("empty store")
}

func (c *Cache) List() []framework.NodeInfo {
return nil
}
2 changes: 2 additions & 0 deletions pkg/binder/cache/interface.go
@@ -103,4 +103,6 @@ type BinderCache interface {

// for resource reservation
GetAvailablePlaceholderPod(pod *v1.Pod) (*v1.Pod, error)

List() []framework.NodeInfo
}
7 changes: 6 additions & 1 deletion pkg/binder/factory.go
@@ -31,10 +31,12 @@ import (
"github.com/kubewharf/godel-scheduler/pkg/binder/apis"
godelcache "github.com/kubewharf/godel-scheduler/pkg/binder/cache"
"github.com/kubewharf/godel-scheduler/pkg/binder/framework/plugins/defaultbinder"
"github.com/kubewharf/godel-scheduler/pkg/binder/framework/plugins/interpodaffinity"
"github.com/kubewharf/godel-scheduler/pkg/binder/framework/plugins/nodeports"
"github.com/kubewharf/godel-scheduler/pkg/binder/framework/plugins/noderesources"
"github.com/kubewharf/godel-scheduler/pkg/binder/framework/plugins/nodevolumelimits"
"github.com/kubewharf/godel-scheduler/pkg/binder/framework/plugins/nonnativeresource"
"github.com/kubewharf/godel-scheduler/pkg/binder/framework/plugins/podtopologyspread"
"github.com/kubewharf/godel-scheduler/pkg/binder/framework/plugins/volumebinding"
"github.com/kubewharf/godel-scheduler/pkg/binder/queue"
"github.com/kubewharf/godel-scheduler/pkg/features"
@@ -67,7 +69,10 @@ func DefaultUnitQueueSortFunc() framework.UnitLessFunc {
func NewBasePlugins(victimsCheckingPlugins []*framework.VictimCheckingPluginCollectionSpec) *apis.BinderPluginCollection {
// TODO add some default plugins later
basicPlugins := apis.BinderPluginCollection{
CheckTopology: []string{},
CheckTopology: []string{
Member:
nit: add these plugins on demand. We don't need to prepare data when the incoming Pod doesn't have cross-node constraints.

Author:
For InterPodAffinity, anti-affinity declared by existing pods can affect the pod being checked, so the check cannot be skipped based on the incoming pod alone, and it is hard to determine in advance whether InterPodAffinity is needed. The approach here is to run the InterPodAffinity and PodTopologySpread checks by default; inside each check we first verify whether the pod being checked actually carries the constraint, and return quickly if it does not (a rough sketch of such an early exit follows this diff hunk).

interpodaffinity.Name,
podtopologyspread.Name,
},
CheckConflicts: []string{
noderesources.ConflictCheckName,
nodevolumelimits.CSIName,
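
The fast path described in the reply above is not part of this diff. As a rough illustration only, an early exit in a CheckTopology plugin could be keyed on whether the incoming pod declares any inter-pod constraints at all; the helper below is an assumption, not the project's actual code, and anti-affinity declared by already-running pods still has to be handled by the full check.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// hasInterPodConstraints reports whether the incoming pod itself declares any
// inter-pod affinity or anti-affinity terms. When it returns false, the work
// specific to the incoming pod's own terms can be skipped; anti-affinity on
// already-running pods can still reject the pod, so this only shortcuts part
// of the check.
func hasInterPodConstraints(pod *v1.Pod) bool {
	affinity := pod.Spec.Affinity
	if affinity == nil {
		return false
	}
	return affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil
}

func main() {
	pod := &v1.Pod{} // a pod with no affinity stanza at all
	fmt.Println(hasInterPodConstraints(pod)) // false, so the check can return quickly
}
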
2 changes: 2 additions & 0 deletions pkg/binder/framework/handle/handle.go
@@ -45,4 +45,6 @@ type BinderFrameworkHandle interface {
FindStore(storeName commonstore.StoreName) commonstore.Store

GetNodeInfo(string) framework.NodeInfo

ListNodeInfos() []framework.NodeInfo
}
85 changes: 85 additions & 0 deletions (new file under pkg/binder/framework/plugins/interpodaffinity)
@@ -0,0 +1,85 @@
/*
Copyright 2024 The Godel Scheduler Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package interpodaffinity

import (
"context"

"github.com/kubewharf/godel-scheduler/pkg/binder/framework/handle"
framework "github.com/kubewharf/godel-scheduler/pkg/framework/api"
utils "github.com/kubewharf/godel-scheduler/pkg/plugins/interpodaffinity"
"github.com/kubewharf/godel-scheduler/pkg/plugins/podlauncher"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
)

const (
Name = "InterPodAffinityCheck"
ErrorReasonWhenFilterNodeWithSameTopology = "failed to get nodes with same topology labels"
)

type InterPodAffinity struct {
frameworkHandle handle.BinderFrameworkHandle
}

var _ framework.CheckTopologyPlugin = &InterPodAffinity{}

func (pl *InterPodAffinity) Name() string {
return Name
}

func (pl *InterPodAffinity) CheckTopology(_ context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo framework.NodeInfo) *framework.Status {
// Determine which pod launcher the pod would use on the candidate node; topology state across all cached nodes is gathered below.
podLauncher, status := podlauncher.NodeFits(nil, pod, nodeInfo)
if status != nil {
return status
}

nodeInfos := pl.frameworkHandle.ListNodeInfos()

existingPodAntiAffinityMap := utils.GetTPMapMatchingExistingAntiAffinity(pod, nodeInfos)

podInfo := framework.NewPodInfo(pod)
incomingPodAffinityMap, incomingPodAntiAffinityMap := utils.GetTPMapMatchingIncomingAffinityAntiAffinity(podInfo, nodeInfos)

state := &utils.PreFilterState{
TopologyToMatchedExistingAntiAffinityTerms: existingPodAntiAffinityMap,
TopologyToMatchedAffinityTerms: incomingPodAffinityMap,
TopologyToMatchedAntiAffinityTerms: incomingPodAntiAffinityMap,
PodInfo: podInfo,
}

if !utils.SatisfyPodAffinity(state, nodeInfo, podLauncher) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, utils.ErrReasonAffinityNotMatch, utils.ErrReasonAffinityRulesNotMatch)
}

if !utils.SatisfyPodAntiAffinity(state, nodeInfo, podLauncher) {
return framework.NewStatus(framework.Unschedulable, utils.ErrReasonAffinityNotMatch, utils.ErrReasonAntiAffinityRulesNotMatch)
}

if !utils.SatisfyExistingPodsAntiAffinity(state, nodeInfo, podLauncher) {
return framework.NewStatus(framework.Unschedulable, utils.ErrReasonAffinityNotMatch, utils.ErrReasonExistingAntiAffinityRulesNotMatch)
}

return nil
}

func New(_ runtime.Object, handle handle.BinderFrameworkHandle) (framework.Plugin, error) {
return &InterPodAffinity{
frameworkHandle: handle,
}, nil
}