diff --git a/CHANGELOG.md b/CHANGELOG.md index a5da0c1cf29..9b5f22ae7c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -101,6 +101,7 @@ * [BUGFIX] Runtime-config: Change to check tenant limit validation when loading runtime config only for `all`, `distributor`, `querier`, and `ruler` targets. #6880 * [BUGFIX] Distributor: Fix the `/distributor/all_user_stats` api to work during rolling updates on ingesters. #7026 * [BUGFIX] Runtime-config: Fix panic when the runtime config is `null`. #7062 +* [BUGFIX] Scheduler: Avoid reserving all queriers for prioritized requests. #7057 ## 1.19.1 2025-09-20 diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 36aa97c98a7..47bd5295271 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -306,6 +306,8 @@ func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) { ctx := context.Background() queue.RegisterQuerierConnection("querier-1") + queue.RegisterQuerierConnection("querier-2") + maxQueriers := float64(2) normalRequest := MockRequest{ id: "normal query", @@ -315,14 +317,20 @@ func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) { priority: 1, } - assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, 1, func() {})) - assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, 1, func() {})) - assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, 1, func() {})) + assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, maxQueriers, func() {})) + assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, maxQueriers, func() {})) + assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, maxQueriers, func() {})) + reservedQueriers := queue.queues.userQueues["userID"].reservedQueriers + require.Equal(t, 1, len(reservedQueriers)) + reservedQuerier := "" + for qid := range reservedQueriers { + reservedQuerier = qid + } - nextRequest, _, _ := queue.GetNextRequestForQuerier(ctx, 
FirstUser(), "querier-1") + nextRequest, _, _ := queue.GetNextRequestForQuerier(ctx, FirstUser(), reservedQuerier) assert.Equal(t, priority1Request, nextRequest) - nextRequest, _, _ = queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-1") + nextRequest, _, _ = queue.GetNextRequestForQuerier(ctx, FirstUser(), reservedQuerier) assert.Equal(t, priority1Request, nextRequest) ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -331,11 +339,11 @@ func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) { time.AfterFunc(2*time.Second, func() { queue.cond.Broadcast() }) - nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), "querier-1") + nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), reservedQuerier) assert.Nil(t, nextRequest) assert.Equal(t, 1, queue.queues.userQueues["userID"].queue.length()) - assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, 1, func() {})) + assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, maxQueriers, func() {})) ctxTimeout, cancel = context.WithTimeout(ctx, 1*time.Second) defer cancel() @@ -343,7 +351,7 @@ func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) { time.AfterFunc(2*time.Second, func() { queue.cond.Broadcast() }) - nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), "querier-1") + nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), reservedQuerier) assert.Nil(t, nextRequest) assert.Equal(t, 2, queue.queues.userQueues["userID"].queue.length()) } diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go index 6a9c24b2c20..c7e30d87375 100644 --- a/pkg/scheduler/queue/user_queues.go +++ b/pkg/scheduler/queue/user_queues.go @@ -399,7 +399,7 @@ func getPriorityList(queryPriority validation.QueryPriority, totalQuerierCount i } } - if len(priorityList) > totalQuerierCount { + if len(priorityList) >= totalQuerierCount { return []int64{} } 
diff --git a/pkg/scheduler/queue/user_queues_test.go b/pkg/scheduler/queue/user_queues_test.go index f70287d51a4..382a1c2eafb 100644 --- a/pkg/scheduler/queue/user_queues_test.go +++ b/pkg/scheduler/queue/user_queues_test.go @@ -427,7 +427,7 @@ func TestGetOrAddQueueShouldUpdateProperties(t *testing.T) { limits.QueryPriorityVal = validation.QueryPriority{Enabled: true, Priorities: []validation.PriorityDef{ { Priority: 1, - ReservedQueriers: 10, + ReservedQueriers: float64(q.userQueues["userID"].maxQueriers), }, }} q.limits = limits @@ -674,9 +674,14 @@ func TestGetPriorityList(t *testing.T) { assert.EqualValues(t, []int64{1, 1, 2, 2, 2}, getPriorityList(queryPriority, 10)) assert.EqualValues(t, []int64{}, getPriorityList(queryPriority, 1)) + queryPriority.Priorities[0].ReservedQueriers = 0.3 + queryPriority.Priorities[1].ReservedQueriers = 0.6 + assert.EqualValues(t, []int64{1, 1, 1, 2, 2, 2, 2, 2, 2}, getPriorityList(queryPriority, 10)) + + // Cannot reserve all queriers for prioritized requests, remote read and metadata queries also need capacity. queryPriority.Priorities[0].ReservedQueriers = 0.4 queryPriority.Priorities[1].ReservedQueriers = 0.6 - assert.EqualValues(t, []int64{1, 1, 1, 1, 2, 2, 2, 2, 2, 2}, getPriorityList(queryPriority, 10)) + assert.EqualValues(t, []int64{}, getPriorityList(queryPriority, 10)) queryPriority.Enabled = false assert.Nil(t, getPriorityList(queryPriority, 10))