Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove use of leaf-node round-robin algorithm #9340

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions pkg/scheduler/queue/multi_queuing_algorithm_tree_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ type MultiQueuingAlgorithmTreeQueue struct {
}

func NewTree(queuingAlgorithms ...QueuingAlgorithm) (*MultiQueuingAlgorithmTreeQueue, error) {
if len(queuingAlgorithms) == 0 {
return nil, fmt.Errorf("cannot create a tree without defined QueuingAlgorithm")
// a nil QueuingAlgorithm would force a leaf node, but we handle adding the leaf node layer to the tree internally.
for _, qa := range queuingAlgorithms {
if qa == nil {
return nil, fmt.Errorf("cannot pass a nil QueuingAlgorithm")
}
}

queuingAlgorithms = append(queuingAlgorithms, nil)

root, err := newNode("root", len(queuingAlgorithms)-1, queuingAlgorithms[0])
if err != nil {
return nil, err
Expand Down Expand Up @@ -82,7 +88,9 @@ func (t *MultiQueuingAlgorithmTreeQueue) IsEmpty() bool {
func (t *MultiQueuingAlgorithmTreeQueue) Dequeue(dequeueArgs *DequeueArgs) (QueuePath, any) {
if dequeueArgs != nil {
for _, qa := range t.algosByDepth {
qa.setup(dequeueArgs)
if qa != nil {
qa.setup(dequeueArgs)
}
}
}
path, v := t.rootNode.dequeue()
Expand Down Expand Up @@ -146,9 +154,6 @@ func newNode(name string, height int, da QueuingAlgorithm) (*Node, error) {
return nil, fmt.Errorf("cannot create a node at negative height")
}

if da == nil {
return nil, fmt.Errorf("cannot create a node without a defined QueuingAlgorithm")
}
n := &Node{
name: name,
localQueue: nil,
Expand All @@ -157,7 +162,7 @@ func newNode(name string, height int, da QueuingAlgorithm) (*Node, error) {
queuingAlgorithm: da,
}
// if the node is a leaf node, it gets memory allocated towards a local queue and cannot have child queues
if height == 0 {
if da == nil {
n.localQueue = list.New()
} else {
// any other kind of node is allowed to have children, but not a local queue
Expand Down Expand Up @@ -187,7 +192,7 @@ func (n *Node) IsEmpty() bool {
}

func (n *Node) isLeaf() bool {
return n.height == 0
return n.queuingAlgorithm == nil
}

// ItemCount counts the queue items in the Node and in all its children, recursively.
Expand Down Expand Up @@ -243,12 +248,13 @@ func (n *Node) dequeue() (QueuePath, any) {
return path, nil
}
if n.isLeaf() {
checkedAllNodes = n.childrenChecked == 1
// there is nothing to check other than this node's local queue, which we will do in this step
checkedAllNodes = true
dequeueNode = n
} else {
checkedAllNodes = n.childrenChecked == len(n.queueMap)
dequeueNode = n.queuingAlgorithm.dequeueSelectNode(n)
}

dequeueNode = n.queuingAlgorithm.dequeueSelectNode(n)
switch dequeueNode {
case n:
if n.isLeaf() {
Expand All @@ -273,8 +279,10 @@ func (n *Node) dequeue() (QueuePath, any) {
if v == nil {
n.childrenChecked++
}

n.queuingAlgorithm.dequeueUpdateState(n, dequeueNode)
// if n is a leaf node, there's no queuing algorithm to update.
if !n.isLeaf() {
n.queuingAlgorithm.dequeueUpdateState(n, dequeueNode)
}
}
// reset childrenChecked to 0 before completing this dequeue
n.childrenChecked = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,13 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) {
tqaFlipped := newTenantQuerierAssignments()
tqaQuerierWorkerPrioritization := newTenantQuerierAssignments()

nonFlippedRoundRobinTree, err := NewTree(tqaNonFlipped, &roundRobinState{}, &roundRobinState{})
nonFlippedRoundRobinTree, err := NewTree(tqaNonFlipped, &roundRobinState{})
require.NoError(t, err)

flippedRoundRobinTree, err := NewTree(&roundRobinState{}, tqaFlipped, &roundRobinState{})
flippedRoundRobinTree, err := NewTree(&roundRobinState{}, tqaFlipped)
require.NoError(t, err)

querierWorkerPrioritizationTree, err := NewTree(NewQuerierWorkerQueuePriorityAlgo(), tqaQuerierWorkerPrioritization, &roundRobinState{})
querierWorkerPrioritizationTree, err := NewTree(NewQuerierWorkerQueuePriorityAlgo(), tqaQuerierWorkerPrioritization)
require.NoError(t, err)

treeScenarios := []struct {
Expand Down
Loading
Loading