Skip to content

Commit

Permalink
Remove use of leaf-node round-robin algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
chencs committed Sep 19, 2024
1 parent e3923a9 commit 6bfb6b6
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 98 deletions.
34 changes: 21 additions & 13 deletions pkg/scheduler/queue/multi_queuing_algorithm_tree_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ type MultiQueuingAlgorithmTreeQueue struct {
}

func NewTree(queuingAlgorithms ...QueuingAlgorithm) (*MultiQueuingAlgorithmTreeQueue, error) {
if len(queuingAlgorithms) == 0 {
return nil, fmt.Errorf("cannot create a tree without defined QueuingAlgorithm")
// a nil QueuingAlgorithm would force a leaf node, but we handle adding the leaf node layer to the tree internally.
for _, qa := range queuingAlgorithms {
if qa == nil {
return nil, fmt.Errorf("cannot pass a nil QueuingAlgorithm")
}
}

queuingAlgorithms = append(queuingAlgorithms, nil)

root, err := newNode("root", len(queuingAlgorithms)-1, queuingAlgorithms[0])
if err != nil {
return nil, err
Expand Down Expand Up @@ -82,7 +88,9 @@ func (t *MultiQueuingAlgorithmTreeQueue) IsEmpty() bool {
func (t *MultiQueuingAlgorithmTreeQueue) Dequeue(dequeueArgs *DequeueArgs) (QueuePath, any) {
if dequeueArgs != nil {
for _, qa := range t.algosByDepth {
qa.setup(dequeueArgs)
if qa != nil {
qa.setup(dequeueArgs)
}
}
}
path, v := t.rootNode.dequeue()
Expand Down Expand Up @@ -146,9 +154,6 @@ func newNode(name string, height int, da QueuingAlgorithm) (*Node, error) {
return nil, fmt.Errorf("cannot create a node at negative height")
}

if da == nil {
return nil, fmt.Errorf("cannot create a node without a defined QueuingAlgorithm")
}
n := &Node{
name: name,
localQueue: nil,
Expand All @@ -157,7 +162,7 @@ func newNode(name string, height int, da QueuingAlgorithm) (*Node, error) {
queuingAlgorithm: da,
}
// if the node is a leaf node, it gets memory allocated towards a local queue and cannot have child queues
if height == 0 {
if da == nil {
n.localQueue = list.New()
} else {
// any other kind of node is allowed to have children, but not a local queue
Expand Down Expand Up @@ -187,7 +192,7 @@ func (n *Node) IsEmpty() bool {
}

func (n *Node) isLeaf() bool {
return n.height == 0
return n.queuingAlgorithm == nil
}

// ItemCount counts the queue items in the Node and in all its children, recursively.
Expand Down Expand Up @@ -243,12 +248,13 @@ func (n *Node) dequeue() (QueuePath, any) {
return path, nil
}
if n.isLeaf() {
checkedAllNodes = n.childrenChecked == 1
// there are no other nodes to check except this one's local queue, which we will do in this step
checkedAllNodes = true
dequeueNode = n
} else {
checkedAllNodes = n.childrenChecked == len(n.queueMap)
dequeueNode = n.queuingAlgorithm.dequeueSelectNode(n)
}

dequeueNode = n.queuingAlgorithm.dequeueSelectNode(n)
switch dequeueNode {
case n:
if n.isLeaf() {
Expand All @@ -273,8 +279,10 @@ func (n *Node) dequeue() (QueuePath, any) {
if v == nil {
n.childrenChecked++
}

n.queuingAlgorithm.dequeueUpdateState(n, dequeueNode)
// if n is a leaf node, there's no queuing algorithm to update.
if !n.isLeaf() {
n.queuingAlgorithm.dequeueUpdateState(n, dequeueNode)
}
}
// reset childrenChecked to 0 before completing this dequeue
n.childrenChecked = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,13 @@ func TestMultiDimensionalQueueAlgorithmSlowConsumerEffects(t *testing.T) {

tqa := newTenantQuerierAssignments()

nonFlippedRoundRobinTree, err := NewTree(tqa, &roundRobinState{}, &roundRobinState{})
nonFlippedRoundRobinTree, err := NewTree(tqa, &roundRobinState{})
require.NoError(t, err)

flippedRoundRobinTree, err := NewTree(&roundRobinState{}, tqa, &roundRobinState{})
flippedRoundRobinTree, err := NewTree(&roundRobinState{}, tqa)
require.NoError(t, err)

querierWorkerPrioritizationTree, err := NewTree(NewQuerierWorkerQueuePriorityAlgo(), tqa, &roundRobinState{})
querierWorkerPrioritizationTree, err := NewTree(NewQuerierWorkerQueuePriorityAlgo(), tqa)
require.NoError(t, err)

trees := []struct {
Expand Down
Loading

0 comments on commit 6bfb6b6

Please sign in to comment.