feat(blockbuilder): priority queue for job dispatching #15245

Merged: 7 commits, Dec 4, 2024

@owen-d (Member) commented Dec 4, 2024

Block Builder Scheduler Refactoring: Priority-Based Job Queue

Overview

This PR refactors the block builder scheduler to dispatch pending jobs from a priority queue rather than map-based storage. Planners now attach a priority to each job, which gives explicit control over job execution order.

Key Changes

1. Generic Priority Queue Implementation

  • Added JobWithPriority[T comparable] type to support flexible priority mechanisms
  • Implemented a generic priority queue using Go's container/heap
  • Added comprehensive test coverage for priority queue operations
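
For readers unfamiliar with the pattern, here is a minimal sketch of a generic priority queue layered on container/heap. Only JobWithPriority[T comparable] is named in this PR; the Job struct, jobHeap, PriorityQueue, NewPriorityQueue, and the caller-supplied less function are assumptions made for illustration and may not match the merged code.

```go
package main

import "container/heap"

// Job is a hypothetical stand-in for the scheduler's job type.
type Job struct {
	ID        string
	Partition int32
}

// JobWithPriority pairs a job with a priority value. The type name comes
// from the PR description; the field layout is an assumption.
type JobWithPriority[T comparable] struct {
	Job      *Job
	Priority T
}

// jobHeap adapts a slice of jobs to heap.Interface. Because `comparable`
// only guarantees ==, ordering is delegated to a caller-supplied less func.
type jobHeap[T comparable] struct {
	items []*JobWithPriority[T]
	less  func(a, b T) bool
}

func (h *jobHeap[T]) Len() int { return len(h.items) }
func (h *jobHeap[T]) Less(i, j int) bool {
	return h.less(h.items[i].Priority, h.items[j].Priority)
}
func (h *jobHeap[T]) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *jobHeap[T]) Push(x any)    { h.items = append(h.items, x.(*JobWithPriority[T])) }
func (h *jobHeap[T]) Pop() any {
	old := h.items
	n := len(old)
	item := old[n-1]
	old[n-1] = nil // drop the reference so the job can be garbage collected
	h.items = old[:n-1]
	return item
}

// PriorityQueue is a thin, typed wrapper around the heap (hypothetical name).
type PriorityQueue[T comparable] struct {
	h *jobHeap[T]
}

// NewPriorityQueue builds an empty queue. less(a, b) reports whether
// priority a should be dequeued before priority b.
func NewPriorityQueue[T comparable](less func(a, b T) bool) *PriorityQueue[T] {
	return &PriorityQueue[T]{h: &jobHeap[T]{less: less}}
}

// Push adds a job in O(log n).
func (pq *PriorityQueue[T]) Push(j *JobWithPriority[T]) { heap.Push(pq.h, j) }

// Pop removes and returns the job that sorts first, or false if empty.
func (pq *PriorityQueue[T]) Pop() (*JobWithPriority[T], bool) {
	if pq.h.Len() == 0 {
		return nil, false
	}
	return heap.Pop(pq.h).(*JobWithPriority[T]), true
}

// Len reports the number of queued jobs.
func (pq *PriorityQueue[T]) Len() int { return pq.h.Len() }
```

Passing `func(a, b int) bool { return a > b }` as less makes larger priorities dequeue first; flipping the comparison gives a min-queue without touching the heap code.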

2. Circular Buffer for Completed Jobs

  • Implemented a fixed-size circular buffer to track the last N completed jobs
  • Maintains FIFO order with efficient O(1) operations
  • Automatically evicts oldest jobs when capacity is reached
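
A minimal sketch of the fixed-size circular buffer described above, assuming a generic element type. The names CircularBuffer, NewCircularBuffer, Push, and Pop are illustrative and not taken from the PR.

```go
package main

// CircularBuffer keeps at most cap(buf) items in FIFO order
// (hypothetical type for illustration).
type CircularBuffer[T any] struct {
	buf  []T
	head int // index of the oldest item
	size int // number of items currently stored
}

// NewCircularBuffer allocates a buffer with a fixed, positive capacity.
func NewCircularBuffer[T any](capacity int) *CircularBuffer[T] {
	if capacity <= 0 {
		panic("capacity must be positive")
	}
	return &CircularBuffer[T]{buf: make([]T, capacity)}
}

// Push appends v in O(1). When the buffer is full, the oldest item is
// overwritten and returned with ok == true.
func (b *CircularBuffer[T]) Push(v T) (evicted T, ok bool) {
	if b.size == len(b.buf) {
		evicted, ok = b.buf[b.head], true
		b.buf[b.head] = v
		b.head = (b.head + 1) % len(b.buf)
		return evicted, ok
	}
	b.buf[(b.head+b.size)%len(b.buf)] = v
	b.size++
	return evicted, false
}

// Pop removes and returns the oldest item in O(1), or false if empty.
func (b *CircularBuffer[T]) Pop() (T, bool) {
	var zero T
	if b.size == 0 {
		return zero, false
	}
	v := b.buf[b.head]
	b.buf[b.head] = zero // clear the slot so it does not pin memory
	b.head = (b.head + 1) % len(b.buf)
	b.size--
	return v, true
}

// Len reports how many items are stored.
func (b *CircularBuffer[T]) Len() int { return b.size }
```

Returning the evicted item from Push lets a caller clean up any side index (for example, a status map entry) at the moment the job falls out of the buffer.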

3. Job Queue Enhancements

  • Converted from map-based storage to priority queue for pending jobs
  • Added O(1) job status tracking via statusMap
  • Improved job duration tracking with timestamps
  • Added support for job synchronization after scheduler restarts
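
The status tracking and restart synchronization above could look roughly like the sketch below. Only statusMap and the mutex-guarded access pattern appear in this PR; JobTracker, JobStatus, the method names, and the rule that Sync adopts unknown jobs as in progress are assumptions, and the pending priority queue is left out to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// JobStatus is a hypothetical lifecycle enum for this sketch.
type JobStatus int

const (
	StatusPending JobStatus = iota
	StatusInProgress
	StatusComplete
)

// jobRecord holds per-job bookkeeping.
type jobRecord struct {
	status    JobStatus
	startTime time.Time
}

// JobTracker indexes every known job by ID for O(1) status lookups.
type JobTracker struct {
	mu        sync.Mutex
	statusMap map[string]*jobRecord
	now       func() time.Time // injectable clock
}

func NewJobTracker() *JobTracker {
	return &JobTracker{statusMap: make(map[string]*jobRecord), now: time.Now}
}

// Enqueue registers a new pending job and rejects duplicates.
func (q *JobTracker) Enqueue(jobID string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if rec, exists := q.statusMap[jobID]; exists {
		return fmt.Errorf("job %s already exists with status %d", jobID, rec.status)
	}
	q.statusMap[jobID] = &jobRecord{status: StatusPending}
	return nil
}

// Start moves a pending job to in progress and stamps its start time.
func (q *JobTracker) Start(jobID string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	rec, exists := q.statusMap[jobID]
	if !exists || rec.status != StatusPending {
		return fmt.Errorf("job %s is not pending", jobID)
	}
	rec.status, rec.startTime = StatusInProgress, q.now()
	return nil
}

// Sync adopts a job reported by a builder that the scheduler does not
// know about (for example, after a restart). Known jobs are left alone.
func (q *JobTracker) Sync(jobID string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, exists := q.statusMap[jobID]; !exists {
		q.statusMap[jobID] = &jobRecord{status: StatusInProgress, startTime: q.now()}
	}
}

// Complete marks an in-progress job done and returns how long it ran.
func (q *JobTracker) Complete(jobID string) (time.Duration, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	rec, exists := q.statusMap[jobID]
	if !exists || rec.status != StatusInProgress {
		return 0, fmt.Errorf("job %s not found in progress", jobID)
	}
	rec.status = StatusComplete
	return q.now().Sub(rec.startTime), nil
}

// Status returns the job's current status, or false if it is unknown.
func (q *JobTracker) Status(jobID string) (JobStatus, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	rec, exists := q.statusMap[jobID]
	if !exists {
		return 0, false
	}
	return rec.status, true
}
```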

4. Planner Interface Updates

  • Modified Planner interface to return []*JobWithPriority[int]
  • Updated RecordCountPlanner to prioritize based on record count
  • Enhanced TimeRangePlanner to use consistent priority mechanism
  • Added deterministic job ordering by partition
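
As an illustration of record-count priorities combined with a deterministic order, the sketch below splits one partition's offset range into jobs and sorts the result. PlannedJob, PlanPartition, and SortJobs are hypothetical names, and the exact priority formula and tie-breaking rules are assumptions; the PR only states that priority is based on record count and that ordering by partition is deterministic.

```go
package main

import "sort"

// PlannedJob is a flattened, hypothetical stand-in for *JobWithPriority[int].
type PlannedJob struct {
	Partition   int32
	StartOffset int64 // inclusive
	EndOffset   int64 // exclusive
	Priority    int   // assumed here to be the number of records in the job
}

// PlanPartition splits [start, end) into jobs of at most targetRecords each.
func PlanPartition(partition int32, start, end, targetRecords int64) []PlannedJob {
	if targetRecords <= 0 {
		return nil
	}
	var jobs []PlannedJob
	for s := start; s < end; s += targetRecords {
		e := s + targetRecords
		if e > end {
			e = end
		}
		jobs = append(jobs, PlannedJob{
			Partition:   partition,
			StartOffset: s,
			EndOffset:   e,
			Priority:    int(e - s),
		})
	}
	return jobs
}

// SortJobs orders jobs by partition, then by descending priority, then by
// start offset, so repeated planning runs yield the same sequence.
func SortJobs(jobs []PlannedJob) {
	sort.SliceStable(jobs, func(i, j int) bool {
		a, b := jobs[i], jobs[j]
		if a.Partition != b.Partition {
			return a.Partition < b.Partition
		}
		if a.Priority != b.Priority {
			return a.Priority > b.Priority
		}
		return a.StartOffset < b.StartOffset
	})
}
```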

@owen-d requested a review from a team as a code owner December 4, 2024 05:50

@ashwanthgoli (Contributor) left a comment

LGTM

q.mu.Lock()
defer q.mu.Unlock()

assignment, exists := q.inProgress[jobID]
// Find job in in-progress map
inProgressJob, exists := q.inProgress[jobID]

nit: maybe this should not error? If a scheduler restarted and lost its state, we'd miss committing already consumed jobs.

But we can worry about this later, when we add the committer logic.

if !exists {
return fmt.Errorf("job %s not found in progress", jobID)
// Check if job already exists
if status, exists := q.statusMap[jobID]; exists {

this should not error?

@owen-d merged commit 0981273 into grafana:main on Dec 4, 2024
58 checks passed