Skip to content

Commit

Permalink
feat(blockbuilder): priority queue for job dispatching (#15245)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Dec 4, 2024
1 parent d59a5e2 commit 0981273
Show file tree
Hide file tree
Showing 7 changed files with 461 additions and 128 deletions.
193 changes: 193 additions & 0 deletions pkg/blockbuilder/scheduler/prioritiy_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package scheduler

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestPriorityQueue(t *testing.T) {
t.Run("operations", func(t *testing.T) {
tests := []struct {
name string
input []int
wantPops []int
}{
{
name: "empty queue",
input: []int{},
wantPops: []int{},
},
{
name: "single element",
input: []int{1},
wantPops: []int{1},
},
{
name: "multiple elements in order",
input: []int{1, 2, 3},
wantPops: []int{1, 2, 3},
},
{
name: "multiple elements out of order",
input: []int{3, 1, 2},
wantPops: []int{1, 2, 3},
},
{
name: "duplicate elements",
input: []int{2, 1, 2, 1},
wantPops: []int{1, 1, 2, 2},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pq := NewPriorityQueue[int](func(a, b int) bool { return a < b })
require.Equal(t, 0, pq.Len())

// Push all elements
for _, v := range tt.input {
pq.Push(v)
}
require.Equal(t, len(tt.input), pq.Len())

// Pop all elements and verify order
got := make([]int, 0, len(tt.input))
for range tt.input {
v, ok := pq.Pop()
require.True(t, ok)
got = append(got, v)
}
require.Equal(t, tt.wantPops, got)

// Verify empty queue behavior
v, ok := pq.Pop()
require.False(t, ok)
require.Zero(t, v)
require.Equal(t, 0, pq.Len())
})
}
})

t.Run("custom type", func(t *testing.T) {
type Job struct {
ID string
Priority int
}

pq := NewPriorityQueue[Job](func(a, b Job) bool {
return a.Priority < b.Priority
})

jobs := []Job{
{ID: "high", Priority: 3},
{ID: "low", Priority: 1},
{ID: "medium", Priority: 2},
}

// Push all jobs
for _, j := range jobs {
pq.Push(j)
}

// Verify they come out in priority order
want := []string{"low", "medium", "high"}
got := make([]string, 0, len(jobs))
for range jobs {
j, ok := pq.Pop()
require.True(t, ok)
got = append(got, j.ID)
}
require.Equal(t, want, got)
})

t.Run("mixed operations", func(t *testing.T) {
pq := NewPriorityQueue[int](func(a, b int) bool { return a < b })

// Push some elements
pq.Push(3)
pq.Push(1)
require.Equal(t, 2, pq.Len())

// Pop lowest
v, ok := pq.Pop()
require.True(t, ok)
require.Equal(t, 1, v)

// Push more elements
pq.Push(2)
pq.Push(4)

// Verify remaining elements come out in order
want := []int{2, 3, 4}
got := make([]int, 0, 3)
for range want {
v, ok := pq.Pop()
require.True(t, ok)
got = append(got, v)
}
require.Equal(t, want, got)
})
}

func TestCircularBuffer(t *testing.T) {
tests := []struct {
name string
capacity int
input []int
wantPops []int
}{
{
name: "empty buffer",
capacity: 5,
input: []int{},
wantPops: []int{},
},
{
name: "partial fill",
capacity: 5,
input: []int{1, 2, 3},
wantPops: []int{1, 2, 3},
},
{
name: "full buffer",
capacity: 3,
input: []int{1, 2, 3},
wantPops: []int{1, 2, 3},
},
{
name: "overflow buffer",
capacity: 3,
input: []int{1, 2, 3, 4, 5},
wantPops: []int{3, 4, 5},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cb := NewCircularBuffer[int](tt.capacity)
require.Equal(t, 0, cb.Len())

// Push all elements
for _, v := range tt.input {
cb.Push(v)
}
require.Equal(t, min(tt.capacity, len(tt.input)), cb.Len())

// Pop all elements and verify order
got := make([]int, 0, cb.Len())
for cb.Len() > 0 {
v, ok := cb.Pop()
require.True(t, ok)
got = append(got, v)
}
require.Equal(t, tt.wantPops, got)

// Verify empty buffer behavior
v, ok := cb.Pop()
require.False(t, ok)
require.Zero(t, v)
require.Equal(t, 0, cb.Len())
})
}
}
126 changes: 126 additions & 0 deletions pkg/blockbuilder/scheduler/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package scheduler

import (
"container/heap"
)

// PriorityQueue is a generic priority queue.
type PriorityQueue[T any] struct {
h *priorityHeap[T]
}

// NewPriorityQueue creates a new priority queue.
func NewPriorityQueue[T any](less func(T, T) bool) *PriorityQueue[T] {
h := &priorityHeap[T]{
less: less,
heap: make([]T, 0),
}
heap.Init(h)
return &PriorityQueue[T]{h: h}
}

// Push adds an element to the queue.
func (pq *PriorityQueue[T]) Push(v T) {
heap.Push(pq.h, v)
}

// Pop removes and returns the element with the highest priority from the queue.
func (pq *PriorityQueue[T]) Pop() (T, bool) {
if pq.Len() == 0 {
var zero T
return zero, false
}
return heap.Pop(pq.h).(T), true
}

// Len returns the number of elements in the queue.
func (pq *PriorityQueue[T]) Len() int {
return pq.h.Len()
}

// priorityHeap is the internal heap implementation that satisfies heap.Interface.
type priorityHeap[T any] struct {
less func(T, T) bool
heap []T
}

func (h *priorityHeap[T]) Len() int {
return len(h.heap)
}

func (h *priorityHeap[T]) Less(i, j int) bool {
return h.less(h.heap[i], h.heap[j])
}

func (h *priorityHeap[T]) Swap(i, j int) {
h.heap[i], h.heap[j] = h.heap[j], h.heap[i]
}

func (h *priorityHeap[T]) Push(x any) {
h.heap = append(h.heap, x.(T))
}

func (h *priorityHeap[T]) Pop() any {
old := h.heap
n := len(old)
x := old[n-1]
h.heap = old[0 : n-1]
return x
}

// CircularBuffer is a generic circular buffer.
type CircularBuffer[T any] struct {
buffer []T
size int
head int
tail int
}

// NewCircularBuffer creates a new circular buffer with the given capacity.
func NewCircularBuffer[T any](capacity int) *CircularBuffer[T] {
return &CircularBuffer[T]{
buffer: make([]T, capacity),
size: 0,
head: 0,
tail: 0,
}
}

// Push adds an element to the circular buffer and returns the evicted element if any
func (b *CircularBuffer[T]) Push(v T) (T, bool) {
var evicted T
hasEvicted := false

if b.size == len(b.buffer) {
// If buffer is full, evict the oldest element (at head)
evicted = b.buffer[b.head]
hasEvicted = true
b.head = (b.head + 1) % len(b.buffer)
} else {
b.size++
}

b.buffer[b.tail] = v
b.tail = (b.tail + 1) % len(b.buffer)

return evicted, hasEvicted
}

// Pop removes and returns the oldest element from the buffer
func (b *CircularBuffer[T]) Pop() (T, bool) {
if b.size == 0 {
var zero T
return zero, false
}

v := b.buffer[b.head]
b.head = (b.head + 1) % len(b.buffer)
b.size--

return v, true
}

// Len returns the number of elements in the buffer
func (b *CircularBuffer[T]) Len() int {
return b.size
}
Loading

0 comments on commit 0981273

Please sign in to comment.