Skip to content

Commit

Permalink
fix: delegator doesn't follow with wal if streaming enabled (#39890)
Browse files Browse the repository at this point in the history
issue: #38399

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Feb 17, 2025
1 parent 38cfd38 commit 64dad60
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
10 changes: 9 additions & 1 deletion internal/querycoordv2/balance/score_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,14 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
for _, ch := range channels {
func(ch *meta.DmChannel) {
var targetNode *nodeItem
forceAssignChannel := forceAssign
if streamingutil.IsStreamingServiceEnabled() {
// When streaming service is enabled, we need to assign channel to the node where WAL is located.
nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(ch.GetChannelName())
if item, ok := nodeItemsMap[nodeID]; ok {
targetNode = item
// assgin channel to the node where WAL is located always has enough benefits.
forceAssignChannel = true
}
}
// for each channel, pick the node with the least score
Expand All @@ -196,10 +199,15 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
scoreChanges := b.calculateChannelScore(ch, collectionID)

sourceNode := nodeItemsMap[ch.Node]
if sourceNode != nil && sourceNode.nodeID == targetNode.nodeID {
// if the channel is already on the target node, skip assignment operation.
return
}

// if segment's node exist, which means this segment comes from balancer. we should consider the benefit
// if the segment reassignment doesn't got enough benefit, we should skip this reassignment
// notice: we should skip benefit check for forceAssign
if !forceAssign && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
if !forceAssignChannel && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
br.AddRecord(StrRecordf("skip generate balance plan for channel %s since no enough benefit", ch.GetChannelName()))
return
}
Expand Down
7 changes: 6 additions & 1 deletion internal/querycoordv2/checkers/leader_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/datapb"
Expand Down Expand Up @@ -92,7 +93,11 @@ func (c *LeaderChecker) Check(ctx context.Context) []task.Task {

replicas := c.meta.ReplicaManager.GetByCollection(ctx, collectionID)
for _, replica := range replicas {
for _, node := range replica.GetRWNodes() {
nodes := replica.GetRWNodes()
if streamingutil.IsStreamingServiceEnabled() {
nodes = replica.GetRWSQNodes()
}
for _, node := range nodes {
leaderViews := c.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(replica.GetCollectionID()), meta.WithNodeID2LeaderView(node))
for _, leaderView := range leaderViews {
dist := c.dist.SegmentDistManager.GetByFilter(meta.WithChannel(leaderView.Channel), meta.WithReplica(replica))
Expand Down

0 comments on commit 64dad60

Please sign in to comment.