Skip to content

Commit 275a9de

Browse files
authored
Merge pull request #7057 from damnever/f/priority
2 parents 6d3700e + 5182d5a commit 275a9de

File tree

4 files changed

+25
-11
lines changed

4 files changed

+25
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
* [BUGFIX] Runtime-config: Change to check tenant limit validation when loading runtime config only for `all`, `distributor`, `querier`, and `ruler` targets. #6880
104104
* [BUGFIX] Distributor: Fix the `/distributor/all_user_stats` api to work during rolling updates on ingesters. #7026
105105
* [BUGFIX] Runtime-config: Fix panic when the runtime config is `null`. #7062
106+
* [BUGFIX] Scheduler: Avoid all queriers reserved for prioritized requests. #7057
106107

107108
## 1.19.1 2025-09-20
108109

pkg/scheduler/queue/queue_test.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,8 @@ func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) {
306306
ctx := context.Background()
307307

308308
queue.RegisterQuerierConnection("querier-1")
309+
queue.RegisterQuerierConnection("querier-2")
310+
maxQueriers := float64(2)
309311

310312
normalRequest := MockRequest{
311313
id: "normal query",
@@ -315,14 +317,20 @@ func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) {
315317
priority: 1,
316318
}
317319

318-
assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, 1, func() {}))
319-
assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, 1, func() {}))
320-
assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, 1, func() {}))
320+
assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, maxQueriers, func() {}))
321+
assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, maxQueriers, func() {}))
322+
assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, maxQueriers, func() {}))
323+
reservedQueriers := queue.queues.userQueues["userID"].reservedQueriers
324+
require.Equal(t, 1, len(reservedQueriers))
325+
reservedQuerier := ""
326+
for qid := range reservedQueriers {
327+
reservedQuerier = qid
328+
}
321329

322-
nextRequest, _, _ := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-1")
330+
nextRequest, _, _ := queue.GetNextRequestForQuerier(ctx, FirstUser(), reservedQuerier)
323331
assert.Equal(t, priority1Request, nextRequest)
324332

325-
nextRequest, _, _ = queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-1")
333+
nextRequest, _, _ = queue.GetNextRequestForQuerier(ctx, FirstUser(), reservedQuerier)
326334
assert.Equal(t, priority1Request, nextRequest)
327335

328336
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Second)
@@ -331,19 +339,19 @@ func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) {
331339
time.AfterFunc(2*time.Second, func() {
332340
queue.cond.Broadcast()
333341
})
334-
nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), "querier-1")
342+
nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), reservedQuerier)
335343
assert.Nil(t, nextRequest)
336344
assert.Equal(t, 1, queue.queues.userQueues["userID"].queue.length())
337345

338-
assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, 1, func() {}))
346+
assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, maxQueriers, func() {}))
339347

340348
ctxTimeout, cancel = context.WithTimeout(ctx, 1*time.Second)
341349
defer cancel()
342350

343351
time.AfterFunc(2*time.Second, func() {
344352
queue.cond.Broadcast()
345353
})
346-
nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), "querier-1")
354+
nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), reservedQuerier)
347355
assert.Nil(t, nextRequest)
348356
assert.Equal(t, 2, queue.queues.userQueues["userID"].queue.length())
349357
}

pkg/scheduler/queue/user_queues.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ func getPriorityList(queryPriority validation.QueryPriority, totalQuerierCount i
399399
}
400400
}
401401

402-
if len(priorityList) > totalQuerierCount {
402+
if len(priorityList) >= totalQuerierCount {
403403
return []int64{}
404404
}
405405

pkg/scheduler/queue/user_queues_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ func TestGetOrAddQueueShouldUpdateProperties(t *testing.T) {
427427
limits.QueryPriorityVal = validation.QueryPriority{Enabled: true, Priorities: []validation.PriorityDef{
428428
{
429429
Priority: 1,
430-
ReservedQueriers: 10,
430+
ReservedQueriers: float64(q.userQueues["userID"].maxQueriers),
431431
},
432432
}}
433433
q.limits = limits
@@ -674,9 +674,14 @@ func TestGetPriorityList(t *testing.T) {
674674
assert.EqualValues(t, []int64{1, 1, 2, 2, 2}, getPriorityList(queryPriority, 10))
675675
assert.EqualValues(t, []int64{}, getPriorityList(queryPriority, 1))
676676

677+
queryPriority.Priorities[0].ReservedQueriers = 0.3
678+
queryPriority.Priorities[1].ReservedQueriers = 0.6
679+
assert.EqualValues(t, []int64{1, 1, 1, 2, 2, 2, 2, 2, 2}, getPriorityList(queryPriority, 10))
680+
681+
// Cannot reserve all queriers for prioritized requests, remote read and metadata queries also need capacity.
677682
queryPriority.Priorities[0].ReservedQueriers = 0.4
678683
queryPriority.Priorities[1].ReservedQueriers = 0.6
679-
assert.EqualValues(t, []int64{1, 1, 1, 1, 2, 2, 2, 2, 2, 2}, getPriorityList(queryPriority, 10))
684+
assert.EqualValues(t, []int64{}, getPriorityList(queryPriority, 10))
680685

681686
queryPriority.Enabled = false
682687
assert.Nil(t, getPriorityList(queryPriority, 10))

0 commit comments

Comments
 (0)