Skip to content

Commit

Permalink
Create flag to prioritize dequeuing from query components over tenant…
Browse files Browse the repository at this point in the history
… fairness (#9016)

* Create flag to prioritize dequeuing from query components before tenant fairness

* Update docs and CHANGELOG

* Address Franco's feedback
  • Loading branch information
chencs authored Aug 17, 2024
1 parent 6c1607e commit 3872ccb
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
* [ENHANCEMENT] Add HA deduplication features to the `mimir-microservices-mode` development environment. #9012
* [ENHANCEMENT] Make `-query-frontend.additional-query-queue-dimensions-enabled` and `-query-scheduler.additional-query-queue-dimensions-enabled` non-operational flags in preparation for removal. #8984
* [ENHANCEMENT] Add a new ingester endpoint to prepare instances to downscale. #8956
* [ENHANCEMENT] Query-scheduler: Add the experimental `-query-scheduler.prioritize-query-components` flag which, when enabled, primarily prioritizes dequeuing fairly across queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, tenant fairness is primarily prioritized. `-query-scheduler.use-multi-algorithm-query-queue` must be enabled in order to use this flag. #9016
* [BUGFIX] Ruler: add support for draining any outstanding alert notifications before shutting down. This can be enabled with the `-ruler.drain-notification-queue-on-shutdown=true` CLI flag. #8346
* [BUGFIX] Query-frontend: fix `-querier.max-query-lookback` enforcement when `-compactor.blocks-retention-period` is not set, and vice versa. #8388
* [BUGFIX] Ingester: fix sporadic `not found` error causing an internal server error if label names are queried with matchers during head compaction. #8391
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -16044,6 +16044,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "prioritize_query_components",
"required": false,
"desc": "When enabled, the query scheduler primarily prioritizes dequeuing fairly from queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, the query scheduler primarily prioritizes tenant fairness. You must enable the `query-scheduler.use-multi-algorithm-query-queue` setting to use this flag.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "query-scheduler.prioritize-query-components",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "querier_forget_delay",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2257,6 +2257,8 @@ Usage of ./cmd/mimir/mimir:
Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429. (default 100)
-query-scheduler.max-used-instances int
The maximum number of query-scheduler instances to use, regardless of how many replicas are running. This option can be set only when -query-scheduler.service-discovery-mode is set to 'ring'. 0 to use all available query-scheduler instances.
-query-scheduler.prioritize-query-components
[experimental] When enabled, the query scheduler primarily prioritizes dequeuing fairly from queue components and secondarily prioritizes dequeuing fairly across tenants. When disabled, the query scheduler primarily prioritizes tenant fairness. You must enable the `query-scheduler.use-multi-algorithm-query-queue` setting to use this flag.
-query-scheduler.querier-forget-delay duration
[experimental] If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.
-query-scheduler.ring.consul.acl-token string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1727,6 +1727,14 @@ The `query_scheduler` block configures the query-scheduler.
# CLI flag: -query-scheduler.use-multi-algorithm-query-queue
[use_multi_algorithm_query_queue: <boolean> | default = false]
# (experimental) When enabled, the query scheduler primarily prioritizes
# dequeuing fairly from queue components and secondarily prioritizes dequeuing
# fairly across tenants. When disabled, the query scheduler primarily
# prioritizes tenant fairness. You must enable the
# `query-scheduler.use-multi-algorithm-query-queue` setting to use this flag.
# CLI flag: -query-scheduler.prioritize-query-components
[prioritize_query_components: <boolean> | default = false]

# (experimental) If a querier disconnects without sending notification about
# graceful shutdown, the query-scheduler will keep the querier in the tenant's
# shard until the forget delay has passed. This feature is useful to reduce the
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
log,
cfg.MaxOutstandingPerTenant,
false,
false,
cfg.QuerierForgetDelay,
f.queueLength,
f.discardedRequests,
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func NewRequestQueue(
log log.Logger,
maxOutstandingPerTenant int,
useMultiAlgoQueue bool,
prioritizeQueryComponents bool,
forgetDelay time.Duration,
queueLength *prometheus.GaugeVec,
discardedRequests *prometheus.CounterVec,
Expand Down Expand Up @@ -273,7 +274,7 @@ func NewRequestQueue(
waitingQuerierConnsToDispatch: list.New(),

QueryComponentUtilization: queryComponentCapacity,
queueBroker: newQueueBroker(maxOutstandingPerTenant, useMultiAlgoQueue, forgetDelay),
queueBroker: newQueueBroker(maxOutstandingPerTenant, useMultiAlgoQueue, prioritizeQueryComponents, forgetDelay),
}

q.Service = services.NewBasicService(q.starting, q.running, q.stop).WithName("request queue")
Expand Down
52 changes: 37 additions & 15 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@ import (
util_test "github.com/grafana/mimir/pkg/util/test"
)

// TODO (casie): Write tests for the case where prioritizeQueryComponents is true.
// buildTreeTestsStruct returns all _allowed_ combinations of config flags for testing.
func buildTreeTestsStruct() []struct {
name string
useMultiAlgoTreeQueue bool
name string
useMultiAlgoTreeQueue bool
prioritizeQueryComponents bool
} {
return []struct {
name string
useMultiAlgoTreeQueue bool
name string
useMultiAlgoTreeQueue bool
prioritizeQueryComponents bool
}{
{"legacy tree queue", false},
{"integrated tree queue", true},
{"legacy tree queue with prioritize query components disabled", false, false},
{"integrated tree queue with prioritize query components disabled", true, false},
{"integrated tree queue with prioritize query components enabled", true, true},
}
}

Expand Down Expand Up @@ -107,9 +110,8 @@ func makeSchedulerRequest(tenantID string, additionalQueueDimensions []string) *
// round-robins between the multiple queues, which has the effect of alternately dequeuing from the slow queries
// and normal queries rather than blocking normal queries behind slow queries.
func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) {
treeTypes := buildTreeTestsStruct()

for _, tt := range treeTypes {
for _, tt := range buildTreeTestsStruct() {
// Only test allowed combinations of these configs
t.Run(tt.name, func(t *testing.T) {
promRegistry := prometheus.NewPedanticRegistry()

Expand Down Expand Up @@ -153,6 +155,7 @@ func TestMultiDimensionalQueueFairnessSlowConsumerEffects(t *testing.T) {
log.NewNopLogger(),
maxOutstandingRequestsPerTenant,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetQuerierDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -264,6 +267,7 @@ func BenchmarkConcurrentQueueOperations(b *testing.B) {
log.NewNopLogger(),
maxOutstandingRequestsPerTenant,
t.useMultiAlgoTreeQueue,
t.prioritizeQueryComponents,
forgetQuerierDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -450,6 +454,7 @@ func TestRequestQueue_RegisterAndUnregisterQuerierWorkerConnections(t *testing.T
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -543,6 +548,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -622,6 +628,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ReshardNotifiedCorrectlyForMultip
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -717,6 +724,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterContextCancelled
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -777,6 +785,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand Down Expand Up @@ -814,6 +823,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
log.NewNopLogger(),
1,
tt.useMultiAlgoTreeQueue,
tt.prioritizeQueryComponents,
forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
Expand All @@ -824,29 +834,41 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend

// bypassing queue dispatcher loop for direct usage of the queueBroker and
// passing a waitingQuerierConn for a canceled querier connection
queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, false, queue.forgetDelay)
queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, tt.useMultiAlgoTreeQueue, tt.prioritizeQueryComponents, queue.forgetDelay)
queueBroker.addQuerierWorkerConn(NewUnregisteredQuerierWorkerConn(context.Background(), querierID))

tenantMaxQueriers := 0 // no sharding
queueDim := randAdditionalQueueDimension(true)
req := &SchedulerRequest{
Ctx: context.Background(),
Request: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"},
AdditionalQueueDimensions: randAdditionalQueueDimension(true),
AdditionalQueueDimensions: queueDim,
}
tr := tenantRequest{
tenantID: TenantID("tenant-1"),
req: req,
}

var multiAlgorithmTreeQueuePath QueuePath
if queueDim == nil {
queueDim = []string{unknownQueueDimension}
}
if queueBroker.prioritizeQueryComponents {
multiAlgorithmTreeQueuePath = append(append(multiAlgorithmTreeQueuePath, queueDim...), "tenant-1")
} else {
multiAlgorithmTreeQueuePath = append([]string{"tenant-1"}, queueDim...)
}

// TODO (casie): Clean this up when deprecating legacy tree queue
if tq, ok := queueBroker.tree.(*TreeQueue); ok {
require.Nil(t, tq.getNode(QueuePath{"tenant-1"}))
require.NoError(t, queueBroker.enqueueRequestBack(&tr, tenantMaxQueriers))
require.False(t, tq.getNode(QueuePath{"tenant-1"}).IsEmpty())
} else if itq, ok := queueBroker.tree.(*MultiQueuingAlgorithmTreeQueue); ok {
require.Nil(t, itq.GetNode(QueuePath{"tenant-1"}))
require.Nil(t, itq.GetNode(multiAlgorithmTreeQueuePath))
require.NoError(t, queueBroker.enqueueRequestBack(&tr, tenantMaxQueriers))
require.False(t, itq.GetNode(QueuePath{"tenant-1"}).IsEmpty())
require.False(t, itq.GetNode(multiAlgorithmTreeQueuePath).IsEmpty())

}

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -866,7 +888,7 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
if tq, ok := queueBroker.tree.(*TreeQueue); ok {
require.False(t, tq.getNode(QueuePath{"tenant-1"}).IsEmpty())
} else if itq, ok := queueBroker.tree.(*MultiQueuingAlgorithmTreeQueue); ok {
require.False(t, itq.GetNode(QueuePath{"tenant-1"}).IsEmpty())
require.False(t, itq.GetNode(multiAlgorithmTreeQueuePath).IsEmpty())
}

})
Expand Down
29 changes: 22 additions & 7 deletions pkg/scheduler/queue/tenant_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,34 @@ type queueBroker struct {
func newQueueBroker(
maxTenantQueueSize int,
useMultiAlgoTreeQueue bool,
prioritizeQueryComponents bool,
forgetDelay time.Duration,
) *queueBroker {
tqas := newTenantQuerierAssignments(forgetDelay)
var tree Tree
var err error
if useMultiAlgoTreeQueue {
algos := []QueuingAlgorithm{
tqas, // root; QueuingAlgorithm selects tenants
&roundRobinState{}, // tenant queues; QueuingAlgorithm selects query component
&roundRobinState{}, // query components; QueuingAlgorithm selects query from local queue
var algos []QueuingAlgorithm
if prioritizeQueryComponents {
algos = []QueuingAlgorithm{
&roundRobinState{}, // root; QueuingAlgorithm selects query component
tqas, // query components; QueuingAlgorithm selects tenants
&roundRobinState{}, // tenant queues; QueuingAlgorithm selects from local queue

}
} else {
algos = []QueuingAlgorithm{
tqas, // root; QueuingAlgorithm selects tenants
&roundRobinState{}, // tenant queues; QueuingAlgorithm selects query component
&roundRobinState{}, // query components; QueuingAlgorithm selects query from local queue
}
}
tree, err = NewTree(algos...)
} else {
// by default, use the legacy tree queue
if prioritizeQueryComponents {
panic("cannot prioritize query components for legacy tree queue")
}
tree = NewTreeQueue("root")
}

Expand All @@ -58,9 +72,10 @@ func newQueueBroker(
panic(fmt.Sprintf("error creating the tree queue: %v", err))
}
qb := &queueBroker{
tree: tree,
tenantQuerierAssignments: tqas,
maxTenantQueueSize: maxTenantQueueSize,
tree: tree,
tenantQuerierAssignments: tqas,
maxTenantQueueSize: maxTenantQueueSize,
prioritizeQueryComponents: prioritizeQueryComponents,
}

return qb
Expand Down
Loading

0 comments on commit 3872ccb

Please sign in to comment.