@@ -19,6 +19,7 @@ package rootcoord
19
19
import (
20
20
"context"
21
21
"fmt"
22
+ "strconv"
22
23
"sync"
23
24
"time"
24
25
@@ -27,14 +28,14 @@ import (
27
28
28
29
"github.com/milvus-io/milvus-proto/go-api/commonpb"
29
30
"github.com/milvus-io/milvus/internal/log"
31
+ "github.com/milvus-io/milvus/internal/metrics"
30
32
"github.com/milvus-io/milvus/internal/proto/internalpb"
31
33
"github.com/milvus-io/milvus/internal/proto/proxypb"
32
34
"github.com/milvus-io/milvus/internal/tso"
33
35
"github.com/milvus-io/milvus/internal/types"
34
36
"github.com/milvus-io/milvus/internal/util/metricsinfo"
35
37
"github.com/milvus-io/milvus/internal/util/ratelimitutil"
36
38
"github.com/milvus-io/milvus/internal/util/tsoutil"
37
- "github.com/milvus-io/milvus/internal/util/typeutil"
38
39
)
39
40
40
41
const (
@@ -86,9 +87,9 @@ type QuotaCenter struct {
86
87
dataCoord types.DataCoord
87
88
88
89
// metrics
89
- queryNodeMetrics [ ]* metricsinfo.QueryNodeQuotaMetrics
90
- dataNodeMetrics [ ]* metricsinfo.DataNodeQuotaMetrics
91
- proxyMetrics [ ]* metricsinfo.ProxyQuotaMetrics
90
+ queryNodeMetrics map [ UniqueID ]* metricsinfo.QueryNodeQuotaMetrics
91
+ dataNodeMetrics map [ UniqueID ]* metricsinfo.DataNodeQuotaMetrics
92
+ proxyMetrics map [ UniqueID ]* metricsinfo.ProxyQuotaMetrics
92
93
dataCoordMetrics * metricsinfo.DataCoordQuotaMetrics
93
94
94
95
currentRates map [internalpb.RateType ]Limit
@@ -152,9 +153,9 @@ func (q *QuotaCenter) stop() {
152
153
153
154
// clearMetrics removes all metrics stored in QuotaCenter.
154
155
func (q * QuotaCenter ) clearMetrics () {
155
- q .dataNodeMetrics = make ([ ]* metricsinfo.DataNodeQuotaMetrics , 0 )
156
- q .queryNodeMetrics = make ([ ]* metricsinfo.QueryNodeQuotaMetrics , 0 )
157
- q .proxyMetrics = make ([ ]* metricsinfo.ProxyQuotaMetrics , 0 )
156
+ q .dataNodeMetrics = make (map [ UniqueID ]* metricsinfo.DataNodeQuotaMetrics , 0 )
157
+ q .queryNodeMetrics = make (map [ UniqueID ]* metricsinfo.QueryNodeQuotaMetrics , 0 )
158
+ q .proxyMetrics = make (map [ UniqueID ]* metricsinfo.ProxyQuotaMetrics , 0 )
158
159
}
159
160
160
161
// syncMetrics sends GetMetrics requests to DataCoord and QueryCoord to sync the metrics in DataNodes and QueryNodes.
@@ -185,7 +186,7 @@ func (q *QuotaCenter) syncMetrics() error {
185
186
}
186
187
for _ , queryNodeMetric := range queryCoordTopology .Cluster .ConnectedNodes {
187
188
if queryNodeMetric .QuotaMetrics != nil {
188
- q .queryNodeMetrics = append ( q . queryNodeMetrics , queryNodeMetric .QuotaMetrics )
189
+ q .queryNodeMetrics [ queryNodeMetric . ID ] = queryNodeMetric .QuotaMetrics
189
190
}
190
191
}
191
192
return nil
@@ -206,7 +207,7 @@ func (q *QuotaCenter) syncMetrics() error {
206
207
}
207
208
for _ , dataNodeMetric := range dataCoordTopology .Cluster .ConnectedNodes {
208
209
if dataNodeMetric .QuotaMetrics != nil {
209
- q .dataNodeMetrics = append ( q . dataNodeMetrics , dataNodeMetric .QuotaMetrics )
210
+ q .dataNodeMetrics [ dataNodeMetric . ID ] = dataNodeMetric .QuotaMetrics
210
211
}
211
212
}
212
213
if dataCoordTopology .Cluster .Self .QuotaMetrics != nil {
@@ -228,7 +229,7 @@ func (q *QuotaCenter) syncMetrics() error {
228
229
return err
229
230
}
230
231
if proxyMetric .QuotaMetrics != nil {
231
- q .proxyMetrics = append ( q . proxyMetrics , proxyMetric .QuotaMetrics )
232
+ q .proxyMetrics [ proxyMetric . ID ] = proxyMetric .QuotaMetrics
232
233
}
233
234
}
234
235
return nil
@@ -339,10 +340,11 @@ func (q *QuotaCenter) calculateWriteRates() error {
339
340
}
340
341
log .Debug ("QuotaCenter check diskQuota done" , zap .Bool ("exceeded" , exceeded ))
341
342
342
- ttFactor , err := q .timeTickDelay ( )
343
+ ts , err := q .tsoAllocator . GenerateTSO ( 1 )
343
344
if err != nil {
344
345
return err
345
346
}
347
+ ttFactor := q .timeTickDelay (ts )
346
348
if ttFactor <= 0 {
347
349
q .forceDenyWriting (TimeTickLongDelay ) // tt protection
348
350
return nil
@@ -409,43 +411,46 @@ func (q *QuotaCenter) resetCurrentRates() {
409
411
410
412
// timeTickDelay gets time tick delay of DataNodes and QueryNodes,
411
413
// and return the factor according to max tolerable time tick delay.
412
- func (q * QuotaCenter ) timeTickDelay () (float64 , error ) {
414
+ func (q * QuotaCenter ) timeTickDelay (ts Timestamp ) float64 {
415
+ t1 , _ := tsoutil .ParseTS (ts )
416
+
417
+ var maxDelay time.Duration
418
+ for nodeID , metric := range q .queryNodeMetrics {
419
+ if metric .Fgm .NumFlowGraph > 0 {
420
+ t2 , _ := tsoutil .ParseTS (metric .Fgm .MinFlowGraphTt )
421
+ delay := t1 .Sub (t2 )
422
+ if delay .Nanoseconds () > maxDelay .Nanoseconds () {
423
+ maxDelay = delay
424
+ }
425
+ metrics .RootCoordTtDelay .WithLabelValues (strconv .FormatInt (nodeID , 10 )).Set (float64 (maxDelay .Milliseconds ()))
426
+ }
427
+ }
428
+ for nodeID , metric := range q .dataNodeMetrics {
429
+ if metric .Fgm .NumFlowGraph > 0 {
430
+ t2 , _ := tsoutil .ParseTS (metric .Fgm .MinFlowGraphTt )
431
+ delay := t1 .Sub (t2 )
432
+ if delay .Nanoseconds () > maxDelay .Nanoseconds () {
433
+ maxDelay = delay
434
+ }
435
+ metrics .RootCoordTtDelay .WithLabelValues (strconv .FormatInt (nodeID , 10 )).Set (float64 (maxDelay .Milliseconds ()))
436
+ }
437
+ }
438
+
413
439
if ! Params .QuotaConfig .TtProtectionEnabled {
414
- return 1 , nil
440
+ return 1
415
441
}
416
442
417
443
maxTt := Params .QuotaConfig .MaxTimeTickDelay
418
444
if maxTt < 0 {
419
445
// < 0 means disable tt protection
420
- return 1 , nil
446
+ return 1
421
447
}
422
448
423
- minTs := typeutil .MaxTimestamp
424
- for _ , metric := range q .queryNodeMetrics {
425
- if metric .Fgm .NumFlowGraph > 0 && metric .Fgm .MinFlowGraphTt < minTs {
426
- minTs = metric .Fgm .MinFlowGraphTt
427
- }
428
- }
429
- for _ , metric := range q .dataNodeMetrics {
430
- if metric .Fgm .NumFlowGraph > 0 && metric .Fgm .MinFlowGraphTt < minTs {
431
- minTs = metric .Fgm .MinFlowGraphTt
432
- }
433
- }
434
- ts , err := q .tsoAllocator .GenerateTSO (1 )
435
- if err != nil {
436
- return 0 , err
437
- }
438
- if minTs >= ts {
439
- return 1 , nil
440
- }
441
- t1 , _ := tsoutil .ParseTS (minTs )
442
- t2 , _ := tsoutil .ParseTS (ts )
443
- delay := t2 .Sub (t1 )
444
- log .Debug ("QuotaCenter check timeTick delay" , zap .Time ("minTs" , t1 ), zap .Time ("curTs" , t2 ), zap .Duration ("delay" , delay ))
445
- if delay .Nanoseconds () >= maxTt .Nanoseconds () {
446
- return 0 , nil
449
+ log .Debug ("QuotaCenter check timeTick delay" , zap .Time ("curTs" , t1 ), zap .Duration ("maxDelay" , maxDelay ))
450
+ if maxDelay .Nanoseconds () >= maxTt .Nanoseconds () {
451
+ return 0
447
452
}
448
- return float64 (maxTt .Nanoseconds ()- delay .Nanoseconds ()) / float64 (maxTt .Nanoseconds ()), nil
453
+ return float64 (maxTt .Nanoseconds ()- maxDelay .Nanoseconds ()) / float64 (maxTt .Nanoseconds ())
449
454
}
450
455
451
456
// checkNQInQuery checks search&query nq in QueryNode,
0 commit comments